fix(research): keep tool_call/tool_response pairs intact when compressing trajectories

## What does this PR do?

The trajectory compressor could corrupt training trajectories by cutting a
conversation in the middle of a tool-call/tool-response pair. In the from/value
trajectory format a `tool` turn (carrying `<tool_response>` markers) is always
emitted immediately after the `gpt` turn whose `<tool_call>` it answers, so the
two turns must stay together. The compressible region's end boundary, however,
was chosen purely by token accumulation: the loop stopped at the first turn where
the accumulated tokens met the savings target, with no regard for turn roles. For
any over-budget trajectory whose savings boundary happened to land between a `gpt`
turn and its `tool` turn, the `gpt` (with its `<tool_call>`) was summarised away
into the replacement `human` message while the now-orphaned `tool` turn (with its
`<tool_response>`) was kept verbatim in the tail — producing an unmatched marker
and silently corrupting the training signal. The head boundary had the mirror
problem when the first tool turn was not protected.

This change snaps both compression boundaries to a clean turn boundary before the
region is extracted and replaced, so the summary always covers whole gpt+tool
blocks and a `tool` turn is never separated from the `gpt` turn that precedes it.
The boundary is moved forward when possible (folding an orphaned tool turn into
the region that already holds its gpt) and falls back to moving backward when no
clean boundary exists ahead, such as when the protected tail itself begins on a
tool turn.

## Related Issue

N/A

## Type of Change

- [x] 🐛 Bug fix (non-breaking change that fixes an issue)

## Changes Made

- `trajectory_compressor.py`: added `_is_boundary_clean()` and `_snap_boundary()`
  helpers on `TrajectoryCompressor`, and applied them to both the head and tail
  compression boundaries in `compress_trajectory()` and
  `compress_trajectory_async()`. When snapping collapses the region to nothing
  safe to compress, the trajectory is returned unchanged and flagged as still
  over the limit rather than being corrupted.
- `tests/test_trajectory_compressor.py`: added `TestCompressionToolPairIntegrity`
  covering the sync and async paths plus direct unit tests for the boundary
  snapping (forward skip and backward fallback).

## How to Test

1. Run the focused tests: `pytest tests/test_trajectory_compressor.py -q`.
2. The new sync/async cases build a trajectory of gpt/tool pairs with an oversized
   middle gpt turn and choose a token target that forces the accumulation
   boundary to stop between a `<tool_call>` and its `<tool_response>`. They assert
   that `<tool_call>` and `<tool_response>` markers stay balanced after
   compression and that every kept `tool` turn is immediately preceded by a `gpt`
   turn (never the inserted summary or another tool turn).

## Checklist

### Code

