mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
## What does this PR do?

The trajectory compressor could corrupt training trajectories by cutting a conversation in the middle of a tool-call/tool-response pair. In the from/value trajectory format a `tool` turn (carrying `<tool_response>` markers) is always emitted immediately after the `gpt` turn whose `<tool_call>` it answers, so the two turns must stay together. The compressible region's end boundary, however, was chosen purely by token accumulation: the loop stopped at the first turn where the accumulated tokens met the savings target, with no regard for turn roles. For any over-budget trajectory whose savings boundary happened to land between a `gpt` turn and its `tool` turn, the `gpt` (with its `<tool_call>`) was summarised away into the replacement `human` message while the now-orphaned `tool` turn (with its `<tool_response>`) was kept verbatim in the tail — producing an unmatched marker and silently corrupting the training signal. The head boundary had the mirror problem when the first tool turn was not protected. This change snaps both compression boundaries to a clean turn boundary before the region is extracted and replaced, so the summary always covers whole gpt+tool blocks and a `tool` turn is never separated from the `gpt` turn that precedes it. The boundary is moved forward when possible (folding an orphaned tool turn into the region that already holds its gpt) and falls back to moving backward when no clean boundary exists ahead, such as when the protected tail itself begins on a tool turn.

## Related Issue

N/A

## Type of Change

- [x] 🐛 Bug fix (non-breaking change that fixes an issue)

## Changes Made

- `trajectory_compressor.py`: added `_is_boundary_clean()` and `_snap_boundary()` helpers on `TrajectoryCompressor`, and applied them to both the head and tail compression boundaries in `compress_trajectory()` and `compress_trajectory_async()`. When snapping collapses the region to nothing safe to compress, the trajectory is returned unchanged and flagged as still over the limit rather than being corrupted.
- `tests/test_trajectory_compressor.py`: added `TestCompressionToolPairIntegrity` covering the sync and async paths plus direct unit tests for the boundary snapping (forward skip and backward fallback).

## How to Test

1. Run the focused tests: `pytest tests/test_trajectory_compressor.py -q`.
2. The new sync/async cases build a trajectory of gpt/tool pairs with an oversized middle gpt turn and choose a token target that forces the accumulation boundary to stop between a `<tool_call>` and its `<tool_response>`. They assert that `<tool_call>` and `<tool_response>` markers stay balanced after compression and that every kept `tool` turn is immediately preceded by a `gpt` turn (never the inserted summary or another tool turn).

## Checklist

### Code

- [x] I've read the [Contributing Guide](https://github.com/NousResearch/hermes-agent/blob/main/CONTRIBUTING.md)
- [x] My commit messages follow [Conventional Commits](https://www.conventionalcommits.org/) (`fix(scope):`, `feat(scope):`, etc.)
- [x] I searched for [existing PRs](https://github.com/NousResearch/hermes-agent/pulls) to make sure this isn't a duplicate
- [x] My PR contains **only** changes related to this fix/feature (no unrelated commits)
- [x] I've run `pytest tests/ -q` and all tests pass
- [x] I've added tests for my changes (required for bug fixes, strongly encouraged for features)
- [x] I've tested on my platform: macOS 15 (Darwin 25.5)

### Documentation & Housekeeping

- [x] I've updated relevant documentation (README, `docs/`, docstrings) — or N/A
- [x] I've updated `cli-config.yaml.example` if I added/changed config keys — or N/A
- [x] I've updated `CONTRIBUTING.md` or `AGENTS.md` if I changed architecture or workflows — or N/A
- [x] I've considered cross-platform impact (Windows, macOS) per the [compatibility guide](https://github.com/NousResearch/hermes-agent/blob/main/CONTRIBUTING.md#cross-platform-compatibility) — or N/A
- [x] I've updated tool descriptions/schemas if I changed tool behavior — or N/A
630 lines
23 KiB
Python
630 lines
23 KiB
Python
"""Tests for trajectory_compressor.py — config, metrics, and compression logic."""
|
|
|
|
import importlib
|
|
import os
|
|
import sys
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, patch, MagicMock
|
|
|
|
import pytest
|
|
|
|
from trajectory_compressor import (
|
|
CompressionConfig,
|
|
TrajectoryMetrics,
|
|
AggregateMetrics,
|
|
TrajectoryCompressor,
|
|
)
|
|
|
|
|
|
def test_import_loads_env_from_hermes_home(tmp_path, monkeypatch):
    """Importing ``trajectory_compressor`` picks up ``$HERMES_HOME/.env``.

    A throwaway ``.hermes`` directory containing a ``.env`` file is created,
    ``HERMES_HOME`` is pointed at it, and the module is imported afresh. The
    key defined in that file must then be visible through ``os.getenv``.

    Args:
        tmp_path: pytest fixture providing an isolated temporary directory.
        monkeypatch: pytest fixture used for reversible environment and
            ``sys.modules`` changes.
    """
    home = tmp_path / ".hermes"
    home.mkdir()
    (home / ".env").write_text("OPENROUTER_API_KEY=from-hermes-home\n", encoding="utf-8")

    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)

    # Evict the cached module through monkeypatch instead of a bare
    # ``sys.modules.pop``: the original module object is put back at teardown,
    # so the rest of the session keeps sharing the module whose classes this
    # file imported at the top.
    monkeypatch.delitem(sys.modules, "trajectory_compressor", raising=False)
    importlib.import_module("trajectory_compressor")

    assert os.getenv("OPENROUTER_API_KEY") == "from-hermes-home"
|
|
|
|
|
|
def test_generate_summary_kimi_omits_temperature():
    """With a Kimi model configured, no ``temperature`` kwarg is sent.

    The compressor is assembled by hand (``__new__`` skips ``__init__``) and
    given a mocked client, so the keyword arguments of the completion call can
    be inspected directly.
    """
    canned_reply = SimpleNamespace(
        choices=[SimpleNamespace(message=SimpleNamespace(content="[CONTEXT SUMMARY]: summary"))]
    )
    llm_client = MagicMock()
    llm_client.chat.completions.create.return_value = canned_reply

    tc = TrajectoryCompressor.__new__(TrajectoryCompressor)
    tc.config = CompressionConfig(
        summarization_model="kimi-for-coding",
        temperature=0.3,
        summary_target_tokens=100,
        max_retries=1,
    )
    tc.logger = MagicMock()
    tc._use_call_llm = False
    tc.client = llm_client

    summary = tc._generate_summary("tool output", TrajectoryMetrics())

    assert summary.startswith("[CONTEXT SUMMARY]:")
    sent_kwargs = llm_client.chat.completions.create.call_args.kwargs
    assert "temperature" not in sent_kwargs
|
|
|
|
|
|
def test_generate_summary_public_moonshot_kimi_k2_5_omits_temperature():
    """``kimi-k2.5`` against ``api.moonshot.ai`` is called without ``temperature``.

    The compressor is assembled by hand (``__new__`` skips ``__init__``) and
    given a mocked client, so the keyword arguments of the completion call can
    be inspected directly.
    """
    canned_reply = SimpleNamespace(
        choices=[SimpleNamespace(message=SimpleNamespace(content="[CONTEXT SUMMARY]: summary"))]
    )
    llm_client = MagicMock()
    llm_client.chat.completions.create.return_value = canned_reply

    tc = TrajectoryCompressor.__new__(TrajectoryCompressor)
    tc.config = CompressionConfig(
        summarization_model="kimi-k2.5",
        base_url="https://api.moonshot.ai/v1",
        temperature=0.3,
        summary_target_tokens=100,
        max_retries=1,
    )
    tc.logger = MagicMock()
    tc._use_call_llm = False
    tc.client = llm_client

    summary = tc._generate_summary("tool output", TrajectoryMetrics())

    assert summary.startswith("[CONTEXT SUMMARY]:")
    sent_kwargs = llm_client.chat.completions.create.call_args.kwargs
    assert "temperature" not in sent_kwargs
|
|
|
|
|
|
def test_generate_summary_public_moonshot_cn_kimi_k2_5_omits_temperature():
    """``kimi-k2.5`` against ``api.moonshot.cn`` is called without ``temperature``.

    The compressor is assembled by hand (``__new__`` skips ``__init__``) and
    given a mocked client, so the keyword arguments of the completion call can
    be inspected directly.
    """
    canned_reply = SimpleNamespace(
        choices=[SimpleNamespace(message=SimpleNamespace(content="[CONTEXT SUMMARY]: summary"))]
    )
    llm_client = MagicMock()
    llm_client.chat.completions.create.return_value = canned_reply

    tc = TrajectoryCompressor.__new__(TrajectoryCompressor)
    tc.config = CompressionConfig(
        summarization_model="kimi-k2.5",
        base_url="https://api.moonshot.cn/v1",
        temperature=0.3,
        summary_target_tokens=100,
        max_retries=1,
    )
    tc.logger = MagicMock()
    tc._use_call_llm = False
    tc.client = llm_client

    summary = tc._generate_summary("tool output", TrajectoryMetrics())

    assert summary.startswith("[CONTEXT SUMMARY]:")
    sent_kwargs = llm_client.chat.completions.create.call_args.kwargs
    assert "temperature" not in sent_kwargs
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CompressionConfig
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCompressionConfig:
    """Default values and YAML loading of ``CompressionConfig``."""

    def test_defaults(self):
        """A config built with no arguments carries the stock limits."""
        cfg = CompressionConfig()
        assert cfg.target_max_tokens == 15250
        assert cfg.summary_target_tokens == 750
        assert cfg.protect_last_n_turns == 4
        assert cfg.skip_under_target is True

    def test_from_yaml(self, tmp_path):
        """A fully populated YAML file overrides each checked setting."""
        # The leading backslash keeps the document from starting with a blank
        # line; section keys sit at column 0 with their settings nested below.
        document = """\
tokenizer:
  name: custom-tokenizer
  trust_remote_code: false
compression:
  target_max_tokens: 10000
  summary_target_tokens: 500
protected_turns:
  first_system: true
  first_human: false
  last_n_turns: 6
summarization:
  model: gpt-4
  temperature: 0.5
  max_retries: 5
output:
  add_summary_notice: false
  output_suffix: _short
processing:
  num_workers: 8
  max_concurrent_requests: 100
  skip_under_target: false
  save_over_limit: false
metrics:
  enabled: false
  per_trajectory: false
  output_file: my_metrics.json
"""
        path = tmp_path / "config.yaml"
        path.write_text(document)

        cfg = CompressionConfig.from_yaml(str(path))

        # Settings compared by value.
        expected_values = {
            "tokenizer_name": "custom-tokenizer",
            "target_max_tokens": 10000,
            "summary_target_tokens": 500,
            "protect_last_n_turns": 6,
            "summarization_model": "gpt-4",
            "temperature": 0.5,
            "max_retries": 5,
            "output_suffix": "_short",
            "num_workers": 8,
            "max_concurrent_requests": 100,
            "metrics_output_file": "my_metrics.json",
        }
        for attr, wanted in expected_values.items():
            assert getattr(cfg, attr) == wanted

        # Flags that must come back as the literal ``False`` object.
        switched_off = (
            "trust_remote_code",
            "protect_first_human",
            "add_summary_notice",
            "skip_under_target",
            "save_over_limit",
            "metrics_enabled",
        )
        for attr in switched_off:
            assert getattr(cfg, attr) is False

    def test_from_yaml_partial(self, tmp_path):
        """Sections absent from the file leave their defaults untouched."""
        path = tmp_path / "config.yaml"
        path.write_text("compression:\n target_max_tokens: 8000\n")

        cfg = CompressionConfig.from_yaml(str(path))

        assert cfg.target_max_tokens == 8000
        # Nothing else was specified, so these stay at their defaults.
        assert cfg.protect_last_n_turns == 4
        assert cfg.num_workers == 4

    def test_from_yaml_empty(self, tmp_path):
        """An empty mapping yields a config made entirely of defaults."""
        path = tmp_path / "config.yaml"
        path.write_text("{}\n")

        cfg = CompressionConfig.from_yaml(str(path))

        assert cfg.target_max_tokens == 15250  # all defaults
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TrajectoryMetrics
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestTrajectoryMetrics:
    """Serialisation of per-trajectory metrics via ``to_dict``."""

    def test_to_dict(self):
        """Assigned counters appear in the dict; the region start stays at -1."""
        metrics = TrajectoryMetrics()
        assigned = {
            "original_tokens": 10000,
            "compressed_tokens": 5000,
            "tokens_saved": 5000,
            "compression_ratio": 0.5,
            "original_turns": 20,
            "compressed_turns": 10,
            "turns_removed": 10,
            "was_compressed": True,
        }
        for field_name, field_value in assigned.items():
            setattr(metrics, field_name, field_value)

        dumped = metrics.to_dict()

        assert dumped["original_tokens"] == 10000
        assert dumped["compressed_tokens"] == 5000
        assert dumped["compression_ratio"] == 0.5
        assert dumped["was_compressed"] is True
        # No compression region was recorded on this instance.
        assert dumped["compression_region"]["start_idx"] == -1

    def test_default_values(self):
        """A fresh instance serialises to zero tokens and cleared flags."""
        dumped = TrajectoryMetrics().to_dict()
        assert dumped["original_tokens"] == 0
        assert dumped["was_compressed"] is False
        assert dumped["skipped_under_target"] is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AggregateMetrics
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAggregateMetrics:
    """Accumulation and reporting behaviour of ``AggregateMetrics``."""

    def test_empty_to_dict(self):
        """With nothing added, the report shows zero counts and a ratio of 1.0."""
        report = AggregateMetrics().to_dict()
        assert report["summary"]["total_trajectories"] == 0
        assert report["averages"]["avg_compression_ratio"] == 1.0
        assert report["averages"]["avg_tokens_saved_per_compressed"] == 0

    def test_add_compressed_trajectory(self):
        """A compressed trajectory bumps the compressed count and saved tokens."""
        sample = TrajectoryMetrics()
        assigned = {
            "original_tokens": 20000,
            "compressed_tokens": 10000,
            "tokens_saved": 10000,
            "compression_ratio": 0.5,
            "original_turns": 30,
            "compressed_turns": 15,
            "turns_removed": 15,
            "was_compressed": True,
        }
        for field_name, field_value in assigned.items():
            setattr(sample, field_name, field_value)

        totals = AggregateMetrics()
        totals.add_trajectory_metrics(sample)

        assert totals.total_trajectories == 1
        assert totals.trajectories_compressed == 1
        assert totals.total_tokens_saved == 10000
        assert len(totals.compression_ratios) == 1

    def test_add_skipped_trajectory(self):
        """A trajectory skipped as under-target is not counted as compressed."""
        sample = TrajectoryMetrics()
        sample.original_tokens = 5000
        sample.compressed_tokens = 5000
        sample.skipped_under_target = True

        totals = AggregateMetrics()
        totals.add_trajectory_metrics(sample)

        assert totals.trajectories_skipped_under_target == 1
        assert totals.trajectories_compressed == 0

    def test_add_over_limit_trajectory(self):
        """A trajectory flagged still-over-limit is tallied as such."""
        sample = TrajectoryMetrics()
        sample.original_tokens = 20000
        sample.compressed_tokens = 16000
        sample.still_over_limit = True
        sample.was_compressed = True
        sample.compression_ratio = 0.8

        totals = AggregateMetrics()
        totals.add_trajectory_metrics(sample)

        assert totals.trajectories_still_over_limit == 1

    def test_multiple_trajectories_aggregation(self):
        """Three identical compressed trajectories sum and average as expected."""
        totals = AggregateMetrics()
        for _ in range(3):
            sample = TrajectoryMetrics()
            sample.original_tokens = 10000
            sample.compressed_tokens = 5000
            sample.tokens_saved = 5000
            sample.turns_removed = 5
            sample.was_compressed = True
            sample.compression_ratio = 0.5
            totals.add_trajectory_metrics(sample)

        report = totals.to_dict()

        assert report["summary"]["total_trajectories"] == 3
        assert report["summary"]["trajectories_compressed"] == 3
        assert report["tokens"]["total_saved"] == 15000
        assert report["averages"]["avg_compression_ratio"] == 0.5

    def test_to_dict_no_division_by_zero(self):
        """Reporting on empty data must not raise ``ZeroDivisionError``."""
        report = AggregateMetrics().to_dict()
        assert report["summarization"]["success_rate"] == 1.0
        assert report["tokens"]["overall_compression_ratio"] == 0.0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TrajectoryCompressor._find_protected_indices
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_compressor(config=None):
    """Build a ``TrajectoryCompressor`` whose tokenizer and summarizer are stubbed.

    Args:
        config: Optional ``CompressionConfig``; a default one is created when
            omitted.

    Returns:
        A compressor whose ``_init_tokenizer`` / ``_init_summarizer`` hooks were
        patched out during construction and whose ``tokenizer.encode`` yields
        one token per four characters of input.
    """
    cfg = CompressionConfig() if config is None else config

    # Both initialisers are replaced with mocks only while the constructor runs.
    with patch.object(TrajectoryCompressor, '_init_tokenizer'):
        with patch.object(TrajectoryCompressor, '_init_summarizer'):
            tc = TrajectoryCompressor(cfg)

    # Deterministic stand-in tokenizer: len(text) // 4 tokens.
    fake_tokenizer = MagicMock()
    fake_tokenizer.encode = lambda text: [0] * (len(text) // 4)
    tc.tokenizer = fake_tokenizer
    return tc
|
|
|
|
|
|
class TestFindProtectedIndices:
    """Which turn indices ``_find_protected_indices`` reports as protected."""

    def test_basic_trajectory(self):
        """Head turns and the last four turns are protected; the gap is between."""
        tc = _make_compressor()
        turns = [
            ("system", "You are an agent."),
            ("human", "Do something."),
            ("gpt", "I will use a tool."),
            ("tool", "Tool result."),
            ("gpt", "More work."),
            ("tool", "Another result."),
            ("gpt", "Work continues."),
            ("tool", "Result 3."),
            ("gpt", "Done."),
            ("human", "Thanks."),
        ]
        trajectory = [{"from": role, "value": text} for role, text in turns]

        protected, start, end = tc._find_protected_indices(trajectory)

        # First system (0), human (1), gpt (2) and tool (3) turns.
        for head_idx in (0, 1, 2, 3):
            assert head_idx in protected
        # The last 4 turns.
        for tail_idx in (6, 7, 8, 9):
            assert tail_idx in protected
        # The compressible region lies between head and tail.
        assert start >= 4
        assert end <= 6

    def test_short_trajectory_all_protected(self):
        """A three-turn trajectory is fully protected, leaving nothing to compress."""
        tc = _make_compressor()
        turns = [("system", "sys"), ("human", "hi"), ("gpt", "hello")]
        trajectory = [{"from": role, "value": text} for role, text in turns]

        protected, start, end = tc._find_protected_indices(trajectory)

        assert len(protected) == 3
        assert start >= end  # Nothing to compress

    def test_protect_last_n_zero(self):
        """With tail protection disabled, only first occurrences are protected."""
        cfg = CompressionConfig()
        cfg.protect_last_n_turns = 0
        tc = _make_compressor(cfg)
        turns = [
            ("system", "sys"),
            ("human", "q"),
            ("gpt", "a"),
            ("tool", "r"),
            ("gpt", "b"),
            ("tool", "r2"),
            ("gpt", "c"),
            ("tool", "r3"),
        ]
        trajectory = [{"from": role, "value": text} for role, text in turns]

        protected, start, end = tc._find_protected_indices(trajectory)

        for head_idx in (0, 1, 2, 3):
            assert head_idx in protected
        # The final turn is no longer shielded by tail protection.
        assert 7 not in protected

    def test_no_system_turn(self):
        """Without a system turn, the first human turn at index 0 is protected."""
        tc = _make_compressor()
        turns = [
            ("human", "hi"),
            ("gpt", "hello"),
            ("tool", "data"),
            ("gpt", "result"),
            ("human", "thanks"),
        ]
        trajectory = [{"from": role, "value": text} for role, text in turns]

        protected, start, end = tc._find_protected_indices(trajectory)

        assert 0 in protected  # first human

    def test_disable_protect_first_system(self):
        """Turning off ``protect_first_system`` leaves index 0 unprotected."""
        cfg = CompressionConfig()
        cfg.protect_first_system = False
        tc = _make_compressor(cfg)
        turns = [
            ("system", "sys"),
            ("human", "q"),
            ("gpt", "a"),
            ("tool", "r"),
            ("gpt", "b"),
            ("tool", "r2"),
            ("gpt", "c"),
            ("tool", "r3"),
        ]
        trajectory = [{"from": role, "value": text} for role, text in turns]

        protected, _, _ = tc._find_protected_indices(trajectory)

        assert 0 not in protected  # system not protected
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TrajectoryCompressor._extract_turn_content_for_summary
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestExtractTurnContent:
    """Text produced by ``_extract_turn_content_for_summary`` for a turn range."""

    def test_basic_extraction(self):
        """Turns in ``[start, end)`` are labelled and included; ``end`` is excluded."""
        tc = _make_compressor()
        turns = [
            ("gpt", "I will search."),
            ("tool", "Search result: found it."),
            ("gpt", "Great, done."),
        ]
        trajectory = [{"from": role, "value": text} for role, text in turns]

        rendered = tc._extract_turn_content_for_summary(trajectory, 0, 2)

        for fragment in (
            "[Turn 0 - GPT]",
            "I will search.",
            "[Turn 1 - TOOL]",
            "Search result: found it.",
        ):
            assert fragment in rendered
        # The end index is exclusive, so turn 2 must be absent.
        assert "[Turn 2" not in rendered

    def test_long_content_truncated(self):
        """A 5000-character turn is shortened and marked as truncated."""
        tc = _make_compressor()
        trajectory = [{"from": "tool", "value": "x" * 5000}]

        rendered = tc._extract_turn_content_for_summary(trajectory, 0, 1)

        assert "...[truncated]..." in rendered
        assert len(rendered) < 5000

    def test_empty_range(self):
        """An empty index range produces an empty string."""
        tc = _make_compressor()
        trajectory = [{"from": "gpt", "value": "hello"}]

        rendered = tc._extract_turn_content_for_summary(trajectory, 0, 0)

        assert rendered == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TrajectoryCompressor.count_tokens / count_trajectory_tokens
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestTokenCounting:
    """Token counting helpers, using the test tokenizer (1 token per 4 chars)."""

    def test_count_tokens_empty(self):
        """The empty string counts as zero tokens."""
        assert _make_compressor().count_tokens("") == 0

    def test_count_tokens_basic(self):
        """Eight characters map to two tokens under the mock tokenizer."""
        assert _make_compressor().count_tokens("12345678") == 2

    def test_count_trajectory_tokens(self):
        """The trajectory total is the sum of the per-turn counts."""
        tc = _make_compressor()
        system_turn = {"from": "system", "value": "12345678"}  # 2 tokens
        human_turn = {"from": "human", "value": "1234567890ab"}  # 3 tokens

        assert tc.count_trajectory_tokens([system_turn, human_turn]) == 5

    def test_count_turn_tokens(self):
        """Per-turn counts are returned as a list in turn order."""
        tc = _make_compressor()
        system_turn = {"from": "system", "value": "1234"}  # 1 token
        human_turn = {"from": "human", "value": "12345678"}  # 2 tokens

        per_turn = tc.count_turn_tokens([system_turn, human_turn])

        assert per_turn == [1, 2]

    def test_count_tokens_fallback_on_error(self):
        """If the tokenizer raises, the count falls back to ``len(text) // 4``."""
        tc = _make_compressor()
        tc.tokenizer.encode = MagicMock(side_effect=Exception("fail"))

        assert tc.count_tokens("12345678") == 2
|
|
|
|
|
|
class TestGenerateSummary:
    """Summary generation when the model replies with ``content=None``."""

    def test_generate_summary_handles_none_content(self):
        """The sync path returns the bare summary prefix for a ``None`` reply."""
        empty_reply = SimpleNamespace(
            choices=[SimpleNamespace(message=SimpleNamespace(content=None))]
        )
        tc = _make_compressor()
        tc.client = MagicMock()
        tc.client.chat.completions.create.return_value = empty_reply

        summary = tc._generate_summary("Turn content", TrajectoryMetrics())

        assert summary == "[CONTEXT SUMMARY]:"

    @pytest.mark.asyncio
    async def test_generate_summary_async_handles_none_content(self):
        """The async path returns the bare summary prefix for a ``None`` reply."""
        empty_reply = SimpleNamespace(
            choices=[SimpleNamespace(message=SimpleNamespace(content=None))]
        )
        async_client = MagicMock()
        async_client.chat.completions.create = AsyncMock(return_value=empty_reply)

        tc = _make_compressor()
        # The compressor obtains its async client through this accessor.
        tc._get_async_client = MagicMock(return_value=async_client)

        summary = await tc._generate_summary_async("Turn content", TrajectoryMetrics())

        assert summary == "[CONTEXT SUMMARY]:"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TrajectoryCompressor — compression boundary must not split tool pairs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _gpt_with_tool_call(label, tokens):
|
|
"""A 'gpt' turn carrying a <tool_call> marker, padded to ~`tokens` tokens."""
|
|
body = f"<tool_call>\n{{\"name\": \"{label}\"}}\n</tool_call>"
|
|
pad = max(0, tokens * 4 - len(body))
|
|
return {"from": "gpt", "value": body + "x" * pad}
|
|
|
|
|
|
def _tool_response(label, tokens):
|
|
"""A 'tool' turn carrying a <tool_response> marker, padded to ~`tokens` tokens."""
|
|
body = f"<tool_response>\n{{\"name\": \"{label}\"}}\n</tool_response>"
|
|
pad = max(0, tokens * 4 - len(body))
|
|
return {"from": "tool", "value": body + "x" * pad}
|
|
|
|
|
|
def _count_marker(trajectory, marker):
|
|
return sum(turn["value"].count(marker) for turn in trajectory)
|
|
|
|
|
|
def _paired_trajectory():
    """Return a 10-turn trajectory of gpt/tool pairs with one oversized gpt turn.

    Index layout: system, human, gpt#0, tool#0, gpt#1 (big), tool#1, gpt#2,
    tool#2, final gpt, human. With ``protect_last_n_turns=2`` the compressible
    region is [4, 8), and gpt#1 at index 4 is large enough that the
    token-accumulation boundary stops at index 5 — that is, between gpt#1's
    <tool_call> and tool#1's <tool_response>.
    """
    turns = [
        {"from": "system", "value": "You are an agent. " * 4},
        {"from": "human", "value": "Please do the task. " * 4},
    ]

    # Three call/response pairs; pair "b" has the oversized gpt turn that
    # forces a mid-pair boundary.
    for label, gpt_tokens in (("a", 12), ("b", 400), ("c", 12)):
        turns.append(_gpt_with_tool_call(label, gpt_tokens))
        turns.append(_tool_response(label, 12))

    turns.append({"from": "gpt", "value": "<think>\n</think>\nAll done."})
    turns.append({"from": "human", "value": "Thanks!"})
    return turns
|
|
|
|
|
|
def _target_that_splits_after_index_4(tc, trajectory):
|
|
"""Pick a target so token accumulation breaks right after index 4 (a gpt)."""
|
|
turn_tokens = tc.count_turn_tokens(trajectory)
|
|
total = sum(turn_tokens)
|
|
# threshold == turn_tokens[4] makes the loop break at compress_until = 5,
|
|
# which lands on the tool turn paired with gpt#1.
|
|
return total - turn_tokens[4] + tc.config.summary_target_tokens
|
|
|
|
|
|
class TestCompressionToolPairIntegrity:
    """Compression must never separate a 'tool' turn from its preceding 'gpt' turn."""

    def _config(self):
        """Return a config protecting the last 2 turns with a 4-token summary target."""
        cfg = CompressionConfig()
        cfg.protect_last_n_turns = 2
        cfg.summary_target_tokens = 4
        return cfg

    @staticmethod
    def _assert_tool_pairs_intact(compressed):
        """Check marker balance and that each 'tool' turn directly follows a 'gpt' turn."""
        # Every <tool_call> must keep its matching <tool_response>.
        assert _count_marker(compressed, "<tool_call>") == _count_marker(
            compressed, "<tool_response>"
        )
        # A kept 'tool' turn may not follow the inserted summary (a 'human'
        # turn) or another 'tool' turn.
        for i, turn in enumerate(compressed):
            if turn.get("from") != "tool":
                continue
            assert i > 0 and compressed[i - 1].get("from") == "gpt"

    def test_sync_compression_does_not_orphan_tool_markers(self):
        """The sync path keeps call/response pairs whole when the boundary lands mid-pair."""
        tc = _make_compressor(self._config())
        tc._generate_summary = MagicMock(
            return_value="[CONTEXT SUMMARY]: middle turns summarized."
        )
        trajectory = _paired_trajectory()
        tc.config.target_max_tokens = _target_that_splits_after_index_4(tc, trajectory)

        compressed, metrics = tc.compress_trajectory(trajectory)

        assert metrics.was_compressed
        self._assert_tool_pairs_intact(compressed)

    @pytest.mark.asyncio
    async def test_async_compression_does_not_orphan_tool_markers(self):
        """The async path keeps call/response pairs whole when the boundary lands mid-pair."""
        tc = _make_compressor(self._config())
        tc._generate_summary_async = AsyncMock(
            return_value="[CONTEXT SUMMARY]: middle turns summarized."
        )
        trajectory = _paired_trajectory()
        tc.config.target_max_tokens = _target_that_splits_after_index_4(tc, trajectory)

        compressed, metrics = await tc.compress_trajectory_async(trajectory)

        assert metrics.was_compressed
        self._assert_tool_pairs_intact(compressed)

    def test_snap_boundary_skips_tool_turn_forward(self):
        """A boundary on a 'tool' turn moves forward; one on a 'gpt' turn stays put."""
        tc = _make_compressor()
        trajectory = _paired_trajectory()

        # Index 5 is a 'tool' turn, so the boundary advances to 6.
        on_tool_turn = tc._snap_boundary(trajectory, 5, 4, 8)
        assert on_tool_turn == 6

        # Index 4 is a 'gpt' turn and is already clean.
        on_gpt_turn = tc._snap_boundary(trajectory, 4, 4, 8)
        assert on_gpt_turn == 4

    def test_snap_boundary_falls_back_to_backward(self):
        """With no clean boundary ahead, snapping retreats to the preceding 'gpt' turn."""
        tc = _make_compressor()
        # The protected tail begins on a 'tool' turn at max_idx, so there is
        # nothing clean to move forward onto.
        trajectory = [
            {"from": "gpt", "value": "<tool_call>a</tool_call>"},
            {"from": "tool", "value": "<tool_response>a</tool_response>"},
        ]

        assert tc._snap_boundary(trajectory, 1, 0, 1) == 0
|