fix(compression): make in-place compaction durable + rotation-independent end-to-end

Review (Codex + 3-agent parallel) found the first cut of in-place mode was
incomplete. It only updated the system prompt, so the persisted transcript
stayed 'full history + summary': the next turn/resume reloaded the full
history and immediately re-compacted (a loop). Every downstream layer that
keyed off session-id rotation also silently no-op'd, because the session_id
was doing double duty as the 'compaction happened' signal. This commit wires
the whole path so that removing rotation is actually complete:

Agent (agent/conversation_compression.py):
- In-place now DURABLY replaces the transcript: replace_messages(session_id,
  compressed) on the same row (the canonical store the gateway reloads from),
  not just update_system_prompt. Resume reloads the compacted set; no loop.
- Reset flush identity/cursor (_last_flushed_db_idx=0, _flushed_db_message_ids
  cleared) so next-turn appends diff against the compacted transcript.
- Expose a rotation-independent signal: agent._last_compaction_in_place, and
  in_place=True on the session:compress event.
- Fire the compaction-boundary hooks (context-engine on_session_start, memory
  manager on_session_switch, reason='compression') in BOTH modes — in-place
  passes the same id as parent so DAG/buffer state still checkpoints. Without
  this, memory/context plugins miss every in-place compaction.
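
Illustration only (not the real implementation): a toy model of the durable
replace plus flush-cursor reset described in the Agent bullets above.
ToySessionStore, ToyAgent, and their attribute names are hypothetical
stand-ins for SessionDB and the agent; only the "replace the rows, then reset
the flush identity" ordering is taken from this commit.

```python
class ToySessionStore:
    """In-memory stand-in for a session database (hypothetical)."""

    def __init__(self):
        self.rows = {}

    def append(self, session_id, message):
        self.rows.setdefault(session_id, []).append(message)

    def replace_messages(self, session_id, messages):
        # Swap the whole transcript for the compacted set on the same id.
        self.rows[session_id] = list(messages)

    def load(self, session_id):
        return list(self.rows.get(session_id, []))


class ToyAgent:
    """Hypothetical agent that flushes messages by object identity."""

    def __init__(self, store, session_id):
        self.store = store
        self.session_id = session_id
        self.last_flushed_idx = 0
        self.flushed_ids = set()

    def flush(self, messages, conversation_history=()):
        # With the cursor at 0, rebuild the identity set from the history the
        # caller already loaded, so those messages are not appended again.
        if self.last_flushed_idx == 0:
            self.flushed_ids = {id(m) for m in conversation_history}
        for m in messages:
            if id(m) not in self.flushed_ids:
                self.store.append(self.session_id, m)
                self.flushed_ids.add(id(m))
        self.last_flushed_idx = len(messages)

    def compact_in_place(self, compressed):
        # Durable replace first, then reset the flush identity/cursor.
        self.store.replace_messages(self.session_id, compressed)
        self.last_flushed_idx = 0
        self.flushed_ids = set()


store = ToySessionStore()
agent = ToyAgent(store, "s1")

history = [{"role": "user", "content": f"turn {i}"} for i in range(8)]
agent.flush(history)

compressed = [
    {"role": "user", "content": "summary of prior turns"},
    {"role": "assistant", "content": "recent reply"},
]
agent.compact_in_place(compressed)

# Next turn: the compacted dicts come back as history, plus one new message.
next_turn = compressed + [{"role": "user", "content": "new question"}]
agent.flush(next_turn, conversation_history=compressed)

reloaded = store.load("s1")
```

In this toy model the reloaded transcript holds the two compacted messages
plus the one new message; the summary is not duplicated and the eight
dropped turns do not come back.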

Gateway auto-compress (gateway/run.py):
- Read agent._last_compaction_in_place; set history_offset=0 on rotation OR
  in-place (both return the compacted set, so slicing past the pre-compaction
  length would drop everything). Carry compacted_in_place in the result dict.
- No extra rewrite needed: the agent shares the gateway's SessionDB, so its
  replace_messages already updated the canonical store load_transcript reads.
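
Illustration only: a small sketch of why the offset must be 0 after either
kind of compaction. The helper names effective_history_offset and
messages_to_persist are hypothetical; the real gateway computes the offset
inline and persists through its own code path.

```python
def effective_history_offset(session_was_split, compacted_in_place, history_len):
    """Return 0 when the message list no longer starts with the old history."""
    if session_was_split or compacted_in_place:
        return 0
    return history_len


def messages_to_persist(messages, offset):
    """Slice off the prefix that is assumed to be persisted already."""
    return messages[offset:]


# Eight pre-compaction messages were loaded as history for this turn.
agent_history_len = 8

# After in-place compaction the agent returns the compacted set plus the
# new reply: three messages, none of which sit past index 8.
returned = [
    {"role": "user", "content": "summary of prior turns"},
    {"role": "assistant", "content": "recent reply"},
    {"role": "assistant", "content": "new reply"},
]

# Old behaviour: the id did not change, so the offset stayed at 8.
stale = messages_to_persist(returned, agent_history_len)

# New behaviour: the in-place flag forces the offset to 0.
offset = effective_history_offset(False, True, agent_history_len)
fresh = messages_to_persist(returned, offset)
```

With the stale offset the slice is empty, which is the "drop everything"
failure described above; with offset 0 all three messages are kept.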

Manual /compress (gateway/slash_commands.py):
- The throwaway /compress agent has no _session_db, so rewrite_transcript is
  the durable write. Previously gated behind 'if rotated:' which treated
  'id unchanged' as the #44794 data-loss failure case and SKIPPED the rewrite
  — making /compress a silent no-op in in-place mode. Now rewrites on rotated
  OR in_place; the data-loss guard still fires only for the genuine
  no-rotation-AND-not-in-place failure.
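
Illustration only: the three cases above written as a pure function. The
Action enum and decide_transcript_action are hypothetical names; the real
handler makes this decision inline around rewrite_transcript.

```python
from enum import Enum


class Action(Enum):
    REWRITE = "rewrite"    # safe to replace the stored transcript
    PRESERVE = "preserve"  # keep the original, log a warning


def decide_transcript_action(old_id, new_id, in_place):
    """Rewrite on rotation or in-place success; otherwise preserve."""
    rotated = new_id != old_id
    if rotated or in_place:
        return Action.REWRITE
    # Unchanged id with in-place off means rotation failed: rewriting here
    # would replace the full history with only the summary.
    return Action.PRESERVE


rotation_case = decide_transcript_action("s1", "s2", in_place=False)
in_place_case = decide_transcript_action("s1", "s1", in_place=True)
failure_case = decide_transcript_action("s1", "s1", in_place=False)
```

Only the third case (id unchanged and in-place off) preserves the original
transcript, which matches the data-loss guard described above.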

Hygiene auto-compress already writes _compressed to the same id
unconditionally (its agent has no _session_db, can't rotate) — correct for
in-place, no change.

Tests (tests/run_agent/test_in_place_compaction.py):
- Assert the DURABLE transcript IS the compacted set after reload
  (get_messages_as_conversation == compacted), message_count==2, flush
  identity reset, and the rotation-independent signal set on in-place /
  unset on rotation. Rotation regression guard unchanged.

Verified: 64 tests green across in-place + rotation/persistence/boundary/
concurrent/failure-sync/command/cli suites; E2E both modes (durable replace,
gateway offset=0, rotation preserves old transcript); ruff clean. Still
default-off.
kshitijk4poor 2026-06-19 19:20:49 +05:30 committed by Teknium
parent 47fadc24d7
commit 1fbf48d4ad
4 changed files with 141 additions and 41 deletions

agent/conversation_compression.py

@@ -535,13 +535,34 @@ def compress_context(
             if in_place:
                 # ── In-place compaction: keep the same session_id ──────────
                 # No end_session, no new row, no parent_session_id, no title
-                # renumber, no contextvar/env/logging re-sync. Just refresh
-                # the stored system prompt on the existing row. The session's
-                # id, title, cwd, /goal, FTS-indexed history, and gateway
-                # routing all stay put. See #38763.
+                # renumber, no contextvar/env/logging re-sync. The session's
+                # id, title, cwd, /goal, and gateway routing all stay put.
+                #
+                # Durable replace: the persisted transcript MUST become the
+                # compacted set, not "original history + summary". The flush
+                # above wrote any un-persisted current-turn messages onto the
+                # row; now atomically replace ALL rows with `compressed` so a
+                # resume reloads the compacted transcript (lossy by design —
+                # the pre-compaction turns are summarized away). Without this
+                # the row keeps the full history and compaction never durably
+                # shrinks anything (the next turn just re-compacts). See #38763.
+                agent._session_db.replace_messages(agent.session_id, compressed)
                 agent._session_db.update_system_prompt(
                     agent.session_id, new_system_prompt
                 )
+                # Reset the flush identity/cursor so the next turn's appends are
+                # diffed against the COMPACTED transcript, not the pre-compaction
+                # one. _flush_messages_to_session_db rebuilds its identity set
+                # when _last_flushed_db_idx == 0; the compacted dicts are passed
+                # as conversation_history next turn and skipped by identity, so
+                # only genuinely new turn messages get appended (no dup of the
+                # summary, no resurrection of dropped turns).
+                agent._last_flushed_db_idx = 0
+                agent._flushed_db_message_ids = set()
+                # Rotation-independent signal: the conversation was compacted in
+                # place (id unchanged). The caller / gateway uses this instead of
+                # an id-change diff to re-baseline transcript handling.
+                compacted_in_place = True
             else:
                 # ── Rotation (legacy): end this session, fork a continuation ─
                 # Propagate title to the new session with auto-numbering
@@ -594,34 +615,37 @@ def compress_context(
         except Exception as e:
             logger.warning("Session DB compression split failed — new session will NOT be indexed: %s", e)

-    # Notify the context engine that the session_id rotated because of
-    # compression (not a fresh /new). Plugin engines (e.g. hermes-lcm) use
-    # boundary_reason="compression" to preserve DAG lineage across the
-    # rollover instead of re-initializing fresh per-session state.
-    # See hermes-lcm#68. Built-in ContextCompressor ignores kwargs.
+    # Notify the context engine that a compaction boundary occurred. Plugin
+    # engines (e.g. hermes-lcm) use boundary_reason="compression" to preserve
+    # DAG lineage / checkpoint per-session state across the boundary instead of
+    # re-initializing fresh. See hermes-lcm#68. Built-in ContextCompressor
+    # ignores kwargs. Fires in BOTH modes: rotation passes old→new ids; in-place
+    # passes the SAME id (the boundary is real even though the id didn't move).
     try:
         _old_sid = locals().get("old_session_id")
-        if _old_sid and hasattr(agent.context_compressor, "on_session_start"):
+        _boundary = bool(_old_sid) or in_place
+        if _boundary and hasattr(agent.context_compressor, "on_session_start"):
             agent.context_compressor.on_session_start(
                 agent.session_id or "",
                 boundary_reason="compression",
-                old_session_id=_old_sid,
+                old_session_id=_old_sid or agent.session_id or "",
                 conversation_id=getattr(agent, "_gateway_session_key", None),
             )
     except Exception as _ce_err:
         logger.debug("context engine on_session_start (compression): %s", _ce_err)

-    # Notify memory providers of the compression-driven session_id rotation
-    # so provider-cached per-session state (Hindsight's _document_id,
-    # accumulated turn buffers, counters) refreshes. reset=False because
-    # the logical conversation continues; only the id and DB row rolled
-    # over. See #6672.
+    # Notify memory providers of the compaction boundary so provider-cached
+    # per-session state (Hindsight's _document_id, accumulated turn buffers,
+    # counters) refreshes. reset=False because the logical conversation
+    # continues. See #6672. Fires in BOTH modes: in-place uses the same id as
+    # parent (the conversation didn't fork, but the buffer must still be told
+    # the transcript was compacted so it doesn't double-count dropped turns).
     try:
         _old_sid = locals().get("old_session_id")
-        if _old_sid and agent._memory_manager:
+        if (_old_sid or in_place) and agent._memory_manager:
             agent._memory_manager.on_session_switch(
                 agent.session_id or "",
-                parent_session_id=_old_sid,
+                parent_session_id=_old_sid or agent.session_id or "",
                 reset=False,
                 reason="compression",
             )
@@ -638,7 +662,9 @@ def compress_context(
         )

     # Emit session:compress event so hooks (e.g. MemPalace sync) can ingest
-    # the completed old session before its details are lost.
+    # the completed old session before its details are lost. In in-place mode
+    # there is no old id (same session); ``in_place=True`` tells hooks the
+    # transcript was compacted on the same id rather than rotated.
     _old_sid_for_event = locals().get("old_session_id")
     if getattr(agent, "event_callback", None):
         try:
@@ -646,11 +672,18 @@ def compress_context(
                 "platform": agent.platform or "",
                 "session_id": agent.session_id,
                 "old_session_id": _old_sid_for_event or "",
+                "in_place": in_place,
                 "compression_count": agent.context_compressor.compression_count,
             })
         except Exception as e:
             logger.debug("event_callback error on session:compress: %s", e)

+    # Surface the compaction mode to the caller (run_conversation / gateway)
+    # via a rotation-independent flag. The gateway uses this — NOT an
+    # id-change diff — to re-baseline transcript handling (history_offset=0 +
+    # rewrite on the same id) when compaction happened in place. See #38763.
+    agent._last_compaction_in_place = bool(locals().get("compacted_in_place", False))
+
     # Keep the post-compression rough estimate for diagnostics, but do not
     # treat it as provider-reported prompt usage. Schema-heavy rough estimates
     # can remain above threshold even after the next real API request fits.

gateway/run.py

@@ -15795,6 +15795,13 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
         # below must still point the gateway at the compressed child.
         agent = agent_holder[0]
         _session_was_split = False
+        # In-place compaction (compression.in_place / #38763) compacts the
+        # transcript WITHOUT rotating the id, so the id-change diff below
+        # can't detect it. compress_context() sets this rotation-independent
+        # flag on the agent; the gateway uses it to re-baseline transcript
+        # handling (history_offset=0 + rewrite the JSONL transcript) the
+        # same way a split would, even though the session_id is unchanged.
+        _compacted_in_place = bool(getattr(agent, "_last_compaction_in_place", False)) if agent else False
         agent_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
         if agent and session_key and agent_session_id != session_id:
             _session_was_split = True
@@ -15843,7 +15850,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
             )

         effective_session_id = agent_session_id
-        _effective_history_offset = 0 if _session_was_split else len(agent_history)
+        # history_offset=0 whenever the agent's message list no longer has
+        # the original history prefix — i.e. on rotation (split) OR in-place
+        # compaction. In both cases the returned `messages` is the compacted
+        # set, so the gateway must persist all of it (offset 0), not slice
+        # past the pre-compaction length (which would drop everything).
+        _effective_history_offset = (
+            0 if (_session_was_split or _compacted_in_place) else len(agent_history)
+        )

         if not final_response:
             error_msg = f"⚠️ {result['error']}" if result.get("error") else ""
@@ -15860,6 +15874,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
             "compression_exhausted": result.get("compression_exhausted", False),
             "tools": tools_holder[0] or [],
             "history_offset": _effective_history_offset,
+            "compacted_in_place": _compacted_in_place,
             "session_id": effective_session_id,
             "last_prompt_tokens": _last_prompt_toks,
             "input_tokens": _input_toks,
@@ -15960,6 +15975,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
             "interrupt_message": result_holder[0].get("interrupt_message") if result_holder[0] else None,
             "tools": tools_holder[0] or [],
             "history_offset": _effective_history_offset,
+            "compacted_in_place": _compacted_in_place,
             "last_prompt_tokens": _last_prompt_toks,
             "input_tokens": _input_toks,
             "output_tokens": _output_toks,

gateway/slash_commands.py

@@ -2627,12 +2627,14 @@ class GatewaySlashCommandsMixin:
             if partial and tail:
                 compressed = rejoin_compressed_head_and_tail(compressed, tail)

-            # _compress_context already calls end_session() on the old session
-            # (preserving its full transcript in SQLite) and creates a new
-            # session_id for the continuation. Write the compressed messages
-            # into the NEW session so the original history stays searchable.
+            # _compress_context either rotated (legacy: ended the old
+            # session, created a continuation id — write compressed messages
+            # into the NEW session so the original stays searchable) or
+            # compacted in place (compression.in_place / #38763: same id,
+            # transcript replaced with the compacted set).
             new_session_id = tmp_agent.session_id
             rotated = new_session_id != session_entry.session_id
+            _in_place = bool(getattr(tmp_agent, "compression_in_place", False))
             if rotated:
                 session_entry.session_id = new_session_id
                 self.session_store._save()
@@ -2640,20 +2642,27 @@ class GatewaySlashCommandsMixin:
                     source, session_entry, reason="compress-command",
                 )

-            # Only rewrite the transcript when rotation actually produced a
-            # NEW session id. If _compress_context could not rotate (e.g.
-            # _session_db unavailable, or the DB split raised), session_id
-            # is unchanged and rewrite_transcript() would DELETE the
-            # original messages and replace them with only the compressed
-            # summary — permanent data loss (#44794, #39704). In that case
-            # leave the original transcript intact.
-            if rotated:
-                self.session_store.rewrite_transcript(new_session_id, compressed)
+            # Rewrite the transcript when EITHER rotation produced a new id
+            # OR in-place compaction succeeded. The danger this guards
+            # against is the THIRD case: _compress_context could NOT rotate
+            # AND was not in-place (e.g. legacy mode but _session_db
+            # unavailable / the DB split raised) — there session_id is
+            # unchanged for a FAILURE reason, and rewrite_transcript() would
+            # DELETE the original messages and replace them with only the
+            # compressed summary (permanent data loss #44794, #39704). In
+            # in-place mode the unchanged id is SUCCESS, so the rewrite is
+            # exactly right (and is the durable write when the throwaway
+            # /compress agent has no _session_db of its own).
+            if rotated or _in_place:
+                self.session_store.rewrite_transcript(
+                    new_session_id, compressed
+                )
             else:
                 logger.warning(
                     "Manual /compress: session rotation did not occur "
-                    "(session_id unchanged) — preserving original transcript "
-                    "instead of overwriting it (#44794)."
+                    "(session_id unchanged) and in-place mode is off — "
+                    "preserving original transcript instead of overwriting "
+                    "it (#44794)."
                 )
             # Reset stored token count — transcript changed, old value is stale
             self.session_store.update_session(

tests/run_agent/test_in_place_compaction.py

@@ -87,12 +87,23 @@ class TestInPlaceCompaction:
             row = db.get_session(sid)
             assert row["end_reason"] is None
             assert row["title"] == "my-research"
-            # Pre-compaction messages remain under the one id (FTS continuity).
-            assert row["message_count"] >= 8
-            # Flush cursor must NOT be reset to 0. Rotation resets it (a fresh
-            # row starts empty); in-place keeps writing to the same row, so the
-            # cursor only ever advances as current-turn messages are persisted.
-            assert agent._last_flushed_db_idx != 0
+            # DURABLE REPLACE (the core invariant): the persisted transcript is
+            # now the COMPACTED set, not "full history + summary". A resume must
+            # reload the compacted transcript so compaction actually shrinks the
+            # session and doesn't immediately re-compact (#38763).
+            reloaded = db.get_messages_as_conversation(sid)
+            assert len(reloaded) == 2
+            assert [m.get("content") for m in reloaded] == [
+                "[CONTEXT COMPACTION] summary of prior turns",
+                "recent reply",
+            ]
+            assert row["message_count"] == 2
+            # Flush identity/cursor reset so next-turn appends diff against the
+            # compacted transcript (rebuilds the identity set on next flush).
+            assert agent._last_flushed_db_idx == 0
+            assert agent._flushed_db_message_ids == set()
+            # Rotation-independent in-place signal set for the gateway.
+            assert agent._last_compaction_in_place is True
             # Transcript actually shrank.
             assert len(compressed) == 2

@@ -143,6 +154,37 @@ class TestRotationStillDefault:
             assert child[0]["title"] == "my-research #2"
             # Flush cursor reset for the new row.
             assert agent._last_flushed_db_idx == 0
+            # Rotation mode does NOT set the in-place signal.
+            assert getattr(agent, "_last_compaction_in_place", False) is False
+
+
+class TestInPlaceSignalForGateway:
+    """compress_context must expose a rotation-independent flag the gateway can
+    read (instead of an id-change diff) to re-baseline transcript handling."""
+
+    def test_signal_set_on_in_place_unset_on_rotation(self):
+        from hermes_state import SessionDB
+        from agent.conversation_compression import compress_context
+
+        with tempfile.TemporaryDirectory() as tmp:
+            db = SessionDB(db_path=Path(tmp) / "t.db")
+            # in-place → flag True
+            _seed(db, "s_ip", "ip")
+            a_ip = _make_agent(db, "s_ip", in_place=True)
+            compress_context(
+                a_ip, [{"role": "user", "content": "x"}] * 8,
+                approx_tokens=100_000, system_message="sys",
+            )
+            assert a_ip._last_compaction_in_place is True
+
+            # rotation → flag False
+            _seed(db, "s_rot", "rot")
+            a_rot = _make_agent(db, "s_rot", in_place=False)
+            compress_context(
+                a_rot, [{"role": "user", "content": "x"}] * 8,
+                approx_tokens=100_000, system_message="sys",
+            )
+            assert a_rot._last_compaction_in_place is False


 class TestInPlaceConfigDefault: