# Mirrored from https://github.com/NousResearch/hermes-agent.git (synced 2026-04-25).
"""Tests for agent/context_compressor.py — compression logic, thresholds, truncation fallback."""
|
|
|
|
import pytest
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from agent.context_compressor import ContextCompressor, SUMMARY_PREFIX
|
|
|
|
|
|
@pytest.fixture()
def compressor():
    """Build a ContextCompressor against a faked 100K-token context window.

    The model-context lookup is patched so the fixture never touches real
    model metadata; all thresholds derive from the fixed 100000 value.
    """
    with patch("agent.context_compressor.get_model_context_length", return_value=100000):
        return ContextCompressor(
            model="test/model",
            threshold_percent=0.85,
            protect_first_n=2,
            protect_last_n=2,
            quiet_mode=True,
        )
|
|
|
|
|
|
class TestShouldCompress:
    """should_compress() decisions around the 85%-of-100K token threshold."""

    def test_below_threshold(self, compressor):
        # 50K is well under the 85K trigger point.
        compressor.last_prompt_tokens = 50000
        decision = compressor.should_compress()
        assert decision is False

    def test_above_threshold(self, compressor):
        compressor.last_prompt_tokens = 90000
        decision = compressor.should_compress()
        assert decision is True

    def test_exact_threshold(self, compressor):
        # Landing exactly on the threshold counts as "compress".
        compressor.last_prompt_tokens = 85000
        decision = compressor.should_compress()
        assert decision is True

    def test_explicit_tokens(self, compressor):
        # An explicit prompt_tokens argument bypasses the stored counter.
        assert compressor.should_compress(prompt_tokens=90000) is True
        assert compressor.should_compress(prompt_tokens=50000) is False
|
|
|
|
|
|
|
|
class TestUpdateFromResponse:
    """update_from_response() should mirror usage fields onto the compressor."""

    def test_updates_fields(self, compressor):
        usage = {
            "prompt_tokens": 5000,
            "completion_tokens": 1000,
            "total_tokens": 6000,
        }
        compressor.update_from_response(usage)
        assert compressor.last_prompt_tokens == 5000
        assert compressor.last_completion_tokens == 1000

    def test_missing_fields_default_zero(self, compressor):
        # An empty usage payload must fall back to zero rather than raise.
        compressor.update_from_response({})
        assert compressor.last_prompt_tokens == 0
|
|
|
|
|
|
|
|
class TestCompress:
    """compress() with no summariser client — truncation-fallback behaviour."""

    @staticmethod
    def _make_messages(n):
        # Alternating user/assistant turns: "msg 0", "msg 1", ...
        roles = ("user", "assistant")
        return [{"role": roles[i % 2], "content": f"msg {i}"} for i in range(n)]

    def test_too_few_messages_returns_unchanged(self, compressor):
        # 4 messages < protect_first (2) + protect_last (2) + 1 = 5 needed.
        msgs = self._make_messages(4)
        assert compressor.compress(msgs) == msgs

    def test_truncation_fallback_no_client(self, compressor):
        # With client=None the compressor falls back to plain truncation.
        msgs = [{"role": "system", "content": "System prompt"}] + self._make_messages(10)
        result = compressor.compress(msgs)
        assert len(result) < len(msgs)
        # The system message survives and the compaction is recorded.
        assert result[0]["role"] == "system"
        assert compressor.compression_count == 1

    def test_compression_increments_count(self, compressor):
        msgs = self._make_messages(10)
        for expected_count in (1, 2):
            compressor.compress(msgs)
            assert compressor.compression_count == expected_count

    def test_protects_first_and_last(self, compressor):
        msgs = self._make_messages(10)
        result = compressor.compress(msgs)
        # protect_last_n=2: the final message is untouched.
        assert result[-1]["content"] == msgs[-1]["content"]
        # The second-to-last tail message may have the summary merged
        # into it when a double-collision prevents a standalone summary
        # (head=assistant, tail=user in this fixture). Verify the
        # original content is present in either case.
        assert msgs[-2]["content"] in result[-2]["content"]
