test+polish(compression): pin anti-thrash gate and gateway session_id persistence

Follow-up to @someaka's fix.

Polish:
- Drop the redundant `_preflight_tokens >= threshold_tokens` clause.
  `should_compress(tokens)` already short-circuits when tokens < threshold,
  so the explicit comparison was dead code on the True branch.

Tests:
- Preflight: pin that should_compress() is called (anti-thrash has a vote).
  Mocks should_compress to return False even with tokens past the raw
  threshold and asserts no compression runs — exact bug shape from #29335.
- Gateway: AST scan of gateway/run.py asserts every
  `session_entry.session_id = ...` assignment is followed by a
  `session_store._save()` call within the same block. Three sites mutate
  the session_id after compression; all three must persist or the next
  turn loads the pre-compression transcript and re-loops. Empirically
  verified the test catches the bug (drops the new _save() line → red).

AUTHOR_MAP:
- Map ed@bebop.crew -> someaka so the salvaged commit resolves to
  @someaka in release notes.
This commit is contained in:
Teknium 2026-05-25 00:45:12 -07:00
parent 3914089d52
commit 11c40d6a42
4 changed files with 147 additions and 2 deletions

View file

@ -484,8 +484,7 @@ def run_conversation(
tools=agent.tools or None,
)
if _preflight_tokens >= agent.context_compressor.threshold_tokens \
and agent.context_compressor.should_compress(_preflight_tokens):
if agent.context_compressor.should_compress(_preflight_tokens):
logger.info(
"Preflight compression: ~%s tokens >= %s threshold (model %s, ctx %s)",
f"{_preflight_tokens:,}",

View file

@ -56,6 +56,7 @@ AUTHOR_MAP = {
"30366221+WorldWriter@users.noreply.github.com": "WorldWriter",
"dafeng@DafengdeMacBook-Pro.local": "WorldWriter",
"schepers.zander1@gmail.com": "Strontvod",
"ed@bebop.crew": "someaka",
"anadi.jaggia@gmail.com": "Jaggia",
"32201324+simpolism@users.noreply.github.com": "simpolism",
"simpolism@gmail.com": "simpolism",

View file

@ -0,0 +1,111 @@
"""Regression tests for #29335 — gateway must persist ``session_entry.session_id``
after the agent's compression path mutates it.
When ``_compress_context()`` rolls the agent forward into a new session, the
agent now returns the new ``session_id`` in its result dict. The gateway
updates ``session_entry.session_id`` in memory AND must call
``session_store._save()`` so the new mapping survives a gateway restart.
Without ``_save()``, the next turn loads the OLD session's transcript and
re-triggers compression forever.
Three sites in ``gateway/run.py`` mutate ``session_entry.session_id`` after
a compression-induced session split. All three MUST be followed by a
``_save()`` call. This test pins that invariant.
"""
from __future__ import annotations
import ast
import inspect
import textwrap
from gateway import run as gateway_run
def _session_id_assignments_followed_by_save(source: str) -> list[tuple[int, bool]]:
"""For each ``session_entry.session_id = ...`` assignment in *source*,
return ``(lineno, saved_within_5_stmts)`` True iff a
``self.session_store._save()`` call appears in the same block within the
next 5 statements (covers normal control flow without false-flagging
cleanup that lives 200 lines away).
"""
tree = ast.parse(textwrap.dedent(source))
results: list[tuple[int, bool]] = []
class _Visitor(ast.NodeVisitor):
def _is_session_id_assign(self, node: ast.AST) -> bool:
if not isinstance(node, ast.Assign):
return False
for target in node.targets:
if (
isinstance(target, ast.Attribute)
and target.attr == "session_id"
and isinstance(target.value, ast.Name)
and target.value.id == "session_entry"
):
return True
return False
def _block_has_save_after(self, body: list[ast.stmt], idx: int) -> bool:
for stmt in body[idx : idx + 6]:
for sub in ast.walk(stmt):
if (
isinstance(sub, ast.Call)
and isinstance(sub.func, ast.Attribute)
and sub.func.attr == "_save"
):
return True
return False
def _walk_body(self, body: list[ast.stmt]) -> None:
for i, stmt in enumerate(body):
if self._is_session_id_assign(stmt):
results.append((stmt.lineno, self._block_has_save_after(body, i)))
for child in ast.iter_child_nodes(stmt):
if isinstance(child, (ast.If, ast.For, ast.While, ast.With,
ast.Try, ast.AsyncWith, ast.AsyncFor)):
self._walk_node(child)
def _walk_node(self, node: ast.AST) -> None:
for attr in ("body", "orelse", "finalbody"):
inner = getattr(node, attr, None)
if isinstance(inner, list):
self._walk_body(inner)
if hasattr(node, "handlers"):
for handler in node.handlers:
self._walk_body(handler.body)
def visit(self, node: ast.AST) -> None:
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
self._walk_body(node.body)
for child in ast.iter_child_nodes(node):
self.visit(child)
_Visitor().visit(tree)
return results
def test_every_post_compression_session_id_assignment_persists():
"""Every ``session_entry.session_id = ...`` in gateway/run.py must be
followed by a ``session_store._save()`` call within the same block.
Regression for #29335 — the assignment at the end of
``_handle_message_with_agent`` used to skip ``_save()`` while two sibling
sites (hygiene rewrite, manual /compress) already persisted. The agent
would compress correctly, the gateway would update its in-memory
session_id, then drop it on next gateway restart.
"""
source = inspect.getsource(gateway_run)
assignments = _session_id_assignments_followed_by_save(source)
assert assignments, (
"No ``session_entry.session_id = ...`` assignments found in gateway/run.py — "
"either the structure changed or the AST walker is broken."
)
missing = [lineno for lineno, saved in assignments if not saved]
assert not missing, (
f"{len(missing)} ``session_entry.session_id = ...`` site(s) in gateway/run.py "
f"are not followed by ``session_store._save()`` within the same block "
f"(lines: {missing}). Every post-compression session_id update must persist "
f"or the next turn loads the pre-compression transcript and triggers an "
f"infinite compression loop. See issue #29335."
)

View file

@ -543,6 +543,40 @@ class TestPreflightCompression:
mock_compress.assert_not_called()
def test_preflight_respects_anti_thrash(self, agent):
"""Preflight must call ``should_compress()`` so anti-thrash applies.
Regression for #29335 — preflight used to bypass ``should_compress()``
and re-trigger every turn even when the prior two passes each saved
<10% (the canonical infinite-compression-loop signal).
"""
agent.compression_enabled = True
agent.context_compressor.context_length = 2000
agent.context_compressor.threshold_tokens = 200
big_history = []
for i in range(20):
big_history.append({"role": "user", "content": f"Message {i} padded"})
big_history.append({"role": "assistant", "content": f"Response {i} padded"})
ok_resp = _mock_response(content="No preflight", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [ok_resp]
with (
patch.object(agent.context_compressor, "should_compress", return_value=False) as mock_should,
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello", conversation_history=big_history)
# The gate consulted should_compress — anti-thrash had a chance to vote.
mock_should.assert_called()
# And vetoed: even though tokens >= threshold, no compression ran.
mock_compress.assert_not_called()
assert result["completed"] is True
class TestToolResultPreflightCompression:
"""Compression should trigger when tool results push context past the threshold."""