hermes-agent/tests/agent/test_context_engine.py
liuhao1024 8d1f6debfd fix(agent): deepcopy plugin context engine to prevent parent corruption on delegate_task (#42449)
When delegate_task spawns a child agent with a different model/provider, the
child's init_agent loaded the plugin context-engine GLOBAL singleton by
reference (`_selected_engine = _candidate`) and then called update_model() on
it with the child's (smaller) context_length. Because parent and child shared
the same object, this mutated the PARENT's compressor: e.g. DeepSeek 1M ctx
silently dropped to 204800 and the compression threshold from 200K to 40K
after any delegate_task with a different model.

Deepcopy the singleton before assigning/mutating it (agent_init.py) so the
child gets its own instance and the parent's compressor is untouched.

Salvaged from #42452 by @liuhao1024 (authorship preserved). Added a
source-pin regression test that fails if the production line reverts to the
bare alias, plus an end-to-end test driving get_plugin_context_engine() and a
StubEngine.update_model() — the original PR's tests exercised copy.deepcopy in
isolation but did not guard the actual agent_init code path.

Closes #42449. Supersedes #42469, #42474 (same one-line fix, no test).
2026-06-25 02:13:26 +05:30

421 lines
16 KiB
Python

"""Tests for the ContextEngine ABC and plugin slot."""
import json
from typing import Any, Dict, List, Optional

import pytest

from agent.context_compressor import ContextCompressor
from agent.context_engine import ContextEngine
# ---------------------------------------------------------------------------
# A minimal concrete engine for testing the ABC
# ---------------------------------------------------------------------------
class StubEngine(ContextEngine):
    """Minimal concrete engine that satisfies the ABC without doing real work.

    The stub records which hooks were exercised (``_compress_called``,
    ``_tools_called``) so tests can assert on them.

    Args:
        context_length: Size of the simulated context window, in tokens.
        threshold_pct: Fraction of ``context_length`` at which
            ``should_compress`` starts returning ``True``.
    """

    def __init__(self, context_length=200000, threshold_pct=0.50):
        self.context_length = context_length
        # Keep the configured fraction so update_model() can re-derive the
        # threshold for a new context size instead of assuming a fixed one.
        self._threshold_pct = threshold_pct
        self.threshold_tokens = int(context_length * threshold_pct)
        self._compress_called = False
        self._tools_called = []

    @property
    def name(self) -> str:
        """Engine identifier asserted by the tests in this module."""
        return "stub"

    def update_model(self, model="", context_length=0, base_url="", api_key="",
                     provider="", api_mode="", **kwargs) -> None:
        """Re-size the engine for a new model, mutating this instance in place.

        Only ``context_length`` is used; the other arguments are accepted and
        ignored. The threshold is recomputed from the new context length using
        the fraction given at construction time. This in-place mutation is
        what corrupted the shared singleton in #42449.
        """
        self.context_length = context_length
        self.threshold_tokens = int(context_length * self._threshold_pct)

    def update_from_response(self, usage: Dict[str, Any]) -> None:
        """Copy token counters from a usage dict; missing keys default to 0."""
        self.last_prompt_tokens = usage.get("prompt_tokens", 0)
        self.last_completion_tokens = usage.get("completion_tokens", 0)
        self.last_total_tokens = usage.get("total_tokens", 0)

    def should_compress(self, prompt_tokens: Optional[int] = None) -> bool:
        """Return True once the token count reaches ``threshold_tokens``.

        When ``prompt_tokens`` is None the last recorded prompt size is used.
        """
        # NOTE(review): last_prompt_tokens is not set in __init__; presumably
        # ContextEngine supplies a default -- confirm against the ABC.
        tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
        return tokens >= self.threshold_tokens

    def compress(self, messages: List[Dict[str, Any]],
                 current_tokens: Optional[int] = None) -> List[Dict[str, Any]]:
        """Record that compression ran and return ``messages`` unchanged."""
        self._compress_called = True
        # compression_count is not initialised by this class; it is expected
        # to come from the ContextEngine base.
        self.compression_count += 1
        # Trivial: just return as-is
        return messages

    def get_tool_schemas(self) -> List[Dict[str, Any]]:
        """Expose a single dummy tool schema named ``stub_search``."""
        return [
            {
                "name": "stub_search",
                "description": "Search the stub engine",
                "parameters": {"type": "object", "properties": {}},
            }
        ]

    def handle_tool_call(self, name: str, args: Dict[str, Any]) -> str:
        """Log the tool name and return a JSON success payload."""
        self._tools_called.append(name)
        return json.dumps({"ok": True, "tool": name})
# ---------------------------------------------------------------------------
# ABC contract tests
# ---------------------------------------------------------------------------
class TestContextEngineABC:
    """Contract checks: the ABC rejects partial implementations and accepts
    complete ones."""

    def test_cannot_instantiate_abc_directly(self):
        """The abstract base itself must refuse to be constructed."""
        with pytest.raises(TypeError):
            ContextEngine()

    def test_missing_methods_raises(self):
        """A subclass that only supplies ``name`` cannot be instantiated."""

        class Incomplete(ContextEngine):
            @property
            def name(self):
                return "incomplete"

        with pytest.raises(TypeError):
            Incomplete()

    def test_stub_engine_satisfies_abc(self):
        """The module's stub is a usable ContextEngine."""
        stub = StubEngine()
        assert isinstance(stub, ContextEngine)
        assert stub.name == "stub"

    def test_compressor_is_context_engine(self):
        """The built-in compressor is a ContextEngine named "compressor"."""
        compressor = ContextCompressor(
            model="test",
            quiet_mode=True,
            config_context_length=200000,
        )
        assert isinstance(compressor, ContextEngine)
        assert compressor.name == "compressor"
# ---------------------------------------------------------------------------
# Default method behavior
# ---------------------------------------------------------------------------
class TestDefaults:
    """Exercise the behaviour ContextEngine provides when a subclass does not
    override it."""

    def test_default_tool_schemas_empty(self):
        stub = StubEngine()
        # StubEngine overrides get_tool_schemas, so invoke the base
        # implementation explicitly with the stub standing in as ``self``.
        base_schemas = ContextEngine.get_tool_schemas(stub)
        assert base_schemas == []

    def test_default_handle_tool_call_returns_error(self):
        stub = StubEngine()
        raw = ContextEngine.handle_tool_call(stub, "unknown", {})
        payload = json.loads(raw)
        assert "error" in payload

    def test_default_get_status(self):
        stub = StubEngine()
        stub.last_prompt_tokens = 50000
        report = stub.get_status()
        expected = {
            "last_prompt_tokens": 50000,
            "context_length": 200000,
            "threshold_tokens": 100000,
        }
        for key, want in expected.items():
            assert report[key] == want
        assert 0 < report["usage_percent"] <= 100

    def test_on_session_reset(self):
        stub = StubEngine()
        stub.last_prompt_tokens, stub.compression_count = 999, 3
        stub.on_session_reset()
        assert stub.last_prompt_tokens == 0
        assert stub.compression_count == 0

    def test_should_compress_preflight_default_false(self):
        verdict = StubEngine().should_compress_preflight([])
        assert verdict is False
# ---------------------------------------------------------------------------
# StubEngine behavior
# ---------------------------------------------------------------------------
class TestStubEngine:
    """Sanity checks for the StubEngine test double itself."""

    def test_should_compress(self):
        """Compression triggers at, and above, the threshold (inclusive)."""
        engine = StubEngine(context_length=100000, threshold_pct=0.50)
        assert not engine.should_compress(40000)
        assert engine.should_compress(50000)  # exactly at the threshold
        assert engine.should_compress(60000)

    def test_compress_tracks_count(self):
        """compress() returns its input and bumps the bookkeeping fields."""
        engine = StubEngine()
        msgs = [{"role": "user", "content": "hello"}]
        result = engine.compress(msgs)
        assert result == msgs
        assert engine._compress_called
        assert engine.compression_count == 1

    def test_tool_schemas(self):
        """The stub advertises exactly one tool, ``stub_search``."""
        engine = StubEngine()
        schemas = engine.get_tool_schemas()
        assert len(schemas) == 1
        assert schemas[0]["name"] == "stub_search"

    def test_handle_tool_call(self):
        """Tool calls succeed and are recorded by name."""
        engine = StubEngine()
        result = engine.handle_tool_call("stub_search", {})
        assert json.loads(result)["ok"] is True
        assert "stub_search" in engine._tools_called

    def test_update_from_response(self):
        """All three usage counters are copied onto the engine."""
        engine = StubEngine()
        engine.update_from_response(
            {"prompt_tokens": 1000, "completion_tokens": 200, "total_tokens": 1200}
        )
        assert engine.last_prompt_tokens == 1000
        assert engine.last_completion_tokens == 200
        # total_tokens is passed in and stored, so check it as well.
        assert engine.last_total_tokens == 1200
# ---------------------------------------------------------------------------
# ContextCompressor session reset via ABC
# ---------------------------------------------------------------------------
class TestCompressorSessionReset:
    """Verify ContextCompressor.on_session_reset() clears all state."""

    def test_reset_clears_state(self):
        """Every field asserted after the reset is first given a non-default
        value, so a passing assertion proves the reset cleared it rather than
        that it never changed."""
        c = ContextCompressor(model="test", quiet_mode=True, config_context_length=200000)
        c.last_prompt_tokens = 50000
        # Seed the remaining counters too; otherwise the == 0 checks below
        # would pass even if the reset never touched them.
        c.last_completion_tokens = 1200
        c.last_total_tokens = 51200
        c.compression_count = 3
        c._previous_summary = "some old summary"
        c._context_probed = True
        c._context_probe_persistable = True

        c.on_session_reset()

        assert c.last_prompt_tokens == 0
        assert c.last_completion_tokens == 0
        assert c.last_total_tokens == 0
        assert c.compression_count == 0
        assert c._context_probed is False
        assert c._context_probe_persistable is False
        assert c._previous_summary is None
# ---------------------------------------------------------------------------
# Plugin slot (PluginManager integration)
# ---------------------------------------------------------------------------
class TestPluginContextEngineSlot:
    """Cover PluginContext.register_context_engine and the module-level
    get_plugin_context_engine accessor."""

    @staticmethod
    def _new_context(plugin_name):
        """Build a fresh PluginManager plus a PluginContext bound to it."""
        from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest

        manager = PluginManager()
        manifest = PluginManifest(name=plugin_name)
        return manager, PluginContext(manifest, manager)

    def test_register_engine(self):
        """A valid engine is stored on the manager as-is."""
        manager, context = self._new_context("test-lcm")
        stub = StubEngine()
        context.register_context_engine(stub)
        assert manager._context_engine is stub
        assert manager._context_engine.name == "stub"

    def test_reject_second_engine(self):
        """Once an engine is registered, a later registration is ignored."""
        manager, context = self._new_context("test-lcm")
        first = StubEngine()
        second = StubEngine()
        context.register_context_engine(first)
        context.register_context_engine(second)  # should be rejected
        assert manager._context_engine is first

    def test_reject_non_engine(self):
        """Objects that are not ContextEngine instances are not stored."""
        manager, context = self._new_context("test-bad")
        context.register_context_engine("not an engine")
        assert manager._context_engine is None

    def test_get_plugin_context_engine(self):
        """The accessor reflects whatever the active manager currently holds."""
        import hermes_cli.plugins as plugins_mod
        from hermes_cli.plugins import PluginManager, get_plugin_context_engine

        # Swap in a throwaway manager and always put the real one back.
        saved_manager = plugins_mod._plugin_manager
        try:
            manager = PluginManager()
            plugins_mod._plugin_manager = manager
            assert get_plugin_context_engine() is None

            stub = StubEngine()
            manager._context_engine = stub
            assert get_plugin_context_engine() is stub
        finally:
            plugins_mod._plugin_manager = saved_manager
class TestPluginContextEngineDeepCopy:
    """Properties of copy.deepcopy on an engine that the #42449 fix relies on:
    the copy is independent of the original yet starts with the same state."""

    def test_deepcopy_prevents_shared_mutation(self):
        """Changes made to the copy never reach the original."""
        import copy

        original = StubEngine(context_length=1_000_000, threshold_pct=0.20)
        duplicate = copy.deepcopy(original)

        # Stand-in for what a child agent's update_model would do.
        duplicate.context_length = 204800
        duplicate.threshold_tokens = 40960

        assert original.context_length == 1_000_000
        assert original.threshold_tokens == 200000  # 1M * 0.20
        assert duplicate is not original

    def test_deepcopy_preserves_engine_name(self):
        """The copy reports the same ``name`` as the original."""
        import copy

        original = StubEngine(context_length=500000)
        duplicate = copy.deepcopy(original)
        assert duplicate.name == original.name == "stub"

    def test_deepcopy_preserves_compressor_state(self):
        """Token counters set on the original are carried over to the copy."""
        import copy

        original = StubEngine(context_length=500000)
        original.last_prompt_tokens = 1000
        original.last_total_tokens = 1500
        original.compression_count = 3

        duplicate = copy.deepcopy(original)

        assert duplicate.last_prompt_tokens == 1000
        assert duplicate.last_total_tokens == 1500
        assert duplicate.compression_count == 3
        assert duplicate is not original

    def test_no_deepcopy_direct_assignment_would_share_state(self):
        """Baseline: a plain assignment aliases the object, so writes through
        one name are visible through the other."""
        original = StubEngine(context_length=1_000_000)
        alias = original  # no deepcopy — the bug path
        alias.context_length = 204800
        assert original.context_length == 204800  # bug: parent corrupted!
class TestInitAgentDoesNotMutatePluginSingleton:
    """Regression coverage for #42449: a child agent's init must not mutate the
    shared plugin context-engine singleton via update_model().

    Note: ``test_child_init_does_not_corrupt_parent_singleton`` replicates the
    init_agent selection-block *pattern* (it cannot cheaply spin up a full
    init_agent), so it documents/verifies the deepcopy approach but does NOT by
    itself guard a production revert. The real revert guard is
    ``test_agent_init_source_deepcopies_singleton_not_aliases`` (source-pin),
    and ``test_unpicklable_engine_falls_back_gracefully`` covers the
    copy-failure path.
    """

    def test_child_init_does_not_corrupt_parent_singleton(self, monkeypatch):
        """Deep-copying the singleton before update_model() keeps the parent's
        context length and threshold intact."""
        import copy

        import hermes_cli.plugins as plugins_mod
        from hermes_cli.plugins import PluginManager, get_plugin_context_engine

        # Register a "parent" engine as the global plugin singleton, sized for
        # a 1M-context model (DeepSeek-style), threshold 20% => 200K.
        singleton = StubEngine(context_length=1_000_000, threshold_pct=0.20)
        mgr = PluginManager()
        mgr._context_engine = singleton
        # monkeypatch puts the original manager back at teardown, including
        # when an assertion below fails.
        monkeypatch.setattr(plugins_mod, "_plugin_manager", mgr)

        # Replicate init_agent's fallback selection-block pattern: fetch the
        # singleton, deepcopy it, then mutate the copy via update_model with
        # a SMALLER child context (MiniMax-style 204800).
        _candidate = get_plugin_context_engine()
        assert _candidate is singleton
        _selected_engine = copy.deepcopy(_candidate)
        _selected_engine.update_model(
            model="MiniMax-M2", context_length=204800, provider="minimax",
        )

        # The child's smaller context must NOT leak back into the parent
        # singleton (the #42449 corruption).
        assert singleton.context_length == 1_000_000, (
            "parent singleton context_length was corrupted by child init"
        )
        assert singleton.threshold_tokens == 200_000
        # And the child's own engine reflects the child model.
        assert _selected_engine.context_length == 204800
        assert _selected_engine is not singleton

    def test_unpicklable_engine_falls_back_gracefully(self, monkeypatch):
        """Copy-failure path: an engine holding uncopyable state (here a lock)
        makes copy.deepcopy raise.

        This test replicates the copy-failure handling pattern only: on
        failure no engine is selected and the original is left untouched. It
        does not drive init_agent itself, so the production fallback and its
        log message are not verified here.
        """
        import copy
        import threading

        class _UncopyableEngine(StubEngine):
            def __init__(self):
                super().__init__(context_length=1_000_000, threshold_pct=0.20)
                self._lock = threading.RLock()  # RLock can't be deepcopied

        engine = _UncopyableEngine()

        # Sanity: the engine genuinely defeats deepcopy. Pin the concrete
        # error so an unrelated failure cannot satisfy this check.
        with pytest.raises(TypeError):
            copy.deepcopy(engine)

        # Replicate the init_agent fallback block's copy-failure handling.
        # The catch is kept broad, as in the pattern being replicated.
        selected = None
        copy_failed = False
        try:
            selected = copy.deepcopy(engine)
        except Exception:
            copy_failed = True

        assert copy_failed is True
        assert selected is None
        # The original engine is untouched (no partial mutation).
        assert engine.context_length == 1_000_000

    def test_agent_init_source_deepcopies_singleton_not_aliases(self):
        """Source-pin guarding the production fix in agent/agent_init.py:
        the plugin-singleton fallback MUST deepcopy the candidate, not alias
        it (`_selected_engine = _candidate`). Full init_agent is too heavy to
        drive here, so this pins the exact line so a future revert to direct
        assignment fails CI. Regression for #42449."""
        import inspect
        import re

        import agent.agent_init as _ai

        src = inspect.getsource(_ai)

        # The candidate fetched from the plugin singleton must be deep-copied
        # before becoming _selected_engine (which is later mutated by
        # update_model). A bare `_selected_engine = _candidate` is the bug.
        assert re.search(
            r"_selected_engine\s*=\s*(copy|_copy)\.deepcopy\(\s*_candidate\s*\)",
            src,
        ), (
            "agent_init must deepcopy the plugin context-engine singleton "
            "(`_selected_engine = copy.deepcopy(_candidate)`) — a bare "
            "`_selected_engine = _candidate` re-introduces #42449 (child "
            "update_model corrupts the parent's shared singleton)."
        )
        # And the bug-shape alias must NOT be present on that path. The search
        # runs over the whole module source, comments included.
        assert not re.search(
            r"_selected_engine\s*=\s*_candidate\b", src
        ), "found the #42449 bug-shape alias `_selected_engine = _candidate`"