- [x] I've read the [Contributing Guide](https://github.com/NousResearch/hermes-agent/blob/main/CONTRIBUTING.md)
- [x] My commit messages follow [Conventional Commits](https://www.conventionalcommits.org/) (`fix(scope):`, `feat(scope):`, etc.)
- [x] I searched for [existing PRs](https://github.com/NousResearch/hermes-agent/pulls) to make sure this isn't a duplicate
- [x] My PR contains **only** changes related to this fix/feature (no unrelated commits)
- [x] I've run `pytest tests/ -q` and all tests pass
- [x] I've added tests for my changes (required for bug fixes, strongly encouraged for features)
- [x] I've tested on my platform: macOS 15 (Darwin 25.5)

### Documentation & Housekeeping

- [x] I've updated relevant documentation (README, `docs/`, docstrings) — or N/A
- [x] I've updated `cli-config.yaml.example` if I added/changed config keys — or N/A
- [x] I've updated `CONTRIBUTING.md` or `AGENTS.md` if I changed architecture or workflows — or N/A
- [x] I've considered cross-platform impact (Windows, macOS) per the [compatibility guide](https://github.com/NousResearch/hermes-agent/blob/main/CONTRIBUTING.md#cross-platform-compatibility) — or N/A
- [x] I've updated tool descriptions/schemas if I changed tool behavior — or N/A
This commit is contained in:
synapsesx 2026-06-06 15:51:46 +03:00 committed by Teknium
parent 490c486ff6
commit f10a330aee
2 changed files with 202 additions and 10 deletions

View file

@ -507,3 +507,124 @@ class TestGenerateSummary:
summary = await tc._generate_summary_async("Turn content", metrics)
assert summary == "[CONTEXT SUMMARY]:"
# ---------------------------------------------------------------------------
# TrajectoryCompressor — compression boundary must not split tool pairs
# ---------------------------------------------------------------------------
def _gpt_with_tool_call(label, tokens):
"""A 'gpt' turn carrying a <tool_call> marker, padded to ~`tokens` tokens."""
body = f"<tool_call>\n{{\"name\": \"{label}\"}}\n</tool_call>"
pad = max(0, tokens * 4 - len(body))
return {"from": "gpt", "value": body + "x" * pad}
def _tool_response(label, tokens):
"""A 'tool' turn carrying a <tool_response> marker, padded to ~`tokens` tokens."""
body = f"<tool_response>\n{{\"name\": \"{label}\"}}\n</tool_response>"
pad = max(0, tokens * 4 - len(body))
return {"from": "tool", "value": body + "x" * pad}
def _count_marker(trajectory, marker):
return sum(turn["value"].count(marker) for turn in trajectory)
def _paired_trajectory():
"""A 10-turn trajectory of gpt/tool pairs with one oversized middle gpt turn.
Layout (index): system, human, gpt#0, tool#0, gpt#1(big), tool#1, gpt#2,
tool#2, gpt(final), human. With ``protect_last_n_turns=2`` the compressible
region is [4, 8) and the oversized gpt#1 at index 4 is large enough that the
token-accumulation boundary stops at index 5 i.e. between gpt#1's
<tool_call> and tool#1's <tool_response>.
"""
return [
{"from": "system", "value": "You are an agent. " * 4},
{"from": "human", "value": "Please do the task. " * 4},
_gpt_with_tool_call("a", 12),
_tool_response("a", 12),
_gpt_with_tool_call("b", 400), # oversized — forces a mid-pair boundary
_tool_response("b", 12),
_gpt_with_tool_call("c", 12),
_tool_response("c", 12),
{"from": "gpt", "value": "<think>\n</think>\nAll done."},
{"from": "human", "value": "Thanks!"},
]
def _target_that_splits_after_index_4(tc, trajectory):
"""Pick a target so token accumulation breaks right after index 4 (a gpt)."""
turn_tokens = tc.count_turn_tokens(trajectory)
total = sum(turn_tokens)
# threshold == turn_tokens[4] makes the loop break at compress_until = 5,
# which lands on the tool turn paired with gpt#1.
return total - turn_tokens[4] + tc.config.summary_target_tokens
class TestCompressionToolPairIntegrity:
def _config(self):
config = CompressionConfig()
config.protect_last_n_turns = 2
config.summary_target_tokens = 4
return config
def test_sync_compression_does_not_orphan_tool_markers(self):
tc = _make_compressor(self._config())
tc._generate_summary = MagicMock(
return_value="[CONTEXT SUMMARY]: middle turns summarized."
)
trajectory = _paired_trajectory()
tc.config.target_max_tokens = _target_that_splits_after_index_4(tc, trajectory)
compressed, metrics = tc.compress_trajectory(trajectory)
assert metrics.was_compressed
# Every <tool_call> must keep its matching <tool_response>.
assert _count_marker(compressed, "<tool_call>") == _count_marker(
compressed, "<tool_response>"
)
# A kept 'tool' turn must always immediately follow its 'gpt' turn —
# never the inserted summary (a 'human' turn) or another 'tool' turn.
for i, turn in enumerate(compressed):
if turn.get("from") == "tool":
assert i > 0 and compressed[i - 1].get("from") == "gpt"
@pytest.mark.asyncio
async def test_async_compression_does_not_orphan_tool_markers(self):
tc = _make_compressor(self._config())
tc._generate_summary_async = AsyncMock(
return_value="[CONTEXT SUMMARY]: middle turns summarized."
)
trajectory = _paired_trajectory()
tc.config.target_max_tokens = _target_that_splits_after_index_4(tc, trajectory)
compressed, metrics = await tc.compress_trajectory_async(trajectory)
assert metrics.was_compressed
assert _count_marker(compressed, "<tool_call>") == _count_marker(
compressed, "<tool_response>"
)
for i, turn in enumerate(compressed):
if turn.get("from") == "tool":
assert i > 0 and compressed[i - 1].get("from") == "gpt"
def test_snap_boundary_skips_tool_turn_forward(self):
tc = _make_compressor()
trajectory = _paired_trajectory()
# Index 5 is a 'tool' turn; the boundary should move forward to 6.
assert tc._snap_boundary(trajectory, 5, 4, 8) == 6
# Index 4 is a 'gpt' turn and already clean.
assert tc._snap_boundary(trajectory, 4, 4, 8) == 4
def test_snap_boundary_falls_back_to_backward(self):
tc = _make_compressor()
# Protected tail begins on a 'tool' turn at max_idx: no clean boundary
# ahead, so the boundary must retreat onto the preceding 'gpt' turn.
trajectory = [
{"from": "gpt", "value": "<tool_call>a</tool_call>"},
{"from": "tool", "value": "<tool_response>a</tool_response>"},
]
assert tc._snap_boundary(trajectory, 1, 0, 1) == 0

View file

@ -524,9 +524,48 @@ class TrajectoryCompressor:
compressible_start = max(head_protected) + 1 if head_protected else 0
compressible_end = min(tail_protected) if tail_protected else n
return protected, compressible_start, compressible_end
@staticmethod
def _is_boundary_clean(trajectory: List[Dict[str, str]], idx: int) -> bool:
"""Return True if a region boundary at ``idx`` does not split a turn pair.
In the from/value trajectory format a ``tool`` turn (carrying
``<tool_response>`` markers) is always emitted immediately after the
``gpt`` turn whose ``<tool_call>`` it answers. A compression boundary
that lands *on* a ``tool`` turn therefore cuts between a tool call and
its response. A boundary is only clean when it sits at the very end of
the trajectory or on a non-``tool`` turn.
"""
return idx >= len(trajectory) or trajectory[idx].get("from") != "tool"
@classmethod
def _snap_boundary(
cls,
trajectory: List[Dict[str, str]],
idx: int,
min_idx: int,
max_idx: int,
) -> int:
"""Move a compression boundary onto the nearest clean turn boundary.
Moving forward is preferred so that an orphaned ``tool`` turn is folded
into the region that already holds its ``gpt`` turn; if no clean
boundary exists ahead (for example the protected tail itself begins on a
``tool`` turn) the boundary is moved backward instead. The result is
clamped to ``[min_idx, max_idx]``.
"""
forward = idx
while forward < max_idx and not cls._is_boundary_clean(trajectory, forward):
forward += 1
if cls._is_boundary_clean(trajectory, forward):
return forward
backward = idx
while backward > min_idx and not cls._is_boundary_clean(trajectory, backward):
backward -= 1
return backward
def _extract_turn_content_for_summary(self, trajectory: List[Dict[str, str]], start: int, end: int) -> str:
"""
Extract content from turns to be summarized.
@ -746,7 +785,11 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
# Find protected regions
protected, compress_start, compress_end = self._find_protected_indices(trajectory)
# Snap the head boundary so the compressible region never *starts* on an
# orphaned <tool_response> whose <tool_call> lives in the protected head.
compress_start = self._snap_boundary(trajectory, compress_start, compress_start, compress_end)
# Check if there's anything to compress
if compress_start >= compress_end:
# Nothing to compress, return as-is
@ -780,17 +823,29 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
if accumulated_tokens < target_tokens_to_compress and compress_until < compress_end:
compress_until = compress_end
accumulated_tokens = sum(turn_tokens[compress_start:compress_end])
# Snap the tail boundary so we never cut between a <tool_call> and its
# <tool_response>: the summary replaces [compress_start, compress_until)
# and the remainder is kept verbatim, so a boundary on a tool turn would
# leave an orphaned marker and corrupt the training trajectory.
compress_until = self._snap_boundary(trajectory, compress_until, compress_start, compress_end)
if compress_until <= compress_start:
# Snapping collapsed the region; nothing can be safely compressed.
metrics.compressed_tokens = total_tokens
metrics.compressed_turns = len(trajectory)
metrics.still_over_limit = total_tokens > self.config.target_max_tokens
return trajectory, metrics
# Record compression region
metrics.turns_compressed_start_idx = compress_start
metrics.turns_compressed_end_idx = compress_until
metrics.turns_in_compressed_region = compress_until - compress_start
# Extract content for summary
content_to_summarize = self._extract_turn_content_for_summary(
trajectory, compress_start, compress_until
)
# Generate summary
summary = self._generate_summary(content_to_summarize, metrics)
@ -853,7 +908,11 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
# Find protected regions
protected, compress_start, compress_end = self._find_protected_indices(trajectory)
# Snap the head boundary so the compressible region never *starts* on an
# orphaned <tool_response> whose <tool_call> lives in the protected head.
compress_start = self._snap_boundary(trajectory, compress_start, compress_start, compress_end)
# Check if there's anything to compress
if compress_start >= compress_end:
metrics.compressed_tokens = total_tokens
@ -879,17 +938,29 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
if accumulated_tokens < target_tokens_to_compress and compress_until < compress_end:
compress_until = compress_end
accumulated_tokens = sum(turn_tokens[compress_start:compress_end])
# Snap the tail boundary so we never cut between a <tool_call> and its
# <tool_response>: the summary replaces [compress_start, compress_until)
# and the remainder is kept verbatim, so a boundary on a tool turn would
# leave an orphaned marker and corrupt the training trajectory.
compress_until = self._snap_boundary(trajectory, compress_until, compress_start, compress_end)
if compress_until <= compress_start:
# Snapping collapsed the region; nothing can be safely compressed.
metrics.compressed_tokens = total_tokens
metrics.compressed_turns = len(trajectory)
metrics.still_over_limit = total_tokens > self.config.target_max_tokens
return trajectory, metrics
# Record compression region
metrics.turns_compressed_start_idx = compress_start
metrics.turns_compressed_end_idx = compress_until
metrics.turns_in_compressed_region = compress_until - compress_start
# Extract content for summary
content_to_summarize = self._extract_turn_content_for_summary(
trajectory, compress_start, compress_until
)
# Generate summary (ASYNC)
summary = await self._generate_summary_async(content_to_summarize, metrics)