From 47fadc24d79c1ff21b23518c0e27aaa3146a421d Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Fri, 19 Jun 2026 18:46:05 +0530 Subject: [PATCH] feat(compression): in-place compaction option that keeps one session id (#38763) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Context compression today rewrites the message list AND rotates the session id — it ends the session, forks a parent_session_id child, and renumbers the title (name -> name #2). That moving identity key is the root cause of a whole bug cluster: /goal lost (#33618), pending response lost at the split (#14238), orphan sessions (#33907), TUI sid desync (#36777), FTS search gaps + duplicate sidebar entries (#45117), null continuation cwd (#42228), and title-rename dead-ends (#48989). It also forced a large defensive apparatus (compression lock, contextvar/env/logging triple-sync, orphan finalization, gateway SessionEntry re-propagation, tip projection) whose only job is surviving a mid-conversation id change. Add a compression.in_place config flag (default False during rollout). When True, compaction rewrites the transcript and rebuilds the system prompt but keeps the SAME session_id: no end_session, no child row, no title renumber, no contextvar/logging re-sync, no memory/context-engine session-switch. The conversation keeps one durable id for life, like Claude Code / Codex. Compaction is lossy by design — the pre-compaction transcript is summarized away in the live context and no parent session is kept as an archive; rows already persisted to the session DB stay under the same id (the new test asserts this). The rotation path is unchanged when the flag is off (moved verbatim into an else branch). Staged rollout: this PR ships the option behind a default-off flag for live validation; a follow-up flips the default and deletes the now-redundant rotation machinery, superseding the 14 open band-aid PRs in this area. 
- hermes_cli/config.py: add compression.in_place (default False), documented - agent/agent_init.py: resolve the flag -> agent.compression_in_place - agent/conversation_compression.py: branch compress_context() on the flag - tests/run_agent/test_in_place_compaction.py: in-place invariants + rotation regression guard + config default The pre-flush of current-turn messages (#47202) runs in BOTH modes, so no boundary data loss. Prompt-cache invariant preserved: the system-prompt rebuild is the same single sanctioned invalidation that already happens during compaction — no NEW invalidation. Message alternation preserved. --- agent/agent_init.py | 9 ++ agent/conversation_compression.py | 128 ++++++++++------- hermes_cli/config.py | 13 ++ tests/run_agent/test_in_place_compaction.py | 152 ++++++++++++++++++++ 4 files changed, 250 insertions(+), 52 deletions(-) create mode 100644 tests/run_agent/test_in_place_compaction.py diff --git a/agent/agent_init.py b/agent/agent_init.py index 2d443241367..f40288abcff 100644 --- a/agent/agent_init.py +++ b/agent/agent_init.py @@ -1339,6 +1339,14 @@ def init_agent( compression_abort_on_summary_failure = str( _compression_cfg.get("abort_on_summary_failure", False) ).lower() in {"true", "1", "yes"} + # In-place compaction: when True, compress_context() rewrites the message + # list + rebuilds the system prompt WITHOUT rotating the session id (no + # parent_session_id chain, no `name #N` renumber). See #38763 and + # agent/conversation_compression.py. Consumed by compress_context(), not the + # compressor, so it rides on the agent. + compression_in_place = str( + _compression_cfg.get("in_place", False) + ).lower() in {"true", "1", "yes"} # Read optional explicit context_length override for the auxiliary # compression model. 
Custom endpoints often cannot report this via @@ -1558,6 +1566,7 @@ def init_agent( abort_on_summary_failure=compression_abort_on_summary_failure, ) agent.compression_enabled = compression_enabled + agent.compression_in_place = compression_in_place # Reject models whose context window is below the minimum required # for reliable tool-calling workflows (64K tokens). diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index 89bb4ceb55a..a4fedaba5fc 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -328,6 +328,13 @@ def compress_context( agent._compression_feasibility_checked = True _pre_msg_count = len(messages) + # In-place compaction (config: compression.in_place, see #38763). When True, + # this compaction rewrites the message list + rebuilds the system prompt but + # keeps the SAME session_id — no end_session, no parent_session_id child, no + # `name #N` renumber, no contextvar/env/logging re-sync, no memory/context- + # engine session-switch. The conversation keeps one durable id for life, + # eliminating the session-rotation bug cluster. Default False during rollout. + in_place = bool(getattr(agent, "compression_in_place", False)) logger.info( "context compression started: session=%s messages=%d tokens=~%s model=%s focus=%r", agent.session_id or "none", _pre_msg_count, @@ -508,65 +515,82 @@ def compress_context( if agent._session_db: try: - # Propagate title to the new session with auto-numbering - old_title = agent._session_db.get_session_title(agent.session_id) - # Trigger memory extraction on the old session before it rotates. + # Trigger memory extraction on the current session before the + # transcript is rewritten (runs in BOTH modes — the logical + # conversation's pre-compaction turns are about to be summarized + # away regardless of whether the id rotates). 
agent.commit_memory_session(messages) - # Flush any un-persisted messages from the current turn to the - # old session *before* rotating. compress_context() can be - # called mid-turn (auto-compress when context exceeds threshold) - # at a point when _flush_messages_to_session_db() has not yet - # run. Without this, messages generated during the current turn - # are silently lost on session rotation (#47202). + # Flush any un-persisted messages from the current turn *before* + # the rewrite. compress_context() can be called mid-turn + # (auto-compress when context exceeds threshold) at a point when + # _flush_messages_to_session_db() has not yet run. Without this, + # messages generated during the current turn are silently lost + # (#47202). In-place mode flushes to the SAME session; rotation + # mode flushes to the old session before ending it. try: agent._flush_messages_to_session_db(messages) except Exception: pass # best-effort — don't block compression on a flush error - agent._session_db.end_session(agent.session_id, "compression") - old_session_id = agent.session_id - agent.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" - # Ordering contract: the agent thread updates the contextvar here; - # the gateway propagates to SessionEntry after run_in_executor returns. - try: - from gateway.session_context import set_current_session_id - set_current_session_id(agent.session_id) - except Exception: - os.environ["HERMES_SESSION_ID"] = agent.session_id - # The gateway/tools session context (ContextVar + env) and the - # logging session context are SEPARATE mechanisms. The call above - # moves the former; the ``[session_id]`` tag on log lines comes - # from ``hermes_logging._session_context`` (set once per turn in - # conversation_loop.py). 
Without this, post-rotation log lines in - # the same turn keep the STALE old id while the message/DB/gateway - # state carry the new one — breaking log correlation exactly at the - # compaction boundary (see #34089). Guarded separately so a logging - # failure can never regress the routing update above. - try: - from hermes_logging import set_session_context - - set_session_context(agent.session_id) - except Exception: - pass - agent._session_db_created = False - agent._session_db.create_session( - session_id=agent.session_id, - source=agent.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"), - model=agent.model, - model_config=agent._session_init_model_config, - parent_session_id=old_session_id, - ) - agent._session_db_created = True - # Auto-number the title for the continuation session - if old_title: + if in_place: + # ── In-place compaction: keep the same session_id ────────── + # No end_session, no new row, no parent_session_id, no title + # renumber, no contextvar/env/logging re-sync. Just refresh + # the stored system prompt on the existing row. The session's + # id, title, cwd, /goal, FTS-indexed history, and gateway + # routing all stay put. See #38763. + agent._session_db.update_system_prompt( + agent.session_id, new_system_prompt + ) + else: + # ── Rotation (legacy): end this session, fork a continuation ─ + # Propagate title to the new session with auto-numbering + old_title = agent._session_db.get_session_title(agent.session_id) + agent._session_db.end_session(agent.session_id, "compression") + old_session_id = agent.session_id + agent.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" + # Ordering contract: the agent thread updates the contextvar here; + # the gateway propagates to SessionEntry after run_in_executor returns. 
try: - new_title = agent._session_db.get_next_title_in_lineage(old_title) - agent._session_db.set_session_title(agent.session_id, new_title) - except (ValueError, Exception) as e: - logger.debug("Could not propagate title on compression: %s", e) - agent._session_db.update_system_prompt(agent.session_id, new_system_prompt) - # Reset flush cursor — new session starts with no messages written - agent._last_flushed_db_idx = 0 + from gateway.session_context import set_current_session_id + + set_current_session_id(agent.session_id) + except Exception: + os.environ["HERMES_SESSION_ID"] = agent.session_id + # The gateway/tools session context (ContextVar + env) and the + # logging session context are SEPARATE mechanisms. The call above + # moves the former; the ``[session_id]`` tag on log lines comes + # from ``hermes_logging._session_context`` (set once per turn in + # conversation_loop.py). Without this, post-rotation log lines in + # the same turn keep the STALE old id while the message/DB/gateway + # state carry the new one — breaking log correlation exactly at the + # compaction boundary (see #34089). Guarded separately so a logging + # failure can never regress the routing update above. 
+ try: + from hermes_logging import set_session_context + + set_session_context(agent.session_id) + except Exception: + pass + agent._session_db_created = False + agent._session_db.create_session( + session_id=agent.session_id, + source=agent.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"), + model=agent.model, + model_config=agent._session_init_model_config, + parent_session_id=old_session_id, + ) + agent._session_db_created = True + # Auto-number the title for the continuation session + if old_title: + try: + new_title = agent._session_db.get_next_title_in_lineage(old_title) + agent._session_db.set_session_title(agent.session_id, new_title) + except (ValueError, Exception) as e: + logger.debug("Could not propagate title on compression: %s", e) + agent._session_db.update_system_prompt(agent.session_id, new_system_prompt) + # Reset flush cursor — new session starts with no messages written + agent._last_flushed_db_idx = 0 except Exception as e: logger.warning("Session DB compression split failed — new session will NOT be indexed: %s", e) diff --git a/hermes_cli/config.py b/hermes_cli/config.py index ea87623d8fb..ba654a21e74 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -1287,6 +1287,19 @@ DEFAULT_CONFIG = { # exact route is affected — gpt-5.5 on OpenAI's # direct API, OpenRouter, and Copilot keep the # global threshold regardless. + "in_place": False, # When True, compaction rewrites the message + # list and rebuilds the system prompt WITHOUT + # rotating the session id — the conversation + # keeps one durable id for its whole life + # (no parent_session_id chain, no `name #N` + # renumbering). Eliminates the session-rotation + # bug cluster (#33618 /goal loss, #14238 lost + # response, #33907 orphans, #45117 search gaps, + # #42228 null cwd) — see #38763. Compaction is + # lossy: the in-context transcript is summarized + # away (DB rows stay), like Claude Code / Codex. + # Default False during rollout; will flip on + # after live validation. 
}, # Kanban subsystem (orchestrator workers + dispatcher-driven child tasks). diff --git a/tests/run_agent/test_in_place_compaction.py b/tests/run_agent/test_in_place_compaction.py new file mode 100644 index 00000000000..74a71815845 --- /dev/null +++ b/tests/run_agent/test_in_place_compaction.py @@ -0,0 +1,152 @@ +"""Tests for in-place context compaction (config: compression.in_place, #38763). + +When ``compression.in_place`` is True, ``compress_context()`` rewrites the +message list and rebuilds the system prompt but keeps the SAME ``session_id``: +no ``end_session``, no ``parent_session_id`` child row, no ``name #N`` title +renumber, no flush-cursor reset. This eliminates the session-rotation bug +cluster (#33618 /goal loss, #14238 lost response, #33907 orphans, #45117 search +gaps, #42228 null cwd). When the flag is False (default), rotation behaves +exactly as before. +""" + +import os +import tempfile +from pathlib import Path +from unittest.mock import patch + +import pytest + + +def _make_agent(session_db, session_id, *, in_place): + with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}): + from run_agent import AIAgent + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + session_db=session_db, + session_id=session_id, + skip_context_files=True, + skip_memory=True, + ) + agent.compression_in_place = in_place + # Mock the compressor to return a deterministic shrunk transcript so the + # test exercises the DB-mutation path, not summarization quality. 
+ def _fake_compress(messages, current_tokens=None, focus_topic=None, force=False): + return [ + {"role": "user", "content": "[CONTEXT COMPACTION] summary of prior turns"}, + {"role": "assistant", "content": "recent reply"}, + ] + + agent.context_compressor.compress = _fake_compress + agent.context_compressor._last_compress_aborted = False + agent.context_compressor._last_summary_error = None + agent.context_compressor.compression_count = 1 + return agent + + +def _seed(db, sid, title, n=8): + db.create_session(sid, "cli", model="test/model") + db.set_session_title(sid, title) + for i in range(n): + db.append_message( + session_id=sid, + role="user" if i % 2 == 0 else "assistant", + content=f"msg {i}", + ) + + +class TestInPlaceCompaction: + def test_in_place_keeps_same_session_id(self): + """In-place mode: id unchanged, no child row, no rename, history kept.""" + from hermes_state import SessionDB + from agent.conversation_compression import compress_context + + with tempfile.TemporaryDirectory() as tmp: + db = SessionDB(db_path=Path(tmp) / "t.db") + sid = "20260619_120000_aaaaaa" + _seed(db, sid, "my-research") + agent = _make_agent(db, sid, in_place=True) + agent._last_flushed_db_idx = 5 + + messages = [{"role": "user", "content": f"m{i}"} for i in range(8)] + compressed, _sp = compress_context( + agent, messages, approx_tokens=100_000, system_message="sys" + ) + + # Identity never moved. + assert agent.session_id == sid + # No continuation row forked. + child = db._conn.execute( + "SELECT id FROM sessions WHERE parent_session_id = ?", (sid,) + ).fetchall() + assert child == [] + # Session not ended; title untouched (no "#2"). + row = db.get_session(sid) + assert row["end_reason"] is None + assert row["title"] == "my-research" + # Pre-compaction messages remain under the one id (FTS continuity). + assert row["message_count"] >= 8 + # Flush cursor must NOT be reset to 0. 
Rotation resets it (a fresh + # row starts empty); in-place keeps writing to the same row, so the + # cursor only ever advances as current-turn messages are persisted. + assert agent._last_flushed_db_idx != 0 + # Transcript actually shrank. + assert len(compressed) == 2 + + def test_in_place_alternation_preserved(self): + """The compacted list must not introduce consecutive same-role messages.""" + from hermes_state import SessionDB + from agent.conversation_compression import compress_context + + with tempfile.TemporaryDirectory() as tmp: + db = SessionDB(db_path=Path(tmp) / "t.db") + sid = "20260619_120500_cccccc" + _seed(db, sid, "alt") + agent = _make_agent(db, sid, in_place=True) + messages = [{"role": "user", "content": f"m{i}"} for i in range(8)] + compressed, _ = compress_context( + agent, messages, approx_tokens=100_000, system_message="sys" + ) + roles = [m["role"] for m in compressed if m.get("role") != "system"] + assert all(roles[i] != roles[i + 1] for i in range(len(roles) - 1)) + + +class TestRotationStillDefault: + def test_rotation_when_flag_off(self): + """Regression guard: flag off => legacy rotation is unchanged.""" + from hermes_state import SessionDB + from agent.conversation_compression import compress_context + + with tempfile.TemporaryDirectory() as tmp: + db = SessionDB(db_path=Path(tmp) / "t.db") + sid = "20260619_130000_bbbbbb" + _seed(db, sid, "my-research") + agent = _make_agent(db, sid, in_place=False) + agent._last_flushed_db_idx = 5 + + messages = [{"role": "user", "content": f"m{i}"} for i in range(8)] + compress_context( + agent, messages, approx_tokens=100_000, system_message="sys" + ) + + # Identity rotated to a fresh id. + assert agent.session_id != sid + # Old session ended via compression; continuation forked + renamed. 
+ assert db.get_session(sid)["end_reason"] == "compression" + child = db._conn.execute( + "SELECT id, title FROM sessions WHERE parent_session_id = ?", (sid,) + ).fetchall() + assert len(child) == 1 + assert child[0]["title"] == "my-research #2" + # Flush cursor reset for the new row. + assert agent._last_flushed_db_idx == 0 + + +class TestInPlaceConfigDefault: + def test_flag_defaults_off(self): + from hermes_cli.config import DEFAULT_CONFIG + + assert DEFAULT_CONFIG["compression"].get("in_place") is False