|
|
|
|
|
|
class TestGenerateSummaryNoneContent:
    """Regression: content=None (from tool-call-only assistant messages) must not crash."""

    def test_none_content_does_not_crash(self):
        llm_response = MagicMock()
        llm_response.choices = [MagicMock()]
        llm_response.choices[0].message.content = "[CONTEXT SUMMARY]: tool calls happened"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True)

        # A transcript mixing None-content assistant turns with a tool result.
        messages = [
            {"role": "user", "content": "do something"},
            {
                "role": "assistant",
                "content": None,
                "tool_calls": [{"function": {"name": "search"}}],
            },
            {"role": "tool", "content": "result"},
            {"role": "assistant", "content": None},
            {"role": "user", "content": "thanks"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=llm_response):
            summary = c._generate_summary(messages)

        assert isinstance(summary, str)
        assert summary.startswith(SUMMARY_PREFIX)

    def test_none_content_in_system_message_compress(self):
        """System message with content=None should not crash during compress."""
        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        msgs = [{"role": "system", "content": None}] + [
            {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
            for i in range(10)
        ]
        result = c.compress(msgs)
        assert len(result) < len(msgs)
|
|
|
|
|
|
class TestNonStringContent:
    """Regression: content as dict (e.g., llama.cpp tool calls) must not crash."""

    @staticmethod
    def _response_with(content):
        # Minimal chat-completion-shaped response whose message content we control.
        resp = MagicMock()
        resp.choices = [MagicMock()]
        resp.choices[0].message.content = content
        return resp

    @staticmethod
    def _simple_messages():
        # A trivial two-turn conversation used by every test in this class.
        return [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok"},
        ]

    def test_dict_content_coerced_to_string(self):
        llm_response = self._response_with({"text": "some summary"})

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True)

        with patch("agent.context_compressor.call_llm", return_value=llm_response):
            summary = c._generate_summary(self._simple_messages())

        assert isinstance(summary, str)
        assert summary.startswith(SUMMARY_PREFIX)

    def test_none_content_coerced_to_empty(self):
        llm_response = self._response_with(None)

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True)

        with patch("agent.context_compressor.call_llm", return_value=llm_response):
            summary = c._generate_summary(self._simple_messages())

        # None content → empty string → standardized compaction handoff prefix added
        assert summary is not None
        assert summary == SUMMARY_PREFIX

    def test_summary_call_does_not_force_temperature(self):
        llm_response = self._response_with("ok")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True)

        with patch("agent.context_compressor.call_llm", return_value=llm_response) as mock_call:
            c._generate_summary(self._simple_messages())

        # The summariser must leave temperature to provider defaults.
        assert "temperature" not in mock_call.call_args.kwargs

    def test_summary_call_passes_live_main_runtime(self):
        llm_response = self._response_with("ok")

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(
                model="gpt-5.4",
                provider="openai-codex",
                base_url="https://chatgpt.com/backend-api/codex",
                api_key="codex-token",
                api_mode="codex_responses",
                quiet_mode=True,
            )

        with patch("agent.context_compressor.call_llm", return_value=llm_response) as mock_call:
            c._generate_summary(self._simple_messages())

        # The full live runtime config must be forwarded to the summary call.
        assert mock_call.call_args.kwargs["main_runtime"] == {
            "model": "gpt-5.4",
            "provider": "openai-codex",
            "base_url": "https://chatgpt.com/backend-api/codex",
            "api_key": "codex-token",
            "api_mode": "codex_responses",
        }
|
|
|
|
|
|
class TestSummaryFailureCooldown:
    """A failed summary call should trip a cooldown that suppresses the retry."""

    def test_summary_failure_enters_cooldown_and_skips_retry(self):
        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True)

        messages = [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok"},
        ]

        with patch("agent.context_compressor.call_llm", side_effect=Exception("boom")) as mock_call:
            first = c._generate_summary(messages)
            second = c._generate_summary(messages)

        # Both attempts report failure...
        assert first is None
        assert second is None
        # ...but only one LLM call happened: the cooldown skipped the retry.
        assert mock_call.call_count == 1
|
|
|
|
|
|
class TestSummaryPrefixNormalization:
    """_with_summary_prefix() must normalise legacy and current prefixes."""

    def test_legacy_prefix_is_replaced(self):
        # The old "[CONTEXT SUMMARY]:" marker is swapped for SUMMARY_PREFIX.
        out = ContextCompressor._with_summary_prefix("[CONTEXT SUMMARY]: did work")
        assert out == f"{SUMMARY_PREFIX}\ndid work"

    def test_existing_new_prefix_is_not_duplicated(self):
        already_prefixed = f"{SUMMARY_PREFIX}\ndid work"
        assert ContextCompressor._with_summary_prefix(already_prefixed) == already_prefixed
|
|
|
|
|
|
class TestCompressWithClient:
    """compress() behaviour when a summariser is available (``call_llm`` patched).

    Review fixes in this class:
    - Removed dead ``mock_client`` objects (and their
      ``chat.completions.create`` wiring) from four tests: ``call_llm`` is
      patched directly, so the OpenAI-style client mock was never used.
    - Renamed the generator-expression variable in ``test_summarization_path``
      that shadowed the compressor ``c``.
    """

    def test_system_content_list_gets_compression_note_without_crashing(self):
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "summary text"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        msgs = [
            {"role": "system", "content": [{"type": "text", "text": "system prompt"}]},
            {"role": "user", "content": "msg 1"},
            {"role": "assistant", "content": "msg 2"},
            {"role": "user", "content": "msg 3"},
            {"role": "assistant", "content": "msg 4"},
            {"role": "user", "content": "msg 5"},
            {"role": "assistant", "content": "msg 6"},
            {"role": "user", "content": "msg 7"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Structured (list-of-blocks) system content must stay a list and gain
        # the compression note as an additional text block.
        assert isinstance(result[0]["content"], list)
        assert any(
            isinstance(block, dict)
            and "compacted into a handoff summary" in block.get("text", "")
            for block in result[0]["content"]
        )

    def test_summarization_path(self):
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: stuff happened"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        msgs = [{"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"} for i in range(10)]
        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Should have a summary message in the middle.  (Loop variable renamed
        # from `c`, which shadowed the compressor above.)
        contents = [m.get("content", "") for m in result]
        assert any(text.startswith(SUMMARY_PREFIX) for text in contents)
        assert len(result) < len(msgs)

    def test_summarization_does_not_split_tool_call_pairs(self):
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: compressed middle"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(
                model="test",
                quiet_mode=True,
                protect_first_n=3,
                protect_last_n=4,
            )

        msgs = [
            {"role": "user", "content": "Could you address the reviewer comments in PR#71"},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {"id": "call_a", "type": "function", "function": {"name": "skill_view", "arguments": "{}"}},
                    {"id": "call_b", "type": "function", "function": {"name": "skill_view", "arguments": "{}"}},
                ],
            },
            {"role": "tool", "tool_call_id": "call_a", "content": "output a"},
            {"role": "tool", "tool_call_id": "call_b", "content": "output b"},
            {"role": "user", "content": "later 1"},
            {"role": "assistant", "content": "later 2"},
            {"role": "tool", "tool_call_id": "call_x", "content": "later output"},
            {"role": "assistant", "content": "later 3"},
            {"role": "user", "content": "later 4"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Every assistant tool_call that survives compression must still have
        # its matching tool reply — orphaned calls break most chat APIs.
        answered_ids = {
            msg.get("tool_call_id")
            for msg in result
            if msg.get("role") == "tool" and msg.get("tool_call_id")
        }
        for msg in result:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                for tc in msg["tool_calls"]:
                    assert tc["id"] in answered_ids

    def test_summary_role_avoids_consecutive_user_messages(self):
        """Summary role should alternate with the last head message to avoid consecutive same-role messages."""
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: stuff happened"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        # Last head message (index 1) is "assistant" → summary should be "user".
        # With min_tail=3, tail = last 3 messages (indices 5-7).
        # head_last=assistant, tail_first=assistant → summary_role="user", no collision.
        # Need 8 messages: min_for_compress = 2+3+1 = 6, must have > 6.
        msgs = [
            {"role": "user", "content": "msg 0"},
            {"role": "assistant", "content": "msg 1"},
            {"role": "user", "content": "msg 2"},
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]
        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        summary_msg = [
            m for m in result if (m.get("content") or "").startswith(SUMMARY_PREFIX)
        ]
        assert len(summary_msg) == 1
        assert summary_msg[0]["role"] == "user"

    def test_summary_role_avoids_consecutive_user_when_head_ends_with_user(self):
        """When last head message is 'user', summary must be 'assistant' to avoid two consecutive user messages."""
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: stuff happened"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=3, protect_last_n=2)

        # Last head message (index 2) is "user" → summary should be "assistant"
        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "user", "content": "msg 2"},  # last head — user
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]
        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        summary_msg = [
            m for m in result if (m.get("content") or "").startswith(SUMMARY_PREFIX)
        ]
        assert len(summary_msg) == 1
        assert summary_msg[0]["role"] == "assistant"

    def test_summary_role_flips_to_avoid_tail_collision(self):
        """When summary role collides with the first tail message but flipping
        doesn't collide with head, the role should be flipped."""
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "summary text"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        # Head ends with tool (index 1), tail starts with user (index 6).
        # Default: tool → summary_role="user" → collides with tail.
        # Flip to "assistant" → tool→assistant is fine.
        msgs = [
            {"role": "user", "content": "msg 0"},
            {"role": "assistant", "content": "", "tool_calls": [
                {"id": "call_1", "type": "function", "function": {"name": "t", "arguments": "{}"}},
            ]},
            {"role": "tool", "tool_call_id": "call_1", "content": "result 1"},
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]
        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Verify no consecutive user or assistant messages
        for i in range(1, len(result)):
            r1 = result[i - 1].get("role")
            r2 = result[i].get("role")
            if r1 in ("user", "assistant") and r2 in ("user", "assistant"):
                assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"

    def test_double_collision_merges_summary_into_tail(self):
        """When neither role avoids collision with both neighbors, the summary
        should be merged into the first tail message rather than creating a
        standalone message that breaks role alternation.

        Common scenario: head ends with 'assistant', tail starts with 'user'.
        summary='user' collides with tail, summary='assistant' collides with head.
        """
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "summary text"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=3, protect_last_n=3)

        # Head: [system, user, assistant] → last head = assistant
        # Tail: [user, assistant, user] → first tail = user
        # summary_role="user" collides with tail, "assistant" collides with head → merge
        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "assistant", "content": "msg 2"},
            {"role": "user", "content": "msg 3"},  # compressed
            {"role": "assistant", "content": "msg 4"},  # compressed
            {"role": "user", "content": "msg 5"},  # compressed
            {"role": "user", "content": "msg 6"},  # tail start
            {"role": "assistant", "content": "msg 7"},
            {"role": "user", "content": "msg 8"},
        ]
        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Verify no consecutive user or assistant messages
        for i in range(1, len(result)):
            r1 = result[i - 1].get("role")
            r2 = result[i].get("role")
            if r1 in ("user", "assistant") and r2 in ("user", "assistant"):
                assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"

        # The summary text should be merged into the first tail message
        first_tail = [m for m in result if "msg 6" in (m.get("content") or "")]
        assert len(first_tail) == 1
        assert "summary text" in first_tail[0]["content"]

    def test_double_collision_merges_summary_into_list_tail_content(self):
        """Structured tail content should accept a merged summary without TypeError."""
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "summary text"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=3, protect_last_n=3)

        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "assistant", "content": "msg 2"},
            {"role": "user", "content": "msg 3"},
            {"role": "assistant", "content": "msg 4"},
            {"role": "user", "content": "msg 5"},
            {"role": "user", "content": [{"type": "text", "text": "msg 6"}]},
            {"role": "assistant", "content": "msg 7"},
            {"role": "user", "content": "msg 8"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # The merged tail keeps its list shape: summary prepended as a block,
        # original text block preserved.
        merged_tail = next(
            m for m in result
            if m.get("role") == "user" and isinstance(m.get("content"), list)
        )
        assert isinstance(merged_tail["content"], list)
        assert "summary text" in merged_tail["content"][0]["text"]
        assert any(
            isinstance(block, dict) and block.get("text") == "msg 6"
            for block in merged_tail["content"]
        )

    def test_double_collision_user_head_assistant_tail(self):
        """Reverse double collision: head ends with 'user', tail starts with 'assistant'.
        summary='assistant' collides with tail, 'user' collides with head → merge."""
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "summary text"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        # Head: [system, user] → last head = user
        # Tail: [assistant, user, assistant] → first tail = assistant
        # summary_role="assistant" collides with tail, "user" collides with head → merge
        # With min_tail=3, tail = last 3 messages (indices 5-7).
        # Need 8 messages: min_for_compress = 2+3+1 = 6, must have > 6.
        msgs = [
            {"role": "system", "content": "system prompt"},
            {"role": "user", "content": "msg 1"},
            {"role": "assistant", "content": "msg 2"},  # compressed
            {"role": "user", "content": "msg 3"},  # compressed
            {"role": "assistant", "content": "msg 4"},  # compressed
            {"role": "assistant", "content": "msg 5"},  # tail start
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]
        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Verify no consecutive user or assistant messages
        for i in range(1, len(result)):
            r1 = result[i - 1].get("role")
            r2 = result[i].get("role")
            if r1 in ("user", "assistant") and r2 in ("user", "assistant"):
                assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"

        # The summary should be merged into the first tail message (assistant at index 5)
        first_tail = [m for m in result if "msg 5" in (m.get("content") or "")]
        assert len(first_tail) == 1
        assert "summary text" in first_tail[0]["content"]

    def test_no_collision_scenarios_still_work(self):
        """Verify that the common no-collision cases (head=assistant/tail=assistant,
        head=user/tail=user) still produce a standalone summary message."""
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "summary text"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)

        # Head=assistant, Tail=assistant → summary_role="user", no collision.
        # With min_tail=3, tail = last 3 messages (indices 5-7).
        # Need 8 messages: min_for_compress = 2+3+1 = 6, must have > 6.
        msgs = [
            {"role": "user", "content": "msg 0"},
            {"role": "assistant", "content": "msg 1"},
            {"role": "user", "content": "msg 2"},
            {"role": "assistant", "content": "msg 3"},
            {"role": "user", "content": "msg 4"},
            {"role": "assistant", "content": "msg 5"},
            {"role": "user", "content": "msg 6"},
            {"role": "assistant", "content": "msg 7"},
        ]
        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        summary_msgs = [m for m in result if (m.get("content") or "").startswith(SUMMARY_PREFIX)]
        assert len(summary_msgs) == 1, "should have a standalone summary message"
        assert summary_msgs[0]["role"] == "user"

    def test_summarization_does_not_start_tail_with_tool_outputs(self):
        mock_response = MagicMock()
        mock_response.choices = [MagicMock()]
        mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: compressed middle"

        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            c = ContextCompressor(
                model="test",
                quiet_mode=True,
                protect_first_n=2,
                protect_last_n=3,
            )

        msgs = [
            {"role": "user", "content": "earlier 1"},
            {"role": "assistant", "content": "earlier 2"},
            {"role": "user", "content": "earlier 3"},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {"id": "call_c", "type": "function", "function": {"name": "search_files", "arguments": "{}"}},
                ],
            },
            {"role": "tool", "tool_call_id": "call_c", "content": "output c"},
            {"role": "user", "content": "latest user"},
        ]

        with patch("agent.context_compressor.call_llm", return_value=mock_response):
            result = c.compress(msgs)

        # Every surviving tool message must be preceded by its calling
        # assistant message — the tail may not begin with orphaned outputs.
        called_ids = {
            tc["id"]
            for msg in result
            if msg.get("role") == "assistant" and msg.get("tool_calls")
            for tc in msg["tool_calls"]
        }
        for msg in result:
            if msg.get("role") == "tool" and msg.get("tool_call_id"):
                assert msg["tool_call_id"] in called_ids
