def conversation_history_after_compression(agent: Any, messages: list) -> Optional[list]:
    """Return the correct flush baseline after a compression boundary.

    Legacy compression rotates to a fresh child session. That child has not
    seen the compacted transcript through the normal same-turn flush path yet,
    so callers must clear ``conversation_history`` to ``None`` and let the next
    persistence call write the whole compacted list.

    In-place compaction is different: ``archive_and_compact()`` has already
    soft-archived the previous active rows and inserted ``messages`` as the new
    active live transcript under the same session id. If the same agent turn
    continues with ``conversation_history=None``, the identity-based flush path
    treats those already-persisted compacted dicts as new and appends them a
    second time, doubling the active context and retriggering compression.

    Args:
        agent: The running agent. Only its ``_last_compaction_in_place``
            attribute is read; a missing attribute is treated as ``False``.
        messages: The compacted message list produced by compression.

    Returns:
        ``None`` when the last compaction was not in place (legacy session
        rotation) or when ``messages`` is ``None``; otherwise a new list
        holding the very same message objects as ``messages``.
    """
    # No transcript means there is nothing to baseline against; returning
    # None (instead of letting list(None) raise TypeError) keeps the caller
    # on the legacy "write everything" path.
    if messages is None:
        return None
    if not getattr(agent, "_last_compaction_in_place", False):
        return None
    # A shallow copy is intentional: it captures the current compacted dict
    # identities as history while allowing later same-turn appends to the
    # caller's list to remain new.
    return list(messages)
Same-message-count # compression (tool-result pruning, in-place summarization) @@ -3209,10 +3208,9 @@ def run_conversation( messages, system_message, approx_tokens=approx_tokens, task_id=effective_task_id, ) - # Compression created a new session — clear history - # so _flush_messages_to_session_db writes compressed - # messages to the new session, not skipping them. - conversation_history = None + conversation_history = conversation_history_after_compression( + agent, messages + ) # Re-estimate tokens after compression. Same-message-count # compression (tool-result pruning, in-place summarization) @@ -4316,10 +4314,9 @@ def run_conversation( approx_tokens=agent.context_compressor.last_prompt_tokens, task_id=effective_task_id, ) - # Compression created a new session — clear history so - # _flush_messages_to_session_db writes compressed messages - # to the new session (see preflight compression comment). - conversation_history = None + conversation_history = conversation_history_after_compression( + agent, messages + ) # Save session log incrementally (so progress is visible even if interrupted) agent._session_messages = messages diff --git a/agent/turn_context.py b/agent/turn_context.py index 6efa22a68ca..189771511b6 100644 --- a/agent/turn_context.py +++ b/agent/turn_context.py @@ -28,6 +28,7 @@ import uuid from dataclasses import dataclass from typing import Any, Dict, List, Optional +from agent.conversation_compression import conversation_history_after_compression from agent.iteration_budget import IterationBudget from agent.model_metadata import ( estimate_messages_tokens_rough, @@ -400,7 +401,9 @@ def build_turn_context( _orig_len, len(messages), _orig_tokens, _preflight_tokens ): break # Cannot compress further: neither rows nor tokens moved - conversation_history = None + conversation_history = conversation_history_after_compression( + agent, messages + ) agent._empty_content_retries = 0 agent._thinking_prefill_retries = 0 agent._last_content_with_tools = 
def test_in_place_compression_rebaseline_prevents_duplicate_compacted_rows(self):
    """Rebaselining after in-place compaction must not re-persist compacted rows.

    Regression for the 2026-06-26 SRE compression loop: archive_and_compact()
    had already inserted the compacted active block, yet the same turn kept
    going with conversation_history=None, so _flush_messages_to_session_db()
    appended the compacted dicts a second time and doubled live context.
    """
    from agent.conversation_compression import conversation_history_after_compression
    from hermes_state import SessionDB

    session_id = "original-session"

    with tempfile.TemporaryDirectory() as scratch_dir:
        store = SessionDB(db_path=Path(scratch_dir) / "test.db")

        agent = self._make_agent(store)
        agent._ensure_db_session()

        # Seed the session with a pre-compaction transcript.
        pre_compaction = [
            {"role": "user", "content": "old question"},
            {"role": "assistant", "content": "old answer"},
        ]
        agent._flush_messages_to_session_db(pre_compaction, [])
        seeded_contents = [row["content"] for row in store.get_messages(session_id)]
        assert seeded_contents == ["old question", "old answer"]

        # Compact in place and mark the agent accordingly.
        compacted = [
            {"role": "assistant", "content": "[CONTEXT COMPACTION] summary"},
            {"role": "user", "content": "recent question"},
            {"role": "assistant", "content": "recent answer"},
        ]
        store.archive_and_compact(session_id, compacted)
        agent._last_compaction_in_place = True
        agent._last_flushed_db_idx = 0

        # The turn keeps running after compaction: the compacted dicts count
        # as already-persisted history, so only later appends get flushed.
        baseline = conversation_history_after_compression(agent, compacted)
        assert baseline is not None
        assert baseline is not compacted
        assert baseline == compacted

        transcript = [
            *compacted,
            {"role": "tool", "content": "tool result"},
            {"role": "assistant", "content": "final answer"},
        ]
        agent._flush_messages_to_session_db(transcript, baseline)

        final_contents = [row["content"] for row in store.get_messages(session_id)]
        expected_contents = [
            "[CONTEXT COMPACTION] summary",
            "recent question",
            "recent answer",
            "tool result",
            "final answer",
        ]
        assert final_contents == expected_contents