fix(agent): rebaseline in-place compression flushes

This commit is contained in:
diamondeyesfox 2026-06-26 20:02:46 -05:00 committed by Teknium
parent 1b75b3fd90
commit 8df231c941
4 changed files with 102 additions and 17 deletions

View file

@ -288,6 +288,29 @@ def replay_compression_warning(agent: Any) -> None:
pass
def conversation_history_after_compression(agent: Any, messages: list) -> Optional[list]:
"""Return the correct flush baseline after a compression boundary.
Legacy compression rotates to a fresh child session. That child has not
seen the compacted transcript through the normal same-turn flush path yet,
so callers must clear ``conversation_history`` to ``None`` and let the next
persistence call write the whole compacted list.
In-place compaction is different: ``archive_and_compact()`` has already
soft-archived the previous active rows and inserted ``messages`` as the new
active live transcript under the same session id. If the same agent turn
continues with ``conversation_history=None``, the identity-based flush path
treats those already-persisted compacted dicts as new and appends them a
second time, doubling the active context and retriggering compression.
A shallow copy is intentional: it captures the current compacted dict
identities as history while allowing later same-turn appends to remain new.
"""
if bool(getattr(agent, "_last_compaction_in_place", False)):
return list(messages)
return None
def compress_context(
agent: Any,
messages: list,

View file

@ -28,6 +28,7 @@ import uuid
from typing import Any, Dict, List, Optional
from agent.codex_responses_adapter import _summarize_user_message_for_log
from agent.conversation_compression import conversation_history_after_compression
from agent.display import KawaiiSpinner
from agent.error_classifier import FailoverReason, classify_api_error
from agent.iteration_budget import IterationBudget
@ -2830,10 +2831,9 @@ def run_conversation(
approx_tokens=approx_tokens,
task_id=effective_task_id,
)
# Compression created a new session — clear history
# so _flush_messages_to_session_db writes compressed
# messages to the new session, not skipping them.
conversation_history = None
conversation_history = conversation_history_after_compression(
agent, messages
)
if len(messages) < original_len or old_ctx > _reduced_ctx:
agent._buffer_status(
f"🗜️ Context reduced to {_reduced_ctx:,} tokens "
@ -3042,10 +3042,9 @@ def run_conversation(
messages, system_message, approx_tokens=approx_tokens,
task_id=effective_task_id,
)
# Compression created a new session — clear history
# so _flush_messages_to_session_db writes compressed
# messages to the new session, not skipping them.
conversation_history = None
conversation_history = conversation_history_after_compression(
agent, messages
)
# Re-estimate tokens after compression. Same-message-count
# compression (tool-result pruning, in-place summarization)
@ -3209,10 +3208,9 @@ def run_conversation(
messages, system_message, approx_tokens=approx_tokens,
task_id=effective_task_id,
)
# Compression created a new session — clear history
# so _flush_messages_to_session_db writes compressed
# messages to the new session, not skipping them.
conversation_history = None
conversation_history = conversation_history_after_compression(
agent, messages
)
# Re-estimate tokens after compression. Same-message-count
# compression (tool-result pruning, in-place summarization)
@ -4316,10 +4314,9 @@ def run_conversation(
approx_tokens=agent.context_compressor.last_prompt_tokens,
task_id=effective_task_id,
)
# Compression created a new session — clear history so
# _flush_messages_to_session_db writes compressed messages
# to the new session (see preflight compression comment).
conversation_history = None
conversation_history = conversation_history_after_compression(
agent, messages
)
# Save session log incrementally (so progress is visible even if interrupted)
agent._session_messages = messages

View file

@ -28,6 +28,7 @@ import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from agent.conversation_compression import conversation_history_after_compression
from agent.iteration_budget import IterationBudget
from agent.model_metadata import (
estimate_messages_tokens_rough,
@ -400,7 +401,9 @@ def build_turn_context(
_orig_len, len(messages), _orig_tokens, _preflight_tokens
):
break # Cannot compress further: neither rows nor tokens moved
conversation_history = None
conversation_history = conversation_history_after_compression(
agent, messages
)
agent._empty_content_retries = 0
agent._thinking_prefill_retries = 0
agent._last_content_with_tools = None

View file

@ -129,6 +129,68 @@ class TestFlushAfterCompression:
assert len(rows) == 2
assert [row["content"] for row in rows] == ["summary", "continuing..."]
def test_in_place_compression_rebaseline_prevents_duplicate_compacted_rows(self):
"""In-place compaction already persisted the compacted transcript.
Regression for the 2026-06-26 SRE compression loop: archive_and_compact()
inserted a compacted active block, then the same turn continued with
conversation_history=None and _flush_messages_to_session_db() appended
the compacted dicts again, doubling live context.
"""
from agent.conversation_compression import conversation_history_after_compression
from hermes_state import SessionDB
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test.db"
db = SessionDB(db_path=db_path)
agent = self._make_agent(db)
agent._ensure_db_session()
original_history = [
{"role": "user", "content": "old question"},
{"role": "assistant", "content": "old answer"},
]
agent._flush_messages_to_session_db(original_history, [])
assert [row["content"] for row in db.get_messages("original-session")] == [
"old question",
"old answer",
]
compacted = [
{"role": "assistant", "content": "[CONTEXT COMPACTION] summary"},
{"role": "user", "content": "recent question"},
{"role": "assistant", "content": "recent answer"},
]
db.archive_and_compact("original-session", compacted)
setattr(agent, "_last_compaction_in_place", True)
agent._last_flushed_db_idx = 0
# Same agent turn continues after compaction. The compacted dicts
# must be treated as already-persisted history; only later appends
# should be flushed.
post_compaction_history = conversation_history_after_compression(
agent, compacted
)
assert post_compaction_history is not None
assert post_compaction_history is not compacted
assert post_compaction_history == compacted
messages = compacted + [
{"role": "tool", "content": "tool result"},
{"role": "assistant", "content": "final answer"},
]
agent._flush_messages_to_session_db(messages, post_compaction_history)
rows = db.get_messages("original-session")
assert [row["content"] for row in rows] == [
"[CONTEXT COMPACTION] summary",
"recent question",
"recent answer",
"tool result",
"final answer",
]
# ---------------------------------------------------------------------------
# Part 2: Gateway-side — history_offset after session split