|
|
|
|
|
|
class TestSummaryTargetRatio:
    """Verify that summary_target_ratio properly scales budgets with context window."""

    @staticmethod
    def _make(context_length, **kwargs):
        # Build a quiet compressor against a fixed fake context length.
        with patch("agent.context_compressor.get_model_context_length", return_value=context_length):
            return ContextCompressor(model="test", quiet_mode=True, **kwargs)

    def test_tail_budget_scales_with_context(self):
        """Tail token budget should be threshold_tokens * summary_target_ratio."""
        # 200K * 0.50 threshold * 0.40 ratio = 40K
        assert self._make(200_000, summary_target_ratio=0.40).tail_token_budget == 40_000
        # 1M * 0.50 threshold * 0.40 ratio = 200K
        assert self._make(1_000_000, summary_target_ratio=0.40).tail_token_budget == 200_000

    def test_summary_cap_scales_with_context(self):
        """Max summary tokens should be 5% of context, capped at 12K."""
        assert self._make(200_000).max_summary_tokens == 10_000  # 200K * 0.05
        assert self._make(1_000_000).max_summary_tokens == 12_000  # capped at 12K ceiling

    def test_ratio_clamped(self):
        """Ratio should be clamped to [0.10, 0.80]."""
        assert self._make(100_000, summary_target_ratio=0.05).summary_target_ratio == 0.10
        assert self._make(100_000, summary_target_ratio=0.95).summary_target_ratio == 0.80

    def test_default_threshold_is_50_percent(self):
        """Default compression threshold should be 50%, with a 64K floor."""
        c = self._make(100_000)
        assert c.threshold_percent == 0.50
        # 50% of 100K = 50K, but the floor is 64K
        assert c.threshold_tokens == 64_000

    def test_threshold_floor_does_not_apply_above_128k(self):
        """On large-context models the 50% percentage is used directly."""
        # 50% of 200K = 100K, which is above the 64K floor
        assert self._make(200_000).threshold_tokens == 100_000

    def test_default_protect_last_n_is_20(self):
        """Default protect_last_n should be 20."""
        assert self._make(100_000).protect_last_n == 20

    def test_default_protect_first_n_is_3(self):
        """Default protect_first_n should be 3 (system prompt + first exchange)."""
        assert self._make(100_000).protect_first_n == 3

    def test_protect_first_n_override(self):
        """protect_first_n=1 should be honoured — for users who rely on rolling
        compaction and don't want the opening user/assistant turn pinned as head
        indefinitely. Only the system prompt survives head-protection."""
        assert self._make(100_000, protect_first_n=1).protect_first_n == 1

    def test_protect_first_n_1_preserves_only_system_prompt(self):
        """End-to-end: when protect_first_n=1, compression should treat only
        the first message (system prompt) as head. Messages 1..n-protect_last-1
        become summarization candidates, unlike the default where messages 0-2
        would all be pinned as head."""
        c = self._make(100_000, protect_first_n=1, protect_last_n=2)
        msgs = [{"role": "system", "content": "System prompt"}] + [
            {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
            for i in range(8)
        ]
        result = c.compress(msgs)
        # System prompt (msg[0]) survives as head
        assert result[0]["role"] == "system"
        assert result[0]["content"].startswith("System prompt")
        # The first user/assistant exchange (msg 0, msg 1) should NOT be pinned
        # as head verbatim — those would have been summarized or absorbed.
        # Under default protect_first_n=3, result[1] and result[2] would be
        # the literal "msg 0" / "msg 1"; with protect_first_n=1 they aren't.
        assert result[1].get("content") != "msg 0"
        # Last 2 messages are tail-protected under protect_last_n=2
        assert result[-1]["content"] == msgs[-1]["content"]
|
|
|
|
|
|
class TestTokenBudgetTailProtection:
    """Tests for token-budget-based tail protection (PR #6240).

    The core change: tail protection is now based on a token budget rather
    than a fixed message count. This prevents large tool outputs from
    blocking compaction.
    """

    @pytest.fixture()
    def budget_compressor(self):
        """Compressor with a known context size/threshold for tail tests."""
        with patch("agent.context_compressor.get_model_context_length", return_value=200_000):
            c = ContextCompressor(
                model="test/model",
                threshold_percent=0.50,  # 100K threshold
                protect_first_n=2,
                protect_last_n=20,
                quiet_mode=True,
            )
        return c

    def test_large_tool_outputs_no_longer_block_compaction(self, budget_compressor):
        """The motivating scenario: 20 messages with large tool outputs should
        NOT prevent compaction. With message-count tail protection they would
        all be protected, leaving nothing to summarize."""
        c = budget_compressor
        messages = [
            {"role": "user", "content": "Start task"},
            {"role": "assistant", "content": "On it"},
        ]
        # Add 20 messages with large tool outputs (~5K chars each ≈ 1250 tokens)
        for i in range(10):
            messages.append({
                "role": "assistant", "content": None,
                "tool_calls": [{"function": {"name": f"tool_{i}", "arguments": "{}"}}],
            })
            messages.append({
                "role": "tool", "content": "x" * 5000,
                "tool_call_id": f"call_{i}",
            })
        # Add 3 recent small messages
        messages.append({"role": "user", "content": "What's the status?"})
        messages.append({"role": "assistant", "content": "Here's what I found..."})
        messages.append({"role": "user", "content": "Continue"})

        # The tail cut should NOT protect all 20 tool messages
        head_end = c.protect_first_n
        cut = c._find_tail_cut_by_tokens(messages, head_end)
        tail_size = len(messages) - cut
        # With token budget, the tail should be much smaller than 20+
        assert tail_size < 20, f"Tail {tail_size} messages — large tool outputs are blocking compaction"
        # But at least 3 (hard minimum)
        assert tail_size >= 3

    def test_min_tail_always_3_messages(self, budget_compressor):
        """Even with a tiny token budget, at least 3 messages are protected."""
        c = budget_compressor
        # Override to a tiny budget
        c.tail_token_budget = 10
        messages = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "working on it"},
            {"role": "user", "content": "more work"},
            {"role": "assistant", "content": "done"},
            {"role": "user", "content": "thanks"},
        ]
        head_end = 2
        cut = c._find_tail_cut_by_tokens(messages, head_end)
        tail_size = len(messages) - cut
        assert tail_size >= 3, f"Tail is only {tail_size} messages, min should be 3"

    def test_soft_ceiling_allows_oversized_message(self, budget_compressor):
        """The 1.5x soft ceiling allows an oversized message to be included
        rather than splitting it."""
        c = budget_compressor
        # Set a small budget — 500 tokens
        c.tail_token_budget = 500
        messages = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
            {"role": "user", "content": "read the file"},
            # This message is ~600 tokens (> budget of 500, but < 1.5x = 750)
            {"role": "assistant", "content": "a" * 2400},
            {"role": "user", "content": "short"},
            {"role": "assistant", "content": "short reply"},
            {"role": "user", "content": "continue"},
        ]
        head_end = 2
        cut = c._find_tail_cut_by_tokens(messages, head_end)
        tail_size = len(messages) - cut
        # The oversized message at index 3 should be absorbed into the tail:
        # accumulated ~610 tokens (short msgs + oversized msg) stays below the
        # 1.5x ceiling of 750 tokens, so the cut lands at or before index 3
        # and the protected tail spans at least indices 3..6.
        # Fix: the previous `>= 3` bound was vacuous — the hard minimum of 3
        # already guarantees it, so the soft ceiling was never exercised.
        assert tail_size >= 4, (
            f"Tail is {tail_size} messages — the oversized message at index 3 "
            "was split out instead of being absorbed by the 1.5x soft ceiling"
        )

    def test_small_conversation_still_compresses(self, budget_compressor):
        """With the new min of 8 messages (head=2 + 3 + 1 guard + 2 middle),
        a small but compressible conversation should still compress."""
        c = budget_compressor
        # 9 messages: head(2) + 4 middle + 3 tail = compressible
        messages = []
        for i in range(9):
            role = "user" if i % 2 == 0 else "assistant"
            messages.append({"role": role, "content": f"Message {i}"})

        # Should not early-return (needs > protect_first_n + 3 + 1 = 6)
        # Mock the summary generation to avoid real API call
        with patch.object(c, "_generate_summary", return_value="Summary of conversation"):
            result = c.compress(messages, current_tokens=90_000)
        # Should have compressed (fewer messages than original)
        assert len(result) < len(messages)

    def test_prune_with_token_budget(self, budget_compressor):
        """_prune_old_tool_results with protect_tail_tokens respects the budget."""
        c = budget_compressor
        messages = [
            {"role": "user", "content": "start"},
            {"role": "assistant", "content": None,
             "tool_calls": [{"function": {"name": "read_file", "arguments": '{"path": "big.txt"}'}}]},
            {"role": "tool", "content": "x" * 10000, "tool_call_id": "c1"},  # ~2500 tokens
            {"role": "assistant", "content": None,
             "tool_calls": [{"function": {"name": "read_file", "arguments": '{"path": "small.txt"}'}}]},
            {"role": "tool", "content": "y" * 10000, "tool_call_id": "c2"},  # ~2500 tokens
            {"role": "user", "content": "short recent message"},
            {"role": "assistant", "content": "short reply"},
        ]
        # With a 1000-token budget, only the last couple messages should be protected.
        # (Pruned message list itself is not inspected here — only the count.)
        _, pruned = c._prune_old_tool_results(
            messages, protect_tail_count=2, protect_tail_tokens=1000,
        )
        # At least one old tool result should have been pruned
        assert pruned >= 1

    def test_prune_without_token_budget_uses_message_count(self, budget_compressor):
        """Without protect_tail_tokens, falls back to message-count behavior."""
        c = budget_compressor
        messages = [
            {"role": "user", "content": "start"},
            {"role": "assistant", "content": None,
             "tool_calls": [{"function": {"name": "tool", "arguments": "{}"}}]},
            {"role": "tool", "content": "x" * 5000, "tool_call_id": "c1"},
            {"role": "user", "content": "recent"},
            {"role": "assistant", "content": "reply"},
        ]
        # protect_tail_count=3 means last 3 messages protected
        _, pruned = c._prune_old_tool_results(
            messages, protect_tail_count=3,
        )
        # Tool at index 2 is inside the protected tail (last 3 = indices 2,3,4)
        # so it might or might not be pruned depending on boundary handling.
        assert isinstance(pruned, int)
