hermes-agent/tests/test_trajectory_compressor.py
synapsesx f10a330aee fix(research): keep tool_call/tool_response pairs intact when compressing trajectories
## What does this PR do?

The trajectory compressor could corrupt training trajectories by cutting a
conversation in the middle of a tool-call/tool-response pair. In the from/value
trajectory format a `tool` turn (carrying `<tool_response>` markers) is always
emitted immediately after the `gpt` turn whose `<tool_call>` it answers, so the
two turns must stay together. The compressible region's end boundary, however,
was chosen purely by token accumulation: the loop stopped at the first turn where
the accumulated tokens met the savings target, with no regard for turn roles. For
any over-budget trajectory whose savings boundary happened to land between a `gpt`
turn and its `tool` turn, the `gpt` (with its `<tool_call>`) was summarised away
into the replacement `human` message while the now-orphaned `tool` turn (with its
`<tool_response>`) was kept verbatim in the tail — producing an unmatched marker
and silently corrupting the training signal. The head boundary had the mirror
problem when the first tool turn was not protected.
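
The failure mode can be reproduced with a minimal sketch of role-blind token accumulation. The function name `naive_compress_until`, its parameters, and the token counts are illustrative assumptions, not the project's actual code; the sketch only mirrors the behaviour described above, where the loop stops at the first turn that meets the savings target.

```python
def naive_compress_until(turn_tokens, start, end, tokens_needed):
    """Return the exclusive end index chosen by plain token accumulation.

    Hypothetical reconstruction: stops at the first turn where the running
    total reaches `tokens_needed`, without looking at turn roles.
    """
    accumulated = 0
    compress_until = start
    for i in range(start, end):
        accumulated += turn_tokens[i]
        compress_until = i + 1
        if accumulated >= tokens_needed:
            break
    return compress_until


# Illustrative roles and token counts (assumed values, not measured ones).
roles = ["system", "human", "gpt", "tool", "gpt", "tool", "gpt", "tool", "gpt", "human"]
turn_tokens = [18, 20, 12, 12, 400, 12, 12, 12, 7, 1]

# The oversized gpt turn at index 4 meets the target on its own, so the
# boundary lands on index 5: the tool turn that answers it.
boundary = naive_compress_until(turn_tokens, start=4, end=8, tokens_needed=400)
print(boundary, roles[boundary])  # 5 tool
```

Everything before the returned index is summarised, so the `gpt` turn at index 4 disappears while its `tool` turn at index 5 survives as the first kept turn.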

This change snaps both compression boundaries to a clean turn boundary before the
region is extracted and replaced, so the summary always covers whole gpt+tool
blocks and a `tool` turn is never separated from the `gpt` turn that precedes it.
The boundary is moved forward when possible (folding an orphaned tool turn into
the region that already holds its gpt) and falls back to moving backward when no
clean boundary exists ahead, such as when the protected tail itself begins on a
tool turn.
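
A rough sketch of the snapping rule, assuming a boundary counts as clean when the turn sitting at that index is not a `tool` turn. The free functions `is_boundary_clean` and `snap_boundary` below are hypothetical stand-ins for the private helpers named in this PR; the real methods may differ in details such as what they return when no clean boundary exists.

```python
def is_boundary_clean(trajectory, idx):
    """A boundary is clean if cutting at `idx` does not orphan a tool turn."""
    if idx >= len(trajectory):
        return True  # nothing follows the boundary, so nothing can be orphaned
    return trajectory[idx].get("from") != "tool"


def snap_boundary(trajectory, idx, min_idx, max_idx):
    """Move `idx` to the nearest clean boundary within [min_idx, max_idx].

    Forward is preferred; backward is the fallback. Returning `idx`
    unchanged when nothing is clean is an assumption of this sketch.
    """
    for candidate in range(idx, max_idx + 1):
        if is_boundary_clean(trajectory, candidate):
            return candidate
    for candidate in range(idx - 1, min_idx - 1, -1):
        if is_boundary_clean(trajectory, candidate):
            return candidate
    return idx


roles = ["system", "human", "gpt", "tool", "gpt", "tool", "gpt", "tool", "gpt", "human"]
trajectory = [{"from": role, "value": ""} for role in roles]

print(snap_boundary(trajectory, 5, 4, 8))  # 6: skips the tool turn at index 5
print(snap_boundary(trajectory, 4, 4, 8))  # 4: a gpt turn is already clean

pair = [{"from": "gpt", "value": ""}, {"from": "tool", "value": ""}]
print(snap_boundary(pair, 1, 0, 1))  # 0: no clean boundary ahead, so retreat
```

The three calls use the same arguments as the unit tests added in this PR, so the sketch agrees with those expectations.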

## Related Issue

N/A

## Type of Change

- [x] 🐛 Bug fix (non-breaking change that fixes an issue)

## Changes Made

- `trajectory_compressor.py`: added `_is_boundary_clean()` and `_snap_boundary()`
  helpers on `TrajectoryCompressor`, and applied them to both the head and tail
  compression boundaries in `compress_trajectory()` and
  `compress_trajectory_async()`. When snapping leaves no region that is safe to
  compress, the trajectory is returned unchanged and flagged as still over the
  limit rather than being corrupted.
- `tests/test_trajectory_compressor.py`: added `TestCompressionToolPairIntegrity`
  covering the sync and async paths plus direct unit tests for the boundary
  snapping (forward skip and backward fallback).

## How to Test

1. Run the focused tests: `pytest tests/test_trajectory_compressor.py -q`.
2. The new sync/async cases build a trajectory of gpt/tool pairs with an oversized
   middle gpt turn and choose a token target that forces the accumulation
   boundary to stop between a `<tool_call>` and its `<tool_response>`. They assert
   that `<tool_call>` and `<tool_response>` markers stay balanced after
   compression and that every kept `tool` turn is immediately preceded by a `gpt`
   turn (never the inserted summary or another tool turn).
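
The two invariants in step 2 can be packaged as a small standalone check. The helper name `tool_pairs_intact` is hypothetical and not part of the test suite, which asserts the same conditions inline; it assumes the from/value turn format described above.

```python
def tool_pairs_intact(trajectory):
    """Return True if tool markers are balanced and every tool turn follows a gpt turn."""
    calls = sum(turn["value"].count("<tool_call>") for turn in trajectory)
    responses = sum(turn["value"].count("<tool_response>") for turn in trajectory)
    if calls != responses:
        return False
    for i, turn in enumerate(trajectory):
        if turn.get("from") == "tool":
            # A kept tool turn must sit directly after the gpt turn it answers.
            if i == 0 or trajectory[i - 1].get("from") != "gpt":
                return False
    return True


good = [
    {"from": "human", "value": "[CONTEXT SUMMARY]: earlier turns."},
    {"from": "gpt", "value": "<tool_call>{}</tool_call>"},
    {"from": "tool", "value": "<tool_response>{}</tool_response>"},
]
bad = [
    {"from": "human", "value": "[CONTEXT SUMMARY]: earlier turns."},
    {"from": "tool", "value": "<tool_response>{}</tool_response>"},
]

print(tool_pairs_intact(good))  # True
print(tool_pairs_intact(bad))   # False
```

The `bad` case is the shape the pre-fix compressor could produce: the summary replaces the `gpt` turn while the `tool` turn is kept verbatim.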

## Checklist

### Code

- [x] I've read the [Contributing Guide](https://github.com/NousResearch/hermes-agent/blob/main/CONTRIBUTING.md)
- [x] My commit messages follow [Conventional Commits](https://www.conventionalcommits.org/) (`fix(scope):`, `feat(scope):`, etc.)
- [x] I searched for [existing PRs](https://github.com/NousResearch/hermes-agent/pulls) to make sure this isn't a duplicate
- [x] My PR contains **only** changes related to this fix/feature (no unrelated commits)
- [x] I've run `pytest tests/ -q` and all tests pass
- [x] I've added tests for my changes (required for bug fixes, strongly encouraged for features)
- [x] I've tested on my platform: macOS 15 (Darwin 25.5)

### Documentation & Housekeeping

- [x] I've updated relevant documentation (README, `docs/`, docstrings) — or N/A
- [x] I've updated `cli-config.yaml.example` if I added/changed config keys — or N/A
- [x] I've updated `CONTRIBUTING.md` or `AGENTS.md` if I changed architecture or workflows — or N/A
- [x] I've considered cross-platform impact (Windows, macOS) per the [compatibility guide](https://github.com/NousResearch/hermes-agent/blob/main/CONTRIBUTING.md#cross-platform-compatibility) — or N/A
- [x] I've updated tool descriptions/schemas if I changed tool behavior — or N/A
2026-06-07 05:01:27 -07:00

630 lines
23 KiB
Python

"""Tests for trajectory_compressor.py — config, metrics, and compression logic."""

import importlib
import os
import sys
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch, MagicMock

import pytest

from trajectory_compressor import (
    CompressionConfig,
    TrajectoryMetrics,
    AggregateMetrics,
    TrajectoryCompressor,
)


def test_import_loads_env_from_hermes_home(tmp_path, monkeypatch):
    home = tmp_path / ".hermes"
    home.mkdir()
    (home / ".env").write_text("OPENROUTER_API_KEY=from-hermes-home\n", encoding="utf-8")

    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)

    sys.modules.pop("trajectory_compressor", None)
    importlib.import_module("trajectory_compressor")

    assert os.getenv("OPENROUTER_API_KEY") == "from-hermes-home"


def test_generate_summary_kimi_omits_temperature():
    """Kimi models should have temperature omitted — server manages it."""
    config = CompressionConfig(
        summarization_model="kimi-for-coding",
        temperature=0.3,
        summary_target_tokens=100,
        max_retries=1,
    )
    compressor = TrajectoryCompressor.__new__(TrajectoryCompressor)
    compressor.config = config
    compressor.logger = MagicMock()
    compressor._use_call_llm = False
    compressor.client = MagicMock()
    compressor.client.chat.completions.create.return_value = SimpleNamespace(
        choices=[SimpleNamespace(message=SimpleNamespace(content="[CONTEXT SUMMARY]: summary"))]
    )

    metrics = TrajectoryMetrics()
    result = compressor._generate_summary("tool output", metrics)

    assert result.startswith("[CONTEXT SUMMARY]:")
    assert "temperature" not in compressor.client.chat.completions.create.call_args.kwargs


def test_generate_summary_public_moonshot_kimi_k2_5_omits_temperature():
    """kimi-k2.5 on the public Moonshot API should not get a forced temperature."""
    config = CompressionConfig(
        summarization_model="kimi-k2.5",
        base_url="https://api.moonshot.ai/v1",
        temperature=0.3,
        summary_target_tokens=100,
        max_retries=1,
    )
    compressor = TrajectoryCompressor.__new__(TrajectoryCompressor)
    compressor.config = config
    compressor.logger = MagicMock()
    compressor._use_call_llm = False
    compressor.client = MagicMock()
    compressor.client.chat.completions.create.return_value = SimpleNamespace(
        choices=[SimpleNamespace(message=SimpleNamespace(content="[CONTEXT SUMMARY]: summary"))]
    )

    metrics = TrajectoryMetrics()
    result = compressor._generate_summary("tool output", metrics)

    assert result.startswith("[CONTEXT SUMMARY]:")
    assert "temperature" not in compressor.client.chat.completions.create.call_args.kwargs


def test_generate_summary_public_moonshot_cn_kimi_k2_5_omits_temperature():
    """kimi-k2.5 on api.moonshot.cn should not get a forced temperature."""
    config = CompressionConfig(
        summarization_model="kimi-k2.5",
        base_url="https://api.moonshot.cn/v1",
        temperature=0.3,
        summary_target_tokens=100,
        max_retries=1,
    )
    compressor = TrajectoryCompressor.__new__(TrajectoryCompressor)
    compressor.config = config
    compressor.logger = MagicMock()
    compressor._use_call_llm = False
    compressor.client = MagicMock()
    compressor.client.chat.completions.create.return_value = SimpleNamespace(
        choices=[SimpleNamespace(message=SimpleNamespace(content="[CONTEXT SUMMARY]: summary"))]
    )

    metrics = TrajectoryMetrics()
    result = compressor._generate_summary("tool output", metrics)

    assert result.startswith("[CONTEXT SUMMARY]:")
    assert "temperature" not in compressor.client.chat.completions.create.call_args.kwargs


# ---------------------------------------------------------------------------
# CompressionConfig
# ---------------------------------------------------------------------------

class TestCompressionConfig:
    def test_defaults(self):
        config = CompressionConfig()
        assert config.target_max_tokens == 15250
        assert config.summary_target_tokens == 750
        assert config.protect_last_n_turns == 4
        assert config.skip_under_target is True

    def test_from_yaml(self, tmp_path):
        yaml_content = """\
tokenizer:
  name: custom-tokenizer
  trust_remote_code: false
compression:
  target_max_tokens: 10000
  summary_target_tokens: 500
protected_turns:
  first_system: true
  first_human: false
  last_n_turns: 6
summarization:
  model: gpt-4
  temperature: 0.5
  max_retries: 5
output:
  add_summary_notice: false
  output_suffix: _short
processing:
  num_workers: 8
  max_concurrent_requests: 100
  skip_under_target: false
  save_over_limit: false
metrics:
  enabled: false
  per_trajectory: false
  output_file: my_metrics.json
"""
        yaml_file = tmp_path / "config.yaml"
        yaml_file.write_text(yaml_content)

        config = CompressionConfig.from_yaml(str(yaml_file))
        assert config.tokenizer_name == "custom-tokenizer"
        assert config.trust_remote_code is False
        assert config.target_max_tokens == 10000
        assert config.summary_target_tokens == 500
        assert config.protect_first_human is False
        assert config.protect_last_n_turns == 6
        assert config.summarization_model == "gpt-4"
        assert config.temperature == 0.5
        assert config.max_retries == 5
        assert config.add_summary_notice is False
        assert config.output_suffix == "_short"
        assert config.num_workers == 8
        assert config.max_concurrent_requests == 100
        assert config.skip_under_target is False
        assert config.save_over_limit is False
        assert config.metrics_enabled is False
        assert config.metrics_output_file == "my_metrics.json"

    def test_from_yaml_partial(self, tmp_path):
        """Only specified sections override defaults."""
        yaml_file = tmp_path / "config.yaml"
        yaml_file.write_text("compression:\n target_max_tokens: 8000\n")

        config = CompressionConfig.from_yaml(str(yaml_file))
        assert config.target_max_tokens == 8000
        # Other sections keep defaults
        assert config.protect_last_n_turns == 4
        assert config.num_workers == 4

    def test_from_yaml_empty(self, tmp_path):
        yaml_file = tmp_path / "config.yaml"
        yaml_file.write_text("{}\n")

        config = CompressionConfig.from_yaml(str(yaml_file))
        assert config.target_max_tokens == 15250  # all defaults


# ---------------------------------------------------------------------------
# TrajectoryMetrics
# ---------------------------------------------------------------------------

class TestTrajectoryMetrics:
    def test_to_dict(self):
        m = TrajectoryMetrics()
        m.original_tokens = 10000
        m.compressed_tokens = 5000
        m.tokens_saved = 5000
        m.compression_ratio = 0.5
        m.original_turns = 20
        m.compressed_turns = 10
        m.turns_removed = 10
        m.was_compressed = True

        d = m.to_dict()
        assert d["original_tokens"] == 10000
        assert d["compressed_tokens"] == 5000
        assert d["compression_ratio"] == 0.5
        assert d["was_compressed"] is True
        assert d["compression_region"]["start_idx"] == -1

    def test_default_values(self):
        m = TrajectoryMetrics()
        d = m.to_dict()
        assert d["original_tokens"] == 0
        assert d["was_compressed"] is False
        assert d["skipped_under_target"] is False


# ---------------------------------------------------------------------------
# AggregateMetrics
# ---------------------------------------------------------------------------

class TestAggregateMetrics:
    def test_empty_to_dict(self):
        agg = AggregateMetrics()
        d = agg.to_dict()
        assert d["summary"]["total_trajectories"] == 0
        assert d["averages"]["avg_compression_ratio"] == 1.0
        assert d["averages"]["avg_tokens_saved_per_compressed"] == 0

    def test_add_compressed_trajectory(self):
        agg = AggregateMetrics()
        m = TrajectoryMetrics()
        m.original_tokens = 20000
        m.compressed_tokens = 10000
        m.tokens_saved = 10000
        m.compression_ratio = 0.5
        m.original_turns = 30
        m.compressed_turns = 15
        m.turns_removed = 15
        m.was_compressed = True

        agg.add_trajectory_metrics(m)
        assert agg.total_trajectories == 1
        assert agg.trajectories_compressed == 1
        assert agg.total_tokens_saved == 10000
        assert len(agg.compression_ratios) == 1

    def test_add_skipped_trajectory(self):
        agg = AggregateMetrics()
        m = TrajectoryMetrics()
        m.original_tokens = 5000
        m.compressed_tokens = 5000
        m.skipped_under_target = True

        agg.add_trajectory_metrics(m)
        assert agg.trajectories_skipped_under_target == 1
        assert agg.trajectories_compressed == 0

    def test_add_over_limit_trajectory(self):
        agg = AggregateMetrics()
        m = TrajectoryMetrics()
        m.original_tokens = 20000
        m.compressed_tokens = 16000
        m.still_over_limit = True
        m.was_compressed = True
        m.compression_ratio = 0.8

        agg.add_trajectory_metrics(m)
        assert agg.trajectories_still_over_limit == 1

    def test_multiple_trajectories_aggregation(self):
        agg = AggregateMetrics()
        for i in range(3):
            m = TrajectoryMetrics()
            m.original_tokens = 10000
            m.compressed_tokens = 5000
            m.tokens_saved = 5000
            m.turns_removed = 5
            m.was_compressed = True
            m.compression_ratio = 0.5
            agg.add_trajectory_metrics(m)

        d = agg.to_dict()
        assert d["summary"]["total_trajectories"] == 3
        assert d["summary"]["trajectories_compressed"] == 3
        assert d["tokens"]["total_saved"] == 15000
        assert d["averages"]["avg_compression_ratio"] == 0.5

    def test_to_dict_no_division_by_zero(self):
        """Ensure no ZeroDivisionError with empty data."""
        agg = AggregateMetrics()
        d = agg.to_dict()
        assert d["summarization"]["success_rate"] == 1.0
        assert d["tokens"]["overall_compression_ratio"] == 0.0


# ---------------------------------------------------------------------------
# TrajectoryCompressor._find_protected_indices
# ---------------------------------------------------------------------------

def _make_compressor(config=None):
    """Create a TrajectoryCompressor with mocked tokenizer and summarizer."""
    if config is None:
        config = CompressionConfig()
    with patch.object(TrajectoryCompressor, '_init_tokenizer'), \
         patch.object(TrajectoryCompressor, '_init_summarizer'):
        compressor = TrajectoryCompressor(config)
    # Provide a simple token counter for tests (1 token per 4 chars)
    compressor.tokenizer = MagicMock()
    compressor.tokenizer.encode = lambda text: [0] * (len(text) // 4)
    return compressor


class TestFindProtectedIndices:
    def test_basic_trajectory(self):
        tc = _make_compressor()
        trajectory = [
            {"from": "system", "value": "You are an agent."},
            {"from": "human", "value": "Do something."},
            {"from": "gpt", "value": "I will use a tool."},
            {"from": "tool", "value": "Tool result."},
            {"from": "gpt", "value": "More work."},
            {"from": "tool", "value": "Another result."},
            {"from": "gpt", "value": "Work continues."},
            {"from": "tool", "value": "Result 3."},
            {"from": "gpt", "value": "Done."},
            {"from": "human", "value": "Thanks."},
        ]
        protected, start, end = tc._find_protected_indices(trajectory)
        # First system (0), human (1), gpt (2), tool (3) are protected
        assert 0 in protected
        assert 1 in protected
        assert 2 in protected
        assert 3 in protected
        # Last 4 turns (6,7,8,9) are protected
        assert 6 in protected
        assert 7 in protected
        assert 8 in protected
        assert 9 in protected
        # Compressible region should be between head and tail
        assert start >= 4
        assert end <= 6

    def test_short_trajectory_all_protected(self):
        tc = _make_compressor()
        trajectory = [
            {"from": "system", "value": "sys"},
            {"from": "human", "value": "hi"},
            {"from": "gpt", "value": "hello"},
        ]
        protected, start, end = tc._find_protected_indices(trajectory)
        # All 3 turns should be protected (first of each + last 4 covers all)
        assert len(protected) == 3
        assert start >= end  # Nothing to compress

    def test_protect_last_n_zero(self):
        config = CompressionConfig()
        config.protect_last_n_turns = 0
        tc = _make_compressor(config)
        trajectory = [
            {"from": "system", "value": "sys"},
            {"from": "human", "value": "q"},
            {"from": "gpt", "value": "a"},
            {"from": "tool", "value": "r"},
            {"from": "gpt", "value": "b"},
            {"from": "tool", "value": "r2"},
            {"from": "gpt", "value": "c"},
            {"from": "tool", "value": "r3"},
        ]
        protected, start, end = tc._find_protected_indices(trajectory)
        # Only first occurrences protected, no tail protection
        assert 0 in protected
        assert 1 in protected
        assert 2 in protected
        assert 3 in protected
        assert 7 not in protected

    def test_no_system_turn(self):
        tc = _make_compressor()
        trajectory = [
            {"from": "human", "value": "hi"},
            {"from": "gpt", "value": "hello"},
            {"from": "tool", "value": "data"},
            {"from": "gpt", "value": "result"},
            {"from": "human", "value": "thanks"},
        ]
        protected, start, end = tc._find_protected_indices(trajectory)
        assert 0 in protected  # first human

    def test_disable_protect_first_system(self):
        config = CompressionConfig()
        config.protect_first_system = False
        tc = _make_compressor(config)
        trajectory = [
            {"from": "system", "value": "sys"},
            {"from": "human", "value": "q"},
            {"from": "gpt", "value": "a"},
            {"from": "tool", "value": "r"},
            {"from": "gpt", "value": "b"},
            {"from": "tool", "value": "r2"},
            {"from": "gpt", "value": "c"},
            {"from": "tool", "value": "r3"},
        ]
        protected, _, _ = tc._find_protected_indices(trajectory)
        assert 0 not in protected  # system not protected


# ---------------------------------------------------------------------------
# TrajectoryCompressor._extract_turn_content_for_summary
# ---------------------------------------------------------------------------

class TestExtractTurnContent:
    def test_basic_extraction(self):
        tc = _make_compressor()
        trajectory = [
            {"from": "gpt", "value": "I will search."},
            {"from": "tool", "value": "Search result: found it."},
            {"from": "gpt", "value": "Great, done."},
        ]
        content = tc._extract_turn_content_for_summary(trajectory, 0, 2)
        assert "[Turn 0 - GPT]" in content
        assert "I will search." in content
        assert "[Turn 1 - TOOL]" in content
        assert "Search result: found it." in content
        # Turn 2 should NOT be included (end is exclusive)
        assert "[Turn 2" not in content

    def test_long_content_truncated(self):
        tc = _make_compressor()
        trajectory = [
            {"from": "tool", "value": "x" * 5000},
        ]
        content = tc._extract_turn_content_for_summary(trajectory, 0, 1)
        assert "...[truncated]..." in content
        assert len(content) < 5000

    def test_empty_range(self):
        tc = _make_compressor()
        trajectory = [{"from": "gpt", "value": "hello"}]
        content = tc._extract_turn_content_for_summary(trajectory, 0, 0)
        assert content == ""


# ---------------------------------------------------------------------------
# TrajectoryCompressor.count_tokens / count_trajectory_tokens
# ---------------------------------------------------------------------------

class TestTokenCounting:
    def test_count_tokens_empty(self):
        tc = _make_compressor()
        assert tc.count_tokens("") == 0

    def test_count_tokens_basic(self):
        tc = _make_compressor()
        # Our mock: 1 token per 4 chars
        assert tc.count_tokens("12345678") == 2

    def test_count_trajectory_tokens(self):
        tc = _make_compressor()
        trajectory = [
            {"from": "system", "value": "12345678"},  # 2 tokens
            {"from": "human", "value": "1234567890ab"},  # 3 tokens
        ]
        assert tc.count_trajectory_tokens(trajectory) == 5

    def test_count_turn_tokens(self):
        tc = _make_compressor()
        trajectory = [
            {"from": "system", "value": "1234"},  # 1 token
            {"from": "human", "value": "12345678"},  # 2 tokens
        ]
        result = tc.count_turn_tokens(trajectory)
        assert result == [1, 2]

    def test_count_tokens_fallback_on_error(self):
        tc = _make_compressor()
        tc.tokenizer.encode = MagicMock(side_effect=Exception("fail"))
        # Should fall back to len(text) // 4
        assert tc.count_tokens("12345678") == 2


class TestGenerateSummary:
    def test_generate_summary_handles_none_content(self):
        tc = _make_compressor()
        tc.client = MagicMock()
        tc.client.chat.completions.create.return_value = SimpleNamespace(
            choices=[SimpleNamespace(message=SimpleNamespace(content=None))]
        )
        metrics = TrajectoryMetrics()

        summary = tc._generate_summary("Turn content", metrics)

        assert summary == "[CONTEXT SUMMARY]:"

    @pytest.mark.asyncio
    async def test_generate_summary_async_handles_none_content(self):
        tc = _make_compressor()
        mock_client = MagicMock()
        mock_client.chat.completions.create = AsyncMock(
            return_value=SimpleNamespace(
                choices=[SimpleNamespace(message=SimpleNamespace(content=None))]
            )
        )
        tc._get_async_client = MagicMock(return_value=mock_client)
        metrics = TrajectoryMetrics()

        summary = await tc._generate_summary_async("Turn content", metrics)

        assert summary == "[CONTEXT SUMMARY]:"


# ---------------------------------------------------------------------------
# TrajectoryCompressor — compression boundary must not split tool pairs
# ---------------------------------------------------------------------------

def _gpt_with_tool_call(label, tokens):
    """A 'gpt' turn carrying a <tool_call> marker, padded to ~`tokens` tokens."""
    body = f"<tool_call>\n{{\"name\": \"{label}\"}}\n</tool_call>"
    pad = max(0, tokens * 4 - len(body))
    return {"from": "gpt", "value": body + "x" * pad}


def _tool_response(label, tokens):
    """A 'tool' turn carrying a <tool_response> marker, padded to ~`tokens` tokens."""
    body = f"<tool_response>\n{{\"name\": \"{label}\"}}\n</tool_response>"
    pad = max(0, tokens * 4 - len(body))
    return {"from": "tool", "value": body + "x" * pad}


def _count_marker(trajectory, marker):
    return sum(turn["value"].count(marker) for turn in trajectory)


def _paired_trajectory():
    """A 10-turn trajectory of gpt/tool pairs with one oversized middle gpt turn.

    Layout (index): system, human, gpt#0, tool#0, gpt#1(big), tool#1, gpt#2,
    tool#2, gpt(final), human. With ``protect_last_n_turns=2`` the compressible
    region is [4, 8) and the oversized gpt#1 at index 4 is large enough that the
    token-accumulation boundary stops at index 5 — i.e. between gpt#1's
    <tool_call> and tool#1's <tool_response>.
    """
    return [
        {"from": "system", "value": "You are an agent. " * 4},
        {"from": "human", "value": "Please do the task. " * 4},
        _gpt_with_tool_call("a", 12),
        _tool_response("a", 12),
        _gpt_with_tool_call("b", 400),  # oversized — forces a mid-pair boundary
        _tool_response("b", 12),
        _gpt_with_tool_call("c", 12),
        _tool_response("c", 12),
        {"from": "gpt", "value": "<think>\n</think>\nAll done."},
        {"from": "human", "value": "Thanks!"},
    ]


def _target_that_splits_after_index_4(tc, trajectory):
    """Pick a target so token accumulation breaks right after index 4 (a gpt)."""
    turn_tokens = tc.count_turn_tokens(trajectory)
    total = sum(turn_tokens)
    # threshold == turn_tokens[4] makes the loop break at compress_until = 5,
    # which lands on the tool turn paired with gpt#1.
    return total - turn_tokens[4] + tc.config.summary_target_tokens


class TestCompressionToolPairIntegrity:
    def _config(self):
        config = CompressionConfig()
        config.protect_last_n_turns = 2
        config.summary_target_tokens = 4
        return config

    def test_sync_compression_does_not_orphan_tool_markers(self):
        tc = _make_compressor(self._config())
        tc._generate_summary = MagicMock(
            return_value="[CONTEXT SUMMARY]: middle turns summarized."
        )
        trajectory = _paired_trajectory()
        tc.config.target_max_tokens = _target_that_splits_after_index_4(tc, trajectory)

        compressed, metrics = tc.compress_trajectory(trajectory)

        assert metrics.was_compressed
        # Every <tool_call> must keep its matching <tool_response>.
        assert _count_marker(compressed, "<tool_call>") == _count_marker(
            compressed, "<tool_response>"
        )
        # A kept 'tool' turn must always immediately follow its 'gpt' turn —
        # never the inserted summary (a 'human' turn) or another 'tool' turn.
        for i, turn in enumerate(compressed):
            if turn.get("from") == "tool":
                assert i > 0 and compressed[i - 1].get("from") == "gpt"

    @pytest.mark.asyncio
    async def test_async_compression_does_not_orphan_tool_markers(self):
        tc = _make_compressor(self._config())
        tc._generate_summary_async = AsyncMock(
            return_value="[CONTEXT SUMMARY]: middle turns summarized."
        )
        trajectory = _paired_trajectory()
        tc.config.target_max_tokens = _target_that_splits_after_index_4(tc, trajectory)

        compressed, metrics = await tc.compress_trajectory_async(trajectory)

        assert metrics.was_compressed
        assert _count_marker(compressed, "<tool_call>") == _count_marker(
            compressed, "<tool_response>"
        )
        for i, turn in enumerate(compressed):
            if turn.get("from") == "tool":
                assert i > 0 and compressed[i - 1].get("from") == "gpt"

    def test_snap_boundary_skips_tool_turn_forward(self):
        tc = _make_compressor()
        trajectory = _paired_trajectory()
        # Index 5 is a 'tool' turn; the boundary should move forward to 6.
        assert tc._snap_boundary(trajectory, 5, 4, 8) == 6
        # Index 4 is a 'gpt' turn and already clean.
        assert tc._snap_boundary(trajectory, 4, 4, 8) == 4

    def test_snap_boundary_falls_back_to_backward(self):
        tc = _make_compressor()
        # Protected tail begins on a 'tool' turn at max_idx: no clean boundary
        # ahead, so the boundary must retreat onto the preceding 'gpt' turn.
        trajectory = [
            {"from": "gpt", "value": "<tool_call>a</tool_call>"},
            {"from": "tool", "value": "<tool_response>a</tool_response>"},
        ]
        assert tc._snap_boundary(trajectory, 1, 0, 1) == 0