From 11c40d6a427b8730035f33a0883841b398223cb0 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 25 May 2026 00:45:12 -0700 Subject: [PATCH] test+polish(compression): pin anti-thrash gate and gateway session_id persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to @someaka's fix. Polish: - Drop the redundant `_preflight_tokens >= threshold_tokens` clause. `should_compress(tokens)` already short-circuits when tokens < threshold, so the explicit comparison was dead code on the True branch. Tests: - Preflight: pin that should_compress() is called (anti-thrash has a vote). Mocks should_compress to return False even with tokens past the raw threshold and asserts no compression runs — exact bug shape from #29335. - Gateway: AST scan of gateway/run.py asserts every `session_entry.session_id = ...` assignment is followed by a `session_store._save()` call within the same block. Three sites mutate the session_id after compression; all three must persist or the next turn loads the pre-compression transcript and re-loops. Empirically verified the test catches the bug (drops the new _save() line → red). AUTHOR_MAP: - Map ed@bebop.crew -> someaka so the salvaged commit resolves to @someaka in release notes. --- agent/conversation_loop.py | 3 +- scripts/release.py | 1 + ...test_compression_session_id_persistence.py | 111 ++++++++++++++++++ tests/run_agent/test_413_compression.py | 34 ++++++ 4 files changed, 147 insertions(+), 2 deletions(-) create mode 100644 tests/gateway/test_compression_session_id_persistence.py diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index 1361d6ed6e6..049cbcdf67e 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -484,8 +484,7 @@ def run_conversation( tools=agent.tools or None, ) - if _preflight_tokens >= agent.context_compressor.threshold_tokens \ - and agent.context_compressor.should_compress(_preflight_tokens): + if agent.context_compressor.should_compress(_preflight_tokens): logger.info( "Preflight compression: ~%s tokens >= %s threshold (model %s, ctx %s)", f"{_preflight_tokens:,}", diff --git a/scripts/release.py b/scripts/release.py index 8bfc4d42eab..f2da64997ea 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -56,6 +56,7 @@ AUTHOR_MAP = { "30366221+WorldWriter@users.noreply.github.com": "WorldWriter", "dafeng@DafengdeMacBook-Pro.local": "WorldWriter", "schepers.zander1@gmail.com": "Strontvod", + "ed@bebop.crew": "someaka", "anadi.jaggia@gmail.com": "Jaggia", "32201324+simpolism@users.noreply.github.com": "simpolism", "simpolism@gmail.com": "simpolism", diff --git a/tests/gateway/test_compression_session_id_persistence.py b/tests/gateway/test_compression_session_id_persistence.py new file mode 100644 index 00000000000..a2ea09048ae --- /dev/null +++ b/tests/gateway/test_compression_session_id_persistence.py @@ -0,0 +1,111 @@ +"""Regression tests for #29335 — gateway must persist ``session_entry.session_id`` +after the agent's compression path mutates it. + +When ``_compress_context()`` rolls the agent forward into a new session, the +agent now returns the new ``session_id`` in its result dict. The gateway +updates ``session_entry.session_id`` in memory AND must call +``session_store._save()`` so the new mapping survives a gateway restart. +Without ``_save()``, the next turn loads the OLD session's transcript and +re-triggers compression forever. + +Three sites in ``gateway/run.py`` mutate ``session_entry.session_id`` after +a compression-induced session split. All three MUST be followed by a +``_save()`` call. This test pins that invariant. +""" + +from __future__ import annotations + +import ast +import inspect +import textwrap + +from gateway import run as gateway_run + + +def _session_id_assignments_followed_by_save(source: str) -> list[tuple[int, bool]]: + """For each ``session_entry.session_id = ...`` assignment in *source*, + return ``(lineno, saved_within_5_stmts)`` — True iff a + ``self.session_store._save()`` call appears in the same block within the + next 5 statements (covers normal control flow without false-flagging + cleanup that lives 200 lines away). + """ + tree = ast.parse(textwrap.dedent(source)) + results: list[tuple[int, bool]] = [] + + class _Visitor(ast.NodeVisitor): + def _is_session_id_assign(self, node: ast.AST) -> bool: + if not isinstance(node, ast.Assign): + return False + for target in node.targets: + if ( + isinstance(target, ast.Attribute) + and target.attr == "session_id" + and isinstance(target.value, ast.Name) + and target.value.id == "session_entry" + ): + return True + return False + + def _block_has_save_after(self, body: list[ast.stmt], idx: int) -> bool: + for stmt in body[idx : idx + 6]: + for sub in ast.walk(stmt): + if ( + isinstance(sub, ast.Call) + and isinstance(sub.func, ast.Attribute) + and sub.func.attr == "_save" + ): + return True + return False + + def _walk_body(self, body: list[ast.stmt]) -> None: + for i, stmt in enumerate(body): + if self._is_session_id_assign(stmt): + results.append((stmt.lineno, self._block_has_save_after(body, i))) + for child in ast.iter_child_nodes(stmt): + if isinstance(child, (ast.If, ast.For, ast.While, ast.With, + ast.Try, ast.AsyncWith, ast.AsyncFor)): + self._walk_node(child) + + def _walk_node(self, node: ast.AST) -> None: + for attr in ("body", "orelse", "finalbody"): + inner = getattr(node, attr, None) + if isinstance(inner, list): + self._walk_body(inner) + if hasattr(node, "handlers"): + for handler in node.handlers: + self._walk_body(handler.body) + + def visit(self, node: ast.AST) -> None: + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + self._walk_body(node.body) + for child in ast.iter_child_nodes(node): + self.visit(child) + + _Visitor().visit(tree) + return results + + +def test_every_post_compression_session_id_assignment_persists(): + """Every ``session_entry.session_id = ...`` in gateway/run.py must be + followed by a ``session_store._save()`` call within the same block. + + Regression for #29335 — the assignment at the end of + ``_handle_message_with_agent`` used to skip ``_save()`` while two sibling + sites (hygiene rewrite, manual /compress) already persisted. The agent + would compress correctly, the gateway would update its in-memory + session_id, then drop it on next gateway restart. + """ + source = inspect.getsource(gateway_run) + assignments = _session_id_assignments_followed_by_save(source) + assert assignments, ( + "No ``session_entry.session_id = ...`` assignments found in gateway/run.py — " + "either the structure changed or the AST walker is broken." + ) + missing = [lineno for lineno, saved in assignments if not saved] + assert not missing, ( + f"{len(missing)} ``session_entry.session_id = ...`` site(s) in gateway/run.py " + f"are not followed by ``session_store._save()`` within the same block " + f"(lines: {missing}). Every post-compression session_id update must persist " + f"or the next turn loads the pre-compression transcript and triggers an " + f"infinite compression loop. See issue #29335." + ) diff --git a/tests/run_agent/test_413_compression.py b/tests/run_agent/test_413_compression.py index 3cbd47c0e1b..82fc6b3e60d 100644 --- a/tests/run_agent/test_413_compression.py +++ b/tests/run_agent/test_413_compression.py @@ -543,6 +543,40 @@ class TestPreflightCompression: mock_compress.assert_not_called() + def test_preflight_respects_anti_thrash(self, agent): + """Preflight must call ``should_compress()`` so anti-thrash applies. + + Regression for #29335 — preflight used to bypass ``should_compress()`` + and re-trigger every turn even when the prior two passes each saved + <10% (the canonical infinite-compression-loop signal). + """ + agent.compression_enabled = True + agent.context_compressor.context_length = 2000 + agent.context_compressor.threshold_tokens = 200 + + big_history = [] + for i in range(20): + big_history.append({"role": "user", "content": f"Message {i} padded"}) + big_history.append({"role": "assistant", "content": f"Response {i} padded"}) + + ok_resp = _mock_response(content="No preflight", finish_reason="stop") + agent.client.chat.completions.create.side_effect = [ok_resp] + + with ( + patch.object(agent.context_compressor, "should_compress", return_value=False) as mock_should, + patch.object(agent, "_compress_context") as mock_compress, + patch.object(agent, "_persist_session"), + patch.object(agent, "_save_trajectory"), + patch.object(agent, "_cleanup_task_resources"), + ): + result = agent.run_conversation("hello", conversation_history=big_history) + + # The gate consulted should_compress — anti-thrash had a chance to vote. + mock_should.assert_called() + # And vetoed: even though tokens >= threshold, no compression ran. + mock_compress.assert_not_called() + assert result["completed"] is True + class TestToolResultPreflightCompression: """Compression should trigger when tool results push context past the threshold."""