class TestTruncateToolCallArgsJson:
    """Regression coverage for #11762.

    Historically, ``function.arguments`` was shrunk by slicing the raw JSON
    string mid-token, yielding unparseable payloads. Strict providers
    (observed on MiniMax) rejected those with non-retryable 400s, trapping
    long sessions in a re-send loop. These tests pin the contract of the
    replacement helper: output is always valid JSON with the same shape as
    the input — smaller, never corrupted.
    """

    def _helper(self):
        import agent.context_compressor as cc
        return cc._truncate_tool_call_args_json

    def test_shrunken_args_remain_valid_json(self):
        import json
        truncate = self._helper()
        raw = json.dumps({
            "path": "~/.hermes/skills/shopping/browser-setup-notes.md",
            "content": "# Shopping Browser Setup Notes\n\n" + "abc " * 400,
        })
        assert len(raw) > 500
        out = truncate(raw)
        decoded = json.loads(out)  # must not raise
        assert len(out) < len(raw)
        assert decoded["content"].endswith("...[truncated]")
        assert decoded["path"] == "~/.hermes/skills/shopping/browser-setup-notes.md"

    def test_non_json_arguments_pass_through(self):
        truncate = self._helper()
        garbage = "this is not json at all, " * 50
        assert truncate(garbage) == garbage

    def test_short_string_leaves_unchanged(self):
        import json
        truncate = self._helper()
        raw = json.dumps({"command": "ls -la", "cwd": "/tmp"})
        assert json.loads(truncate(raw)) == {"command": "ls -la", "cwd": "/tmp"}

    def test_nested_structures_are_walked(self):
        import json
        truncate = self._helper()
        raw = json.dumps({
            "messages": [
                {"role": "user", "content": "x" * 500},
                {"role": "assistant", "content": "ok"},
            ],
            "meta": {"note": "y" * 500},
        })
        decoded = json.loads(truncate(raw))
        long_msg, short_msg = decoded["messages"]
        assert long_msg["content"].endswith("...[truncated]")
        assert short_msg["content"] == "ok"
        assert decoded["meta"]["note"].endswith("...[truncated]")

    def test_non_string_leaves_preserved(self):
        import json
        truncate = self._helper()
        raw = json.dumps({
            "retries": 3,
            "enabled": True,
            "timeout": None,
            "items": [1, 2, 3],
            "note": "z" * 500,
        })
        decoded = json.loads(truncate(raw))
        assert decoded["note"].endswith("...[truncated]")
        assert decoded["items"] == [1, 2, 3]
        assert decoded["timeout"] is None
        assert decoded["enabled"] is True
        assert decoded["retries"] == 3

    def test_scalar_json_string_gets_shrunk(self):
        import json
        truncate = self._helper()
        decoded = json.loads(truncate(json.dumps("q" * 500)))
        assert isinstance(decoded, str)
        assert decoded.endswith("...[truncated]")

    def test_unicode_preserved(self):
        import json
        truncate = self._helper()
        out = truncate(json.dumps({"content": "非德满" + ("a" * 500)}))
        # ensure_ascii=False keeps CJK intact rather than emitting \uXXXX
        assert "非德满" in out

    def test_pass3_emits_valid_json_for_downstream_provider(self):
        """End-to-end: Pass 3 must never produce the exact failure payload
        that caused the 400 loop (unterminated string, missing brace)."""
        import json
        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
            compressor = ContextCompressor(
                model="test/model",
                threshold_percent=0.85,
                protect_first_n=1,
                protect_last_n=1,
                quiet_mode=True,
            )
        big_body = "# Shopping Browser Setup Notes\n\n## Overview\n" + "x " * 400
        raw_args = json.dumps({
            "path": "~/.hermes/skills/shopping/browser-setup-notes.md",
            "content": big_body,
        })
        assert len(raw_args) > 500  # triggers the Pass-3 shrink
        convo = [
            {"role": "user", "content": "please write two files"},
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "call_1", "type": "function",
                 "function": {"name": "write_file", "arguments": raw_args}},
            ]},
            {"role": "tool", "tool_call_id": "call_1",
             "content": '{"bytes_written": 727}'},
            {"role": "user", "content": "ok"},
            {"role": "assistant", "content": "done"},
        ]
        pruned_msgs, _ = compressor._prune_old_tool_results(convo, protect_tail_count=2)
        surviving_args = pruned_msgs[1]["tool_calls"][0]["function"]["arguments"]
        # Must parse — otherwise downstream provider returns 400
        reparsed = json.loads(surviving_args)
        assert reparsed["content"].endswith("...[truncated]")
        assert reparsed["path"] == "~/.hermes/skills/shopping/browser-setup-notes.md"