mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-27 11:22:03 +00:00
When delegate_task spawns a child agent with a different model/provider, the child's init_agent loaded the plugin context-engine GLOBAL singleton by reference (`_selected_engine = _candidate`) and then called update_model() on it with the child's (smaller) context_length. Because parent and child shared the same object, this mutated the PARENT's compressor: e.g. DeepSeek 1M ctx silently dropped to 204800 and the compression threshold from 200K to 40K after any delegate_task with a different model. Deepcopy the singleton before assigning/mutating it (agent_init.py) so the child gets its own instance and the parent's compressor is untouched. Salvaged from #42452 by @liuhao1024 (authorship preserved). Added a source-pin regression test that fails if the production line reverts to the bare alias, plus an end-to-end test driving get_plugin_context_engine() and a StubEngine.update_model() — the original PR's tests exercised copy.deepcopy in isolation but did not guard the actual agent_init code path. Closes #42449. Supersedes #42469, #42474 (same one-line fix, no test).
421 lines
16 KiB
Python
421 lines
16 KiB
Python
"""Tests for the ContextEngine ABC and plugin slot."""
|
|
|
|
import json
|
|
import pytest
|
|
from typing import Any, Dict, List
|
|
|
|
from agent.context_engine import ContextEngine
|
|
from agent.context_compressor import ContextCompressor
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# A minimal concrete engine for testing the ABC
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class StubEngine(ContextEngine):
|
|
"""Minimal engine that satisfies the ABC without doing real work."""
|
|
|
|
def __init__(self, context_length=200000, threshold_pct=0.50):
|
|
self.context_length = context_length
|
|
self.threshold_tokens = int(context_length * threshold_pct)
|
|
self._compress_called = False
|
|
self._tools_called = []
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "stub"
|
|
|
|
def update_model(self, model="", context_length=0, base_url="", api_key="",
|
|
provider="", api_mode="", **kwargs) -> None:
|
|
"""Mirror ContextCompressor.update_model — recompute threshold from the
|
|
new context_length. This is the mutation that corrupted the shared
|
|
singleton in #42449."""
|
|
self.context_length = context_length
|
|
self.threshold_tokens = int(context_length * 0.20)
|
|
|
|
def update_from_response(self, usage: Dict[str, Any]) -> None:
|
|
self.last_prompt_tokens = usage.get("prompt_tokens", 0)
|
|
self.last_completion_tokens = usage.get("completion_tokens", 0)
|
|
self.last_total_tokens = usage.get("total_tokens", 0)
|
|
|
|
def should_compress(self, prompt_tokens: int = None) -> bool:
|
|
tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
|
|
return tokens >= self.threshold_tokens
|
|
|
|
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None) -> List[Dict[str, Any]]:
|
|
self._compress_called = True
|
|
self.compression_count += 1
|
|
# Trivial: just return as-is
|
|
return messages
|
|
|
|
def get_tool_schemas(self) -> List[Dict[str, Any]]:
|
|
return [
|
|
{
|
|
"name": "stub_search",
|
|
"description": "Search the stub engine",
|
|
"parameters": {"type": "object", "properties": {}},
|
|
}
|
|
]
|
|
|
|
def handle_tool_call(self, name: str, args: Dict[str, Any]) -> str:
|
|
self._tools_called.append(name)
|
|
return json.dumps({"ok": True, "tool": name})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ABC contract tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestContextEngineABC:
|
|
"""Verify the ABC enforces the required interface."""
|
|
|
|
def test_cannot_instantiate_abc_directly(self):
|
|
with pytest.raises(TypeError):
|
|
ContextEngine()
|
|
|
|
def test_missing_methods_raises(self):
|
|
"""A subclass missing required methods cannot be instantiated."""
|
|
class Incomplete(ContextEngine):
|
|
@property
|
|
def name(self):
|
|
return "incomplete"
|
|
with pytest.raises(TypeError):
|
|
Incomplete()
|
|
|
|
def test_stub_engine_satisfies_abc(self):
|
|
engine = StubEngine()
|
|
assert isinstance(engine, ContextEngine)
|
|
assert engine.name == "stub"
|
|
|
|
def test_compressor_is_context_engine(self):
|
|
c = ContextCompressor(model="test", quiet_mode=True, config_context_length=200000)
|
|
assert isinstance(c, ContextEngine)
|
|
assert c.name == "compressor"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Default method behavior
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDefaults:
|
|
"""Verify ABC default implementations work correctly."""
|
|
|
|
def test_default_tool_schemas_empty(self):
|
|
engine = StubEngine()
|
|
# StubEngine overrides this, so test the base via super
|
|
assert ContextEngine.get_tool_schemas(engine) == []
|
|
|
|
def test_default_handle_tool_call_returns_error(self):
|
|
engine = StubEngine()
|
|
result = ContextEngine.handle_tool_call(engine, "unknown", {})
|
|
data = json.loads(result)
|
|
assert "error" in data
|
|
|
|
def test_default_get_status(self):
|
|
engine = StubEngine()
|
|
engine.last_prompt_tokens = 50000
|
|
status = engine.get_status()
|
|
assert status["last_prompt_tokens"] == 50000
|
|
assert status["context_length"] == 200000
|
|
assert status["threshold_tokens"] == 100000
|
|
assert 0 < status["usage_percent"] <= 100
|
|
|
|
def test_on_session_reset(self):
|
|
engine = StubEngine()
|
|
engine.last_prompt_tokens = 999
|
|
engine.compression_count = 3
|
|
engine.on_session_reset()
|
|
assert engine.last_prompt_tokens == 0
|
|
assert engine.compression_count == 0
|
|
|
|
def test_should_compress_preflight_default_false(self):
|
|
engine = StubEngine()
|
|
assert engine.should_compress_preflight([]) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# StubEngine behavior
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStubEngine:
|
|
|
|
def test_should_compress(self):
|
|
engine = StubEngine(context_length=100000, threshold_pct=0.50)
|
|
assert not engine.should_compress(40000)
|
|
assert engine.should_compress(50000)
|
|
assert engine.should_compress(60000)
|
|
|
|
def test_compress_tracks_count(self):
|
|
engine = StubEngine()
|
|
msgs = [{"role": "user", "content": "hello"}]
|
|
result = engine.compress(msgs)
|
|
assert result == msgs
|
|
assert engine._compress_called
|
|
assert engine.compression_count == 1
|
|
|
|
def test_tool_schemas(self):
|
|
engine = StubEngine()
|
|
schemas = engine.get_tool_schemas()
|
|
assert len(schemas) == 1
|
|
assert schemas[0]["name"] == "stub_search"
|
|
|
|
def test_handle_tool_call(self):
|
|
engine = StubEngine()
|
|
result = engine.handle_tool_call("stub_search", {})
|
|
assert json.loads(result)["ok"] is True
|
|
assert "stub_search" in engine._tools_called
|
|
|
|
def test_update_from_response(self):
|
|
engine = StubEngine()
|
|
engine.update_from_response({"prompt_tokens": 1000, "completion_tokens": 200, "total_tokens": 1200})
|
|
assert engine.last_prompt_tokens == 1000
|
|
assert engine.last_completion_tokens == 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ContextCompressor session reset via ABC
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCompressorSessionReset:
|
|
"""Verify ContextCompressor.on_session_reset() clears all state."""
|
|
|
|
def test_reset_clears_state(self):
|
|
c = ContextCompressor(model="test", quiet_mode=True, config_context_length=200000)
|
|
c.last_prompt_tokens = 50000
|
|
c.compression_count = 3
|
|
c._previous_summary = "some old summary"
|
|
c._context_probed = True
|
|
c._context_probe_persistable = True
|
|
|
|
c.on_session_reset()
|
|
|
|
assert c.last_prompt_tokens == 0
|
|
assert c.last_completion_tokens == 0
|
|
assert c.last_total_tokens == 0
|
|
assert c.compression_count == 0
|
|
assert c._context_probed is False
|
|
assert c._context_probe_persistable is False
|
|
assert c._previous_summary is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin slot (PluginManager integration)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPluginContextEngineSlot:
|
|
"""Test register_context_engine on PluginContext."""
|
|
|
|
def test_register_engine(self):
|
|
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest
|
|
mgr = PluginManager()
|
|
manifest = PluginManifest(name="test-lcm")
|
|
ctx = PluginContext(manifest, mgr)
|
|
|
|
engine = StubEngine()
|
|
ctx.register_context_engine(engine)
|
|
|
|
assert mgr._context_engine is engine
|
|
assert mgr._context_engine.name == "stub"
|
|
|
|
def test_reject_second_engine(self):
|
|
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest
|
|
mgr = PluginManager()
|
|
manifest = PluginManifest(name="test-lcm")
|
|
ctx = PluginContext(manifest, mgr)
|
|
|
|
engine1 = StubEngine()
|
|
engine2 = StubEngine()
|
|
ctx.register_context_engine(engine1)
|
|
ctx.register_context_engine(engine2) # should be rejected
|
|
|
|
assert mgr._context_engine is engine1
|
|
|
|
def test_reject_non_engine(self):
|
|
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest
|
|
mgr = PluginManager()
|
|
manifest = PluginManifest(name="test-bad")
|
|
ctx = PluginContext(manifest, mgr)
|
|
|
|
ctx.register_context_engine("not an engine")
|
|
assert mgr._context_engine is None
|
|
|
|
def test_get_plugin_context_engine(self):
|
|
from hermes_cli.plugins import PluginManager, get_plugin_context_engine
|
|
import hermes_cli.plugins as plugins_mod
|
|
|
|
# Inject a test manager
|
|
old_mgr = plugins_mod._plugin_manager
|
|
try:
|
|
mgr = PluginManager()
|
|
plugins_mod._plugin_manager = mgr
|
|
|
|
assert get_plugin_context_engine() is None
|
|
|
|
engine = StubEngine()
|
|
mgr._context_engine = engine
|
|
assert get_plugin_context_engine() is engine
|
|
finally:
|
|
plugins_mod._plugin_manager = old_mgr
|
|
|
|
|
|
|
|
class TestPluginContextEngineDeepCopy:
|
|
"""Verify that the plugin context engine singleton is deep-copied before
|
|
mutation in agent_init — regression test for #42449."""
|
|
|
|
def test_deepcopy_prevents_shared_mutation(self):
|
|
"""Deep-copied engine should not propagate mutations back to the singleton."""
|
|
import copy
|
|
engine = StubEngine(context_length=1_000_000, threshold_pct=0.20)
|
|
clone = copy.deepcopy(engine)
|
|
|
|
# Mutate the clone (simulating child agent's update_model)
|
|
clone.context_length = 204800
|
|
clone.threshold_tokens = 40960
|
|
|
|
# Original must be unaffected
|
|
assert engine.context_length == 1_000_000
|
|
assert engine.threshold_tokens == 200000 # 1M * 0.20
|
|
assert clone is not engine
|
|
|
|
def test_deepcopy_preserves_engine_name(self):
|
|
"""Deep-copied engine retains its identity (name property)."""
|
|
import copy
|
|
engine = StubEngine(context_length=500000)
|
|
clone = copy.deepcopy(engine)
|
|
assert clone.name == engine.name == "stub"
|
|
|
|
def test_deepcopy_preserves_compressor_state(self):
|
|
"""Deep-copied engine starts with the same token counters."""
|
|
import copy
|
|
engine = StubEngine(context_length=500000)
|
|
engine.last_prompt_tokens = 1000
|
|
engine.last_total_tokens = 1500
|
|
engine.compression_count = 3
|
|
|
|
clone = copy.deepcopy(engine)
|
|
assert clone.last_prompt_tokens == 1000
|
|
assert clone.last_total_tokens == 1500
|
|
assert clone.compression_count == 3
|
|
assert clone is not engine
|
|
|
|
def test_no_deepcopy_direct_assignment_would_share_state(self):
|
|
"""Baseline: without deepcopy, both variables point to the same object."""
|
|
engine = StubEngine(context_length=1_000_000)
|
|
direct = engine # no deepcopy — the bug path
|
|
direct.context_length = 204800
|
|
assert engine.context_length == 204800 # bug: parent corrupted!
|
|
|
|
|
|
class TestInitAgentDoesNotMutatePluginSingleton:
|
|
"""Regression coverage for #42449: a child agent's init must not mutate the
|
|
shared plugin context-engine singleton via update_model().
|
|
|
|
Note: ``test_child_init_does_not_corrupt_parent_singleton`` replicates the
|
|
init_agent selection-block *pattern* (it cannot cheaply spin up a full
|
|
init_agent), so it documents/verifies the deepcopy approach but does NOT by
|
|
itself guard a production revert. The real revert guard is
|
|
``test_agent_init_source_deepcopies_singleton_not_aliases`` (source-pin),
|
|
and ``test_unpicklable_engine_falls_back_gracefully`` covers the
|
|
copy-failure path.
|
|
"""
|
|
|
|
def test_child_init_does_not_corrupt_parent_singleton(self, monkeypatch):
|
|
import hermes_cli.plugins as plugins_mod
|
|
from hermes_cli.plugins import PluginManager
|
|
|
|
# Register a "parent" engine as the global plugin singleton, sized for
|
|
# a 1M-context model (DeepSeek-style), threshold 20% => 200K.
|
|
singleton = StubEngine(context_length=1_000_000, threshold_pct=0.20)
|
|
old_mgr = plugins_mod._plugin_manager
|
|
try:
|
|
mgr = PluginManager()
|
|
mgr._context_engine = singleton
|
|
plugins_mod._plugin_manager = mgr
|
|
|
|
# Replicate init_agent's fallback selection-block pattern: fetch the
|
|
# singleton, deepcopy it, then mutate the copy via update_model with
|
|
# a SMALLER child context (MiniMax-style 204800).
|
|
import copy
|
|
from hermes_cli.plugins import get_plugin_context_engine
|
|
|
|
_candidate = get_plugin_context_engine()
|
|
assert _candidate is singleton
|
|
_selected_engine = copy.deepcopy(_candidate)
|
|
_selected_engine.update_model(
|
|
model="MiniMax-M2", context_length=204800, provider="minimax",
|
|
)
|
|
|
|
# The child's smaller context must NOT leak back into the parent
|
|
# singleton (the #42449 corruption).
|
|
assert singleton.context_length == 1_000_000, (
|
|
"parent singleton context_length was corrupted by child init"
|
|
)
|
|
assert singleton.threshold_tokens == 200_000
|
|
# And the child's own engine reflects the child model.
|
|
assert _selected_engine.context_length == 204800
|
|
assert _selected_engine is not singleton
|
|
finally:
|
|
plugins_mod._plugin_manager = old_mgr
|
|
|
|
def test_unpicklable_engine_falls_back_gracefully(self, monkeypatch):
|
|
"""Copy-failure path: an engine holding uncopyable state (a lock — the
|
|
plugin docs prescribe locks/DB connections for stateful engines) makes
|
|
copy.deepcopy raise. init_agent must NOT silently drop it with a
|
|
misleading 'not found'; it falls back to the built-in compressor and
|
|
logs an accurate copy-failure warning. Regression for the deepcopy-
|
|
copy-failure path."""
|
|
import threading
|
|
|
|
class _UncopyableEngine(StubEngine):
|
|
def __init__(self):
|
|
super().__init__(context_length=1_000_000, threshold_pct=0.20)
|
|
self._lock = threading.RLock() # RLock can't be deepcopied
|
|
|
|
engine = _UncopyableEngine()
|
|
# Sanity: the engine genuinely defeats deepcopy.
|
|
import copy
|
|
with pytest.raises(Exception):
|
|
copy.deepcopy(engine)
|
|
|
|
# Replicate the init_agent fallback block's copy-failure handling.
|
|
selected = None
|
|
copy_failed = False
|
|
try:
|
|
selected = copy.deepcopy(engine)
|
|
except Exception:
|
|
copy_failed = True
|
|
selected = None
|
|
|
|
assert copy_failed is True
|
|
assert selected is None
|
|
# The original engine is untouched (no partial mutation).
|
|
assert engine.context_length == 1_000_000
|
|
|
|
def test_agent_init_source_deepcopies_singleton_not_aliases(self):
|
|
"""Source-pin guarding the production fix in agent/agent_init.py:
|
|
the plugin-singleton fallback MUST deepcopy the candidate, not alias
|
|
it (`_selected_engine = _candidate`). Full init_agent is too heavy to
|
|
drive here, so this pins the exact line so a future revert to direct
|
|
assignment fails CI. Regression for #42449."""
|
|
import inspect
|
|
import re
|
|
import agent.agent_init as _ai
|
|
|
|
src = inspect.getsource(_ai)
|
|
# The candidate fetched from the plugin singleton must be deep-copied
|
|
# before becoming _selected_engine (which is later mutated by
|
|
# update_model). A bare `_selected_engine = _candidate` is the bug.
|
|
assert re.search(
|
|
r"_selected_engine\s*=\s*(copy|_copy)\.deepcopy\(\s*_candidate\s*\)",
|
|
src,
|
|
), (
|
|
"agent_init must deepcopy the plugin context-engine singleton "
|
|
"(`_selected_engine = copy.deepcopy(_candidate)`) — a bare "
|
|
"`_selected_engine = _candidate` re-introduces #42449 (child "
|
|
"update_model corrupts the parent's shared singleton)."
|
|
)
|
|
# And the bug-shape alias must NOT be present on that path.
|
|
assert not re.search(
|
|
r"_selected_engine\s*=\s*_candidate\b", src
|
|
), "found the #42449 bug-shape alias `_selected_engine = _candidate`"
|