From 1fbf48d4ad827253a5637b0444a00beb38e22b2f Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Fri, 19 Jun 2026 19:20:49 +0530 Subject: [PATCH] fix(compression): make in-place compaction durable + rotation-independent end-to-end MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review (Codex + 3-agent parallel) found the first cut of in-place mode was incomplete: it only updated the system prompt, so the persisted transcript stayed 'full history + summary' and the next turn/resume reloaded the full history and immediately re-compacted (a loop), and every downstream layer that keyed off session-id rotation silently no-op'd. The session_id was doing double duty as the 'compaction happened' signal. This wires the whole path so removing rotation is actually complete: Agent (agent/conversation_compression.py): - In-place now DURABLY replaces the transcript: replace_messages(session_id, compressed) on the same row (the canonical store the gateway reloads from), not just update_system_prompt. Resume reloads the compacted set; no loop. - Reset flush identity/cursor (_last_flushed_db_idx=0, _flushed_db_message_ids cleared) so next-turn appends diff against the compacted transcript. - Expose a rotation-independent signal: agent._last_compaction_in_place, and in_place=True on the session:compress event. - Fire the compaction-boundary hooks (context-engine on_session_start, memory manager on_session_switch, reason='compression') in BOTH modes — in-place passes the same id as parent so DAG/buffer state still checkpoints. Without this, memory/context plugins miss every in-place compaction. Gateway auto-compress (gateway/run.py): - Read agent._last_compaction_in_place; set history_offset=0 on rotation OR in-place (both return the compacted set, so slicing past the pre-compaction length would drop everything). Carry compacted_in_place in the result dict. - No extra rewrite needed: the agent shares the gateway's SessionDB, so its replace_messages already updated the canonical store load_transcript reads. Manual /compress (gateway/slash_commands.py): - The throwaway /compress agent has no _session_db, so rewrite_transcript is the durable write. Previously gated behind 'if rotated:' which treated 'id unchanged' as the #44794 data-loss failure case and SKIPPED the rewrite — making /compress a silent no-op in in-place mode. Now rewrites on rotated OR in_place; the data-loss guard still fires only for the genuine no-rotation-AND-not-in-place failure. Hygiene auto-compress already writes _compressed to the same id unconditionally (its agent has no _session_db, can't rotate) — correct for in-place, no change. Tests (tests/run_agent/test_in_place_compaction.py): - Assert the DURABLE transcript IS the compacted set after reload (get_messages_as_conversation == compacted), message_count==2, flush identity reset, and the rotation-independent signal set on in-place / unset on rotation. Rotation regression guard unchanged. Verified: 64 tests green across in-place + rotation/persistence/boundary/ concurrent/failure-sync/command/cli suites; E2E both modes (durable replace, gateway offset=0, rotation preserves old transcript); ruff clean. Still default-off. --- agent/conversation_compression.py | 71 +++++++++++++++------ gateway/run.py | 18 +++++- gateway/slash_commands.py | 39 ++++++----- tests/run_agent/test_in_place_compaction.py | 54 ++++++++++++++-- 4 files changed, 141 insertions(+), 41 deletions(-) diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index a4fedaba5fc..73195be0e6f 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -535,13 +535,34 @@ def compress_context( if in_place: # ── In-place compaction: keep the same session_id ────────── # No end_session, no new row, no parent_session_id, no title - # renumber, no contextvar/env/logging re-sync. Just refresh - # the stored system prompt on the existing row. The session's - # id, title, cwd, /goal, FTS-indexed history, and gateway - # routing all stay put. See #38763. + # renumber, no contextvar/env/logging re-sync. The session's + # id, title, cwd, /goal, and gateway routing all stay put. + # + # Durable replace: the persisted transcript MUST become the + # compacted set, not "original history + summary". The flush + # above wrote any un-persisted current-turn messages onto the + # row; now atomically replace ALL rows with `compressed` so a + # resume reloads the compacted transcript (lossy by design — + # the pre-compaction turns are summarized away). Without this + # the row keeps the full history and compaction never durably + # shrinks anything (the next turn just re-compacts). See #38763. + agent._session_db.replace_messages(agent.session_id, compressed) agent._session_db.update_system_prompt( agent.session_id, new_system_prompt ) + # Reset the flush identity/cursor so the next turn's appends are + # diffed against the COMPACTED transcript, not the pre-compaction + # one. _flush_messages_to_session_db rebuilds its identity set + # when _last_flushed_db_idx == 0; the compacted dicts are passed + # as conversation_history next turn and skipped by identity, so + # only genuinely new turn messages get appended (no dup of the + # summary, no resurrection of dropped turns). + agent._last_flushed_db_idx = 0 + agent._flushed_db_message_ids = set() + # Rotation-independent signal: the conversation was compacted in + # place (id unchanged). The caller / gateway uses this instead of + # an id-change diff to re-baseline transcript handling. + compacted_in_place = True else: # ── Rotation (legacy): end this session, fork a continuation ─ # Propagate title to the new session with auto-numbering @@ -594,34 +615,37 @@ def compress_context( except Exception as e: logger.warning("Session DB compression split failed — new session will NOT be indexed: %s", e) - # Notify the context engine that the session_id rotated because of - # compression (not a fresh /new). Plugin engines (e.g. hermes-lcm) use - # boundary_reason="compression" to preserve DAG lineage across the - # rollover instead of re-initializing fresh per-session state. - # See hermes-lcm#68. Built-in ContextCompressor ignores kwargs. + # Notify the context engine that a compaction boundary occurred. Plugin + # engines (e.g. hermes-lcm) use boundary_reason="compression" to preserve + # DAG lineage / checkpoint per-session state across the boundary instead of + # re-initializing fresh. See hermes-lcm#68. Built-in ContextCompressor + # ignores kwargs. Fires in BOTH modes: rotation passes old→new ids; in-place + # passes the SAME id (the boundary is real even though the id didn't move). try: _old_sid = locals().get("old_session_id") - if _old_sid and hasattr(agent.context_compressor, "on_session_start"): + _boundary = bool(_old_sid) or in_place + if _boundary and hasattr(agent.context_compressor, "on_session_start"): agent.context_compressor.on_session_start( agent.session_id or "", boundary_reason="compression", - old_session_id=_old_sid, + old_session_id=_old_sid or agent.session_id or "", conversation_id=getattr(agent, "_gateway_session_key", None), ) except Exception as _ce_err: logger.debug("context engine on_session_start (compression): %s", _ce_err) - # Notify memory providers of the compression-driven session_id rotation - # so provider-cached per-session state (Hindsight's _document_id, - # accumulated turn buffers, counters) refreshes. reset=False because - # the logical conversation continues; only the id and DB row rolled - # over. See #6672. + # Notify memory providers of the compaction boundary so provider-cached + # per-session state (Hindsight's _document_id, accumulated turn buffers, + # counters) refreshes. reset=False because the logical conversation + # continues. See #6672. Fires in BOTH modes: in-place uses the same id as + # parent (the conversation didn't fork, but the buffer must still be told + # the transcript was compacted so it doesn't double-count dropped turns). try: _old_sid = locals().get("old_session_id") - if _old_sid and agent._memory_manager: + if (_old_sid or in_place) and agent._memory_manager: agent._memory_manager.on_session_switch( agent.session_id or "", - parent_session_id=_old_sid, + parent_session_id=_old_sid or agent.session_id or "", reset=False, reason="compression", ) @@ -638,7 +662,9 @@ def compress_context( ) # Emit session:compress event so hooks (e.g. MemPalace sync) can ingest - # the completed old session before its details are lost. + # the completed old session before its details are lost. In in-place mode + # there is no old id (same session); ``in_place=True`` tells hooks the + # transcript was compacted on the same id rather than rotated. _old_sid_for_event = locals().get("old_session_id") if getattr(agent, "event_callback", None): try: @@ -646,11 +672,18 @@ def compress_context( "platform": agent.platform or "", "session_id": agent.session_id, "old_session_id": _old_sid_for_event or "", + "in_place": in_place, "compression_count": agent.context_compressor.compression_count, }) except Exception as e: logger.debug("event_callback error on session:compress: %s", e) + # Surface the compaction mode to the caller (run_conversation / gateway) + # via a rotation-independent flag. The gateway uses this — NOT an + # id-change diff — to re-baseline transcript handling (history_offset=0 + + # rewrite on the same id) when compaction happened in place. See #38763. + agent._last_compaction_in_place = bool(locals().get("compacted_in_place", False)) + # Keep the post-compression rough estimate for diagnostics, but do not # treat it as provider-reported prompt usage. Schema-heavy rough estimates # can remain above threshold even after the next real API request fits. diff --git a/gateway/run.py b/gateway/run.py index cb777fbf4da..9c280f3dc12 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -15795,6 +15795,13 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # below must still point the gateway at the compressed child. agent = agent_holder[0] _session_was_split = False + # In-place compaction (compression.in_place / #38763) compacts the + # transcript WITHOUT rotating the id, so the id-change diff below + # can't detect it. compress_context() sets this rotation-independent + # flag on the agent; the gateway uses it to re-baseline transcript + # handling (history_offset=0 + rewrite the JSONL transcript) the + # same way a split would, even though the session_id is unchanged. + _compacted_in_place = bool(getattr(agent, "_last_compaction_in_place", False)) if agent else False agent_session_id = getattr(agent, 'session_id', session_id) if agent else session_id if agent and session_key and agent_session_id != session_id: _session_was_split = True @@ -15843,7 +15850,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew ) effective_session_id = agent_session_id - _effective_history_offset = 0 if _session_was_split else len(agent_history) + # history_offset=0 whenever the agent's message list no longer has + # the original history prefix — i.e. on rotation (split) OR in-place + # compaction. In both cases the returned `messages` is the compacted + # set, so the gateway must persist all of it (offset 0), not slice + # past the pre-compaction length (which would drop everything). + _effective_history_offset = ( + 0 if (_session_was_split or _compacted_in_place) else len(agent_history) + ) if not final_response: error_msg = f"⚠️ {result['error']}" if result.get("error") else "" @@ -15860,6 +15874,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew "compression_exhausted": result.get("compression_exhausted", False), "tools": tools_holder[0] or [], "history_offset": _effective_history_offset, + "compacted_in_place": _compacted_in_place, "session_id": effective_session_id, "last_prompt_tokens": _last_prompt_toks, "input_tokens": _input_toks, @@ -15960,6 +15975,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew "interrupt_message": result_holder[0].get("interrupt_message") if result_holder[0] else None, "tools": tools_holder[0] or [], "history_offset": _effective_history_offset, + "compacted_in_place": _compacted_in_place, "last_prompt_tokens": _last_prompt_toks, "input_tokens": _input_toks, "output_tokens": _output_toks, diff --git a/gateway/slash_commands.py b/gateway/slash_commands.py index c528f82e440..dbfd778daf9 100644 --- a/gateway/slash_commands.py +++ b/gateway/slash_commands.py @@ -2627,12 +2627,14 @@ class GatewaySlashCommandsMixin: if partial and tail: compressed = rejoin_compressed_head_and_tail(compressed, tail) - # _compress_context already calls end_session() on the old session - # (preserving its full transcript in SQLite) and creates a new - # session_id for the continuation. Write the compressed messages - # into the NEW session so the original history stays searchable. + # _compress_context either rotated (legacy: ended the old + # session, created a continuation id — write compressed messages + # into the NEW session so the original stays searchable) or + # compacted in place (compression.in_place / #38763: same id, + # transcript replaced with the compacted set). new_session_id = tmp_agent.session_id rotated = new_session_id != session_entry.session_id + _in_place = bool(getattr(tmp_agent, "compression_in_place", False)) if rotated: session_entry.session_id = new_session_id self.session_store._save() @@ -2640,20 +2642,27 @@ class GatewaySlashCommandsMixin: source, session_entry, reason="compress-command", ) - # Only rewrite the transcript when rotation actually produced a - # NEW session id. If _compress_context could not rotate (e.g. - # _session_db unavailable, or the DB split raised), session_id - # is unchanged and rewrite_transcript() would DELETE the - # original messages and replace them with only the compressed - # summary — permanent data loss (#44794, #39704). In that case - # leave the original transcript intact. - if rotated: - self.session_store.rewrite_transcript(new_session_id, compressed) + # Rewrite the transcript when EITHER rotation produced a new id + # OR in-place compaction succeeded. The danger this guards + # against is the THIRD case: _compress_context could NOT rotate + # AND was not in-place (e.g. legacy mode but _session_db + # unavailable / the DB split raised) — there session_id is + # unchanged for a FAILURE reason, and rewrite_transcript() would + # DELETE the original messages and replace them with only the + # compressed summary (permanent data loss #44794, #39704). In + # in-place mode the unchanged id is SUCCESS, so the rewrite is + # exactly right (and is the durable write when the throwaway + # /compress agent has no _session_db of its own). + if rotated or _in_place: + self.session_store.rewrite_transcript( + new_session_id, compressed + ) else: logger.warning( "Manual /compress: session rotation did not occur " - "(session_id unchanged) — preserving original transcript " - "instead of overwriting it (#44794)." + "(session_id unchanged) and in-place mode is off — " + "preserving original transcript instead of overwriting " + "it (#44794)." ) # Reset stored token count — transcript changed, old value is stale self.session_store.update_session( diff --git a/tests/run_agent/test_in_place_compaction.py b/tests/run_agent/test_in_place_compaction.py index 74a71815845..586b88bdfd3 100644 --- a/tests/run_agent/test_in_place_compaction.py +++ b/tests/run_agent/test_in_place_compaction.py @@ -87,12 +87,23 @@ class TestInPlaceCompaction: row = db.get_session(sid) assert row["end_reason"] is None assert row["title"] == "my-research" - # Pre-compaction messages remain under the one id (FTS continuity). - assert row["message_count"] >= 8 - # Flush cursor must NOT be reset to 0. Rotation resets it (a fresh - # row starts empty); in-place keeps writing to the same row, so the - # cursor only ever advances as current-turn messages are persisted. - assert agent._last_flushed_db_idx != 0 + # DURABLE REPLACE (the core invariant): the persisted transcript is + # now the COMPACTED set, not "full history + summary". A resume must + # reload the compacted transcript so compaction actually shrinks the + # session and doesn't immediately re-compact (#38763). + reloaded = db.get_messages_as_conversation(sid) + assert len(reloaded) == 2 + assert [m.get("content") for m in reloaded] == [ + "[CONTEXT COMPACTION] summary of prior turns", + "recent reply", + ] + assert row["message_count"] == 2 + # Flush identity/cursor reset so next-turn appends diff against the + # compacted transcript (rebuilds the identity set on next flush). + assert agent._last_flushed_db_idx == 0 + assert agent._flushed_db_message_ids == set() + # Rotation-independent in-place signal set for the gateway. + assert agent._last_compaction_in_place is True # Transcript actually shrank. assert len(compressed) == 2 @@ -143,6 +154,37 @@ class TestRotationStillDefault: assert child[0]["title"] == "my-research #2" # Flush cursor reset for the new row. assert agent._last_flushed_db_idx == 0 + # Rotation mode does NOT set the in-place signal. + assert getattr(agent, "_last_compaction_in_place", False) is False + + +class TestInPlaceSignalForGateway: + """compress_context must expose a rotation-independent flag the gateway can + read (instead of an id-change diff) to re-baseline transcript handling.""" + + def test_signal_set_on_in_place_unset_on_rotation(self): + from hermes_state import SessionDB + from agent.conversation_compression import compress_context + + with tempfile.TemporaryDirectory() as tmp: + db = SessionDB(db_path=Path(tmp) / "t.db") + # in-place → flag True + _seed(db, "s_ip", "ip") + a_ip = _make_agent(db, "s_ip", in_place=True) + compress_context( + a_ip, [{"role": "user", "content": "x"}] * 8, + approx_tokens=100_000, system_message="sys", + ) + assert a_ip._last_compaction_in_place is True + + # rotation → flag False + _seed(db, "s_rot", "rot") + a_rot = _make_agent(db, "s_rot", in_place=False) + compress_context( + a_rot, [{"role": "user", "content": "x"}] * 8, + approx_tokens=100_000, system_message="sys", + ) + assert a_rot._last_compaction_in_place is False class TestInPlaceConfigDefault: