hermes-agent/agent/context_engine.py
teknium1 9b5dae17a5 feat(context-engine): host contract for external context engines
Condenses the substance of PRs #16453, #17453, #16451, #17600, and #13373
into a minimal generic host contract that external context engine plugins
(e.g. hermes-lcm) need to integrate cleanly. Drops scaffolding that
duplicated existing infrastructure or had marginal value.

Five concrete changes:

1. `_transition_context_engine_session()` on AIAgent — generic lifecycle
   helper that fires on_session_end → on_session_reset → on_session_start
   → optional carry_over_new_session_context. Engines implement only the
   hooks they need; missing hooks are skipped. Built-in compressor keeps
   its existing reset-only behavior because callers default to no
   metadata. `reset_session_state()` now optionally accepts
   previous_messages / old_session_id / carry_over_context and delegates
   to the transition helper when provided. (#16453)

2. `conversation_id` passed to `on_session_start()` — both the
   agent-init call site and the compression-boundary call site now
   forward `self._gateway_session_key` so plugin engines have a stable
   conversation identity that survives session_id rotation (compression
   splits, /new, resume). The key already existed on AIAgent; it just
   wasn't reaching engines. (#16453)

3. Canonical cache buckets forwarded to engines — the usage dict passed
   to `update_from_response()` now includes input_tokens, output_tokens,
   cache_read_tokens, cache_write_tokens, and reasoning_tokens on top of
   the legacy prompt/completion/total keys. Engines can make decisions on
   cache-hit ratios and reasoning costs instead of only aggregates. ABC
   docstring updated. (#17453)

4. Plugin-registered context engines visible in the picker —
   `_discover_context_engines()` in plugins_cmd.py now also includes
   engines registered via `ctx.register_context_engine()` from plugin
   manifests, deduplicating by name so repo-shipped descriptions win on
   collision. (#16451)

5. `_EngineCollector.register_command()` — context engines using the
   standard `register(ctx)` pattern can now expose slash commands (e.g.
   `/lcm`). Routes to the global plugin command registry with the same
   conflict-rejection policy regular plugins use (no shadowing built-ins,
   no clobbering other plugins). Previously these calls hit a no-op and
   the slash commands silently never appeared. (#17600)

Dropped from the original 5 PRs:

- Compression boundary signal (`boundary_reason="compression"`) from
  #16453 — already on main at `agent/conversation_compression.py:412-424`,
  landed via the bg-review extraction.

- `discover_plugins()` before fallback in run_agent.py from #16451 —
  redundant: `get_plugin_context_engine()` already routes through
  `_ensure_plugins_discovered()` which is idempotent.

- Runtime identity diagnostics method + helpers from #13373 (+251 LOC) —
  operators can already read engine state via `engine.get_status()`;
  the diagnostics view added marginal value relative to its surface area.

- The 553-LOC slash-command machinery from #17600 — replaced with a
  20-LOC `register_command` method on the collector that reuses the
  existing plugin command registry instead of building a parallel one.

Net: ~215 LOC of host-contract changes + 282 LOC of focused tests, vs
~1,176 LOC across the original 5 PRs.

Co-authored-by: Tosko4 <1294707+Tosko4@users.noreply.github.com>

Closes #16453.
Closes #17453.
Closes #16451.
Closes #17600.
Closes #13373.
Related: stephenschoettler/hermes-lcm#68.
2026-05-28 01:45:30 -07:00

217 lines
8.3 KiB
Python

"""Abstract base class for pluggable context engines.
A context engine controls how conversation context is managed when
approaching the model's token limit. The built-in ContextCompressor
is the default implementation. Third-party engines (e.g. LCM) can
replace it via the plugin system or by being placed in the
``plugins/context_engine/<name>/`` directory.
Selection is config-driven: ``context.engine`` in config.yaml.
Default is ``"compressor"`` (the built-in). Only one engine is active.
The engine is responsible for:
- Deciding when compaction should fire
- Performing compaction (summarization, DAG construction, etc.)
- Optionally exposing tools the agent can call (e.g. lcm_grep)
- Tracking token usage from API responses
Lifecycle:
1. Engine is instantiated and registered (plugin register() or default)
2. on_session_start() called when a conversation begins
3. update_from_response() called after each API response with usage data
4. should_compress() checked after each turn
5. compress() called when should_compress() returns True
6. on_session_end() called at real session boundaries (CLI exit, /reset,
gateway session expiry) — NOT per-turn
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List
class ContextEngine(ABC):
"""Base class all context engines must implement."""
# -- Identity ----------------------------------------------------------
@property
@abstractmethod
def name(self) -> str:
"""Short identifier (e.g. 'compressor', 'lcm')."""
# -- Token state (read by run_agent.py for display/logging) ------------
#
# Engines MUST maintain these. run_agent.py reads them directly.
last_prompt_tokens: int = 0
last_completion_tokens: int = 0
last_total_tokens: int = 0
threshold_tokens: int = 0
context_length: int = 0
compression_count: int = 0
# -- Compaction parameters (read by run_agent.py for preflight) --------
#
# These control the preflight compression check. Subclasses may
# override via __init__ or property; defaults are sensible for most
# engines.
#
# protect_first_n semantics (since PR #13754): count of non-system head
# messages always preserved verbatim, IN ADDITION to the system prompt
# which is always implicitly protected. Default 3 keeps the
# historical "system + first 3 non-system messages" head shape.
threshold_percent: float = 0.75
protect_first_n: int = 3
protect_last_n: int = 6
# -- Core interface ----------------------------------------------------
@abstractmethod
def update_from_response(self, usage: Dict[str, Any]) -> None:
"""Update tracked token usage from an API response.
Called after every LLM call with a normalized usage dict. The legacy
keys ``prompt_tokens``, ``completion_tokens``, and ``total_tokens``
are always present. Newer hosts also include canonical buckets:
``input_tokens``, ``output_tokens``, ``cache_read_tokens``,
``cache_write_tokens``, and ``reasoning_tokens``. Engines should
treat those fields as optional for compatibility with older hosts.
"""
@abstractmethod
def should_compress(self, prompt_tokens: int = None) -> bool:
"""Return True if compaction should fire this turn."""
@abstractmethod
def compress(
self,
messages: List[Dict[str, Any]],
current_tokens: int = None,
focus_topic: str = None,
) -> List[Dict[str, Any]]:
"""Compact the message list and return the new message list.
This is the main entry point. The engine receives the full message
list and returns a (possibly shorter) list that fits within the
context budget. The implementation is free to summarize, build a
DAG, or do anything else — as long as the returned list is a valid
OpenAI-format message sequence.
Args:
focus_topic: Optional topic string from manual ``/compress <focus>``.
Engines that support guided compression should prioritise
preserving information related to this topic. Engines that
don't support it may simply ignore this argument.
"""
# -- Optional: pre-flight check ----------------------------------------
def should_compress_preflight(self, messages: List[Dict[str, Any]]) -> bool:
"""Quick rough check before the API call (no real token count yet).
Default returns False (skip pre-flight). Override if your engine
can do a cheap estimate.
"""
return False
# -- Optional: manual /compress preflight ------------------------------
def has_content_to_compress(self, messages: List[Dict[str, Any]]) -> bool:
"""Quick check: is there anything in ``messages`` that can be compacted?
Used by the gateway ``/compress`` command as a preflight guard —
returning False lets the gateway report "nothing to compress yet"
without making an LLM call.
Default returns True (always attempt). Engines with a cheap way
to introspect their own head/tail boundaries should override this
to return False when the transcript is still entirely protected.
"""
return True
# -- Optional: session lifecycle ---------------------------------------
def on_session_start(self, session_id: str, **kwargs) -> None:
"""Called when a new conversation session begins.
Use this to load persisted state (DAG, store) for the session.
kwargs may include hermes_home, platform, model, etc.
"""
def on_session_end(self, session_id: str, messages: List[Dict[str, Any]]) -> None:
"""Called at real session boundaries (CLI exit, /reset, gateway expiry).
Use this to flush state, close DB connections, etc.
NOT called per-turn — only when the session truly ends.
"""
def on_session_reset(self) -> None:
"""Called on /new or /reset. Reset per-session state.
Default resets compression_count and token tracking.
"""
self.last_prompt_tokens = 0
self.last_completion_tokens = 0
self.last_total_tokens = 0
self.compression_count = 0
# -- Optional: tools ---------------------------------------------------
def get_tool_schemas(self) -> List[Dict[str, Any]]:
"""Return tool schemas this engine provides to the agent.
Default returns empty list (no tools). LCM would return schemas
for lcm_grep, lcm_describe, lcm_expand here.
"""
return []
def handle_tool_call(self, name: str, args: Dict[str, Any], **kwargs) -> str:
"""Handle a tool call from the agent.
Only called for tool names returned by get_tool_schemas().
Must return a JSON string.
kwargs may include:
messages: the current in-memory message list (for live ingestion)
"""
import json
return json.dumps({"error": f"Unknown context engine tool: {name}"})
# -- Optional: status / display ----------------------------------------
def get_status(self) -> Dict[str, Any]:
"""Return status dict for display/logging.
Default returns the standard fields run_agent.py expects.
"""
return {
"last_prompt_tokens": self.last_prompt_tokens,
"threshold_tokens": self.threshold_tokens,
"context_length": self.context_length,
"usage_percent": (
min(100, self.last_prompt_tokens / self.context_length * 100)
if self.context_length else 0
),
"compression_count": self.compression_count,
}
# -- Optional: model switch support ------------------------------------
def update_model(
self,
model: str,
context_length: int,
base_url: str = "",
api_key: str = "",
provider: str = "",
api_mode: str = "",
) -> None:
"""Called when the user switches models or on fallback activation.
Default updates context_length and recalculates threshold_tokens
from threshold_percent. Override if your engine needs more
(e.g. recalculate DAG budgets, switch summary models).
"""
self.context_length = context_length
self.threshold_tokens = int(context_length * self.threshold_percent)