mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-21 10:22:18 +00:00
feat(compression): in-place compaction option that keeps one session id (#38763)
Context compression today rewrites the message list AND rotates the session id — it ends the session, forks a parent_session_id child, and renumbers the title (name -> name #2). That moving identity key is the root cause of a whole bug cluster: /goal lost (#33618), pending response lost at the split (#14238), orphan sessions (#33907), TUI sid desync (#36777), FTS search gaps + duplicate sidebar entries (#45117), null continuation cwd (#42228), and title-rename dead-ends (#48989). It also forced a large defensive apparatus (compression lock, contextvar/env/logging triple-sync, orphan finalization, gateway SessionEntry re-propagation, tip projection) whose only job is surviving a mid-conversation id change. Add a compression.in_place config flag (default False during rollout). When True, compaction rewrites the transcript and rebuilds the system prompt but keeps the SAME session_id: no end_session, no child row, no title renumber, no contextvar/logging re-sync, no memory/context-engine session-switch. The conversation keeps one durable id for life, like Claude Code / Codex. Compaction is lossy by design — the pre-compaction transcript is summarized away, not archived. The rotation path is unchanged when the flag is off (moved verbatim into an else branch). Staged rollout: this PR ships the option behind a default-off flag for live validation; a follow-up flips the default and deletes the now-redundant rotation machinery, superseding the 14 open band-aid PRs in this area. - hermes_cli/config.py: add compression.in_place (default False), documented - agent/agent_init.py: resolve the flag -> agent.compression_in_place - agent/conversation_compression.py: branch compress_context() on the flag - tests/run_agent/test_in_place_compaction.py: in-place invariants + rotation regression guard + config default The pre-flush of current-turn messages (#47202) runs in BOTH modes, so no boundary data loss. 
Prompt-cache invariant preserved: the system-prompt rebuild is the same single sanctioned invalidation that already happens during compaction — no NEW invalidation. Message alternation preserved.
This commit is contained in:
parent
37a4dd4982
commit
47fadc24d7
4 changed files with 250 additions and 52 deletions
|
|
@ -1339,6 +1339,14 @@ def init_agent(
|
|||
compression_abort_on_summary_failure = str(
|
||||
_compression_cfg.get("abort_on_summary_failure", False)
|
||||
).lower() in {"true", "1", "yes"}
|
||||
# In-place compaction: when True, compress_context() rewrites the message
|
||||
# list + rebuilds the system prompt WITHOUT rotating the session id (no
|
||||
# parent_session_id chain, no `name #N` renumber). See #38763 and
|
||||
# agent/conversation_compression.py. Consumed by compress_context(), not the
|
||||
# compressor, so it rides on the agent.
|
||||
compression_in_place = str(
|
||||
_compression_cfg.get("in_place", False)
|
||||
).lower() in {"true", "1", "yes"}
|
||||
|
||||
# Read optional explicit context_length override for the auxiliary
|
||||
# compression model. Custom endpoints often cannot report this via
|
||||
|
|
@ -1558,6 +1566,7 @@ def init_agent(
|
|||
abort_on_summary_failure=compression_abort_on_summary_failure,
|
||||
)
|
||||
agent.compression_enabled = compression_enabled
|
||||
agent.compression_in_place = compression_in_place
|
||||
|
||||
# Reject models whose context window is below the minimum required
|
||||
# for reliable tool-calling workflows (64K tokens).
|
||||
|
|
|
|||
|
|
@ -328,6 +328,13 @@ def compress_context(
|
|||
agent._compression_feasibility_checked = True
|
||||
|
||||
_pre_msg_count = len(messages)
|
||||
# In-place compaction (config: compression.in_place, see #38763). When True,
|
||||
# this compaction rewrites the message list + rebuilds the system prompt but
|
||||
# keeps the SAME session_id — no end_session, no parent_session_id child, no
|
||||
# `name #N` renumber, no contextvar/env/logging re-sync, no memory/context-
|
||||
# engine session-switch. The conversation keeps one durable id for life,
|
||||
# eliminating the session-rotation bug cluster. Default False during rollout.
|
||||
in_place = bool(getattr(agent, "compression_in_place", False))
|
||||
logger.info(
|
||||
"context compression started: session=%s messages=%d tokens=~%s model=%s focus=%r",
|
||||
agent.session_id or "none", _pre_msg_count,
|
||||
|
|
@ -508,65 +515,82 @@ def compress_context(
|
|||
|
||||
if agent._session_db:
|
||||
try:
|
||||
# Propagate title to the new session with auto-numbering
|
||||
old_title = agent._session_db.get_session_title(agent.session_id)
|
||||
# Trigger memory extraction on the old session before it rotates.
|
||||
# Trigger memory extraction on the current session before the
|
||||
# transcript is rewritten (runs in BOTH modes — the logical
|
||||
# conversation's pre-compaction turns are about to be summarized
|
||||
# away regardless of whether the id rotates).
|
||||
agent.commit_memory_session(messages)
|
||||
# Flush any un-persisted messages from the current turn to the
|
||||
# old session *before* rotating. compress_context() can be
|
||||
# called mid-turn (auto-compress when context exceeds threshold)
|
||||
# at a point when _flush_messages_to_session_db() has not yet
|
||||
# run. Without this, messages generated during the current turn
|
||||
# are silently lost on session rotation (#47202).
|
||||
# Flush any un-persisted messages from the current turn *before*
|
||||
# the rewrite. compress_context() can be called mid-turn
|
||||
# (auto-compress when context exceeds threshold) at a point when
|
||||
# _flush_messages_to_session_db() has not yet run. Without this,
|
||||
# messages generated during the current turn are silently lost
|
||||
# (#47202). In-place mode flushes to the SAME session; rotation
|
||||
# mode flushes to the old session before ending it.
|
||||
try:
|
||||
agent._flush_messages_to_session_db(messages)
|
||||
except Exception:
|
||||
pass # best-effort — don't block compression on a flush error
|
||||
agent._session_db.end_session(agent.session_id, "compression")
|
||||
old_session_id = agent.session_id
|
||||
agent.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
|
||||
# Ordering contract: the agent thread updates the contextvar here;
|
||||
# the gateway propagates to SessionEntry after run_in_executor returns.
|
||||
try:
|
||||
from gateway.session_context import set_current_session_id
|
||||
|
||||
set_current_session_id(agent.session_id)
|
||||
except Exception:
|
||||
os.environ["HERMES_SESSION_ID"] = agent.session_id
|
||||
# The gateway/tools session context (ContextVar + env) and the
|
||||
# logging session context are SEPARATE mechanisms. The call above
|
||||
# moves the former; the ``[session_id]`` tag on log lines comes
|
||||
# from ``hermes_logging._session_context`` (set once per turn in
|
||||
# conversation_loop.py). Without this, post-rotation log lines in
|
||||
# the same turn keep the STALE old id while the message/DB/gateway
|
||||
# state carry the new one — breaking log correlation exactly at the
|
||||
# compaction boundary (see #34089). Guarded separately so a logging
|
||||
# failure can never regress the routing update above.
|
||||
try:
|
||||
from hermes_logging import set_session_context
|
||||
|
||||
set_session_context(agent.session_id)
|
||||
except Exception:
|
||||
pass
|
||||
agent._session_db_created = False
|
||||
agent._session_db.create_session(
|
||||
session_id=agent.session_id,
|
||||
source=agent.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
|
||||
model=agent.model,
|
||||
model_config=agent._session_init_model_config,
|
||||
parent_session_id=old_session_id,
|
||||
)
|
||||
agent._session_db_created = True
|
||||
# Auto-number the title for the continuation session
|
||||
if old_title:
|
||||
if in_place:
|
||||
# ── In-place compaction: keep the same session_id ──────────
|
||||
# No end_session, no new row, no parent_session_id, no title
|
||||
# renumber, no contextvar/env/logging re-sync. Just refresh
|
||||
# the stored system prompt on the existing row. The session's
|
||||
# id, title, cwd, /goal, FTS-indexed history, and gateway
|
||||
# routing all stay put. See #38763.
|
||||
agent._session_db.update_system_prompt(
|
||||
agent.session_id, new_system_prompt
|
||||
)
|
||||
else:
|
||||
# ── Rotation (legacy): end this session, fork a continuation ─
|
||||
# Propagate title to the new session with auto-numbering
|
||||
old_title = agent._session_db.get_session_title(agent.session_id)
|
||||
agent._session_db.end_session(agent.session_id, "compression")
|
||||
old_session_id = agent.session_id
|
||||
agent.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
|
||||
# Ordering contract: the agent thread updates the contextvar here;
|
||||
# the gateway propagates to SessionEntry after run_in_executor returns.
|
||||
try:
|
||||
new_title = agent._session_db.get_next_title_in_lineage(old_title)
|
||||
agent._session_db.set_session_title(agent.session_id, new_title)
|
||||
except (ValueError, Exception) as e:
|
||||
logger.debug("Could not propagate title on compression: %s", e)
|
||||
agent._session_db.update_system_prompt(agent.session_id, new_system_prompt)
|
||||
# Reset flush cursor — new session starts with no messages written
|
||||
agent._last_flushed_db_idx = 0
|
||||
from gateway.session_context import set_current_session_id
|
||||
|
||||
set_current_session_id(agent.session_id)
|
||||
except Exception:
|
||||
os.environ["HERMES_SESSION_ID"] = agent.session_id
|
||||
# The gateway/tools session context (ContextVar + env) and the
|
||||
# logging session context are SEPARATE mechanisms. The call above
|
||||
# moves the former; the ``[session_id]`` tag on log lines comes
|
||||
# from ``hermes_logging._session_context`` (set once per turn in
|
||||
# conversation_loop.py). Without this, post-rotation log lines in
|
||||
# the same turn keep the STALE old id while the message/DB/gateway
|
||||
# state carry the new one — breaking log correlation exactly at the
|
||||
# compaction boundary (see #34089). Guarded separately so a logging
|
||||
# failure can never regress the routing update above.
|
||||
try:
|
||||
from hermes_logging import set_session_context
|
||||
|
||||
set_session_context(agent.session_id)
|
||||
except Exception:
|
||||
pass
|
||||
agent._session_db_created = False
|
||||
agent._session_db.create_session(
|
||||
session_id=agent.session_id,
|
||||
source=agent.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
|
||||
model=agent.model,
|
||||
model_config=agent._session_init_model_config,
|
||||
parent_session_id=old_session_id,
|
||||
)
|
||||
agent._session_db_created = True
|
||||
# Auto-number the title for the continuation session
|
||||
if old_title:
|
||||
try:
|
||||
new_title = agent._session_db.get_next_title_in_lineage(old_title)
|
||||
agent._session_db.set_session_title(agent.session_id, new_title)
|
||||
except (ValueError, Exception) as e:
|
||||
logger.debug("Could not propagate title on compression: %s", e)
|
||||
agent._session_db.update_system_prompt(agent.session_id, new_system_prompt)
|
||||
# Reset flush cursor — new session starts with no messages written
|
||||
agent._last_flushed_db_idx = 0
|
||||
except Exception as e:
|
||||
logger.warning("Session DB compression split failed — new session will NOT be indexed: %s", e)
|
||||
|
||||
|
|
|
|||
|
|
@ -1287,6 +1287,19 @@ DEFAULT_CONFIG = {
|
|||
# exact route is affected — gpt-5.5 on OpenAI's
|
||||
# direct API, OpenRouter, and Copilot keep the
|
||||
# global threshold regardless.
|
||||
"in_place": False, # When True, compaction rewrites the message
|
||||
# list and rebuilds the system prompt WITHOUT
|
||||
# rotating the session id — the conversation
|
||||
# keeps one durable id for its whole life
|
||||
# (no parent_session_id chain, no `name #N`
|
||||
# renumbering). Eliminates the session-rotation
|
||||
# bug cluster (#33618 /goal loss, #14238 lost
|
||||
# response, #33907 orphans, #45117 search gaps,
|
||||
# #42228 null cwd) — see #38763. Compaction is
|
||||
# lossy: the pre-compaction transcript is
|
||||
# discarded, matching Claude Code / Codex.
|
||||
# Default False during rollout; will flip on
|
||||
# after live validation.
|
||||
},
|
||||
|
||||
# Kanban subsystem (orchestrator workers + dispatcher-driven child tasks).
|
||||
|
|
|
|||
152
tests/run_agent/test_in_place_compaction.py
Normal file
152
tests/run_agent/test_in_place_compaction.py
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
"""Tests for in-place context compaction (config: compression.in_place, #38763).
|
||||
|
||||
When ``compression.in_place`` is True, ``compress_context()`` rewrites the
|
||||
message list and rebuilds the system prompt but keeps the SAME ``session_id``:
|
||||
no ``end_session``, no ``parent_session_id`` child row, no ``name #N`` title
|
||||
renumber, no flush-cursor reset. This eliminates the session-rotation bug
|
||||
cluster (#33618 /goal loss, #14238 lost response, #33907 orphans, #45117 search
|
||||
gaps, #42228 null cwd). When the flag is False (default), rotation behaves
|
||||
exactly as before.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _make_agent(session_db, session_id, *, in_place):
    """Build an ``AIAgent`` for compaction tests with a stubbed compressor.

    The agent is constructed against ``session_db`` / ``session_id`` while
    ``OPENROUTER_API_KEY`` is patched into the environment, then its
    ``compression_in_place`` attribute is forced to ``in_place``.  The
    compressor's ``compress`` callable is replaced by a stub that always
    returns the same two-message transcript, so callers exercise the
    session-DB side of compaction rather than the summarizer.
    """
    agent_kwargs = {
        "api_key": "test-key",
        "base_url": "https://openrouter.ai/api/v1",
        "model": "test/model",
        "quiet_mode": True,
        "session_db": session_db,
        "session_id": session_id,
        "skip_context_files": True,
        "skip_memory": True,
    }
    with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}):
        from run_agent import AIAgent

        agent = AIAgent(**agent_kwargs)

    agent.compression_in_place = in_place

    def _stub_compress(messages, current_tokens=None, focus_topic=None, force=False):
        # Deterministic shrunk transcript: one summary turn + one reply.
        # A fresh list is built on every call, as a list literal would be.
        summary_turn = {
            "role": "user",
            "content": "[CONTEXT COMPACTION] summary of prior turns",
        }
        reply_turn = {"role": "assistant", "content": "recent reply"}
        return [summary_turn, reply_turn]

    compressor = agent.context_compressor
    compressor.compress = _stub_compress
    compressor._last_compress_aborted = False
    compressor._last_summary_error = None
    compressor.compression_count = 1
    return agent
|
||||
|
||||
|
||||
def _seed(db, sid, title, n=8):
    """Create session ``sid`` titled ``title`` and append ``n`` messages.

    The session is created with source ``"cli"`` and model ``"test/model"``.
    Messages are numbered from 0: even indices are appended with role
    ``"user"``, odd indices with role ``"assistant"``, and each carries the
    content ``"msg <index>"``.
    """
    db.create_session(sid, "cli", model="test/model")
    db.set_session_title(sid, title)

    speakers = ("user", "assistant")
    for idx in range(n):
        db.append_message(
            session_id=sid,
            role=speakers[idx % 2],
            content=f"msg {idx}",
        )
|
||||
|
||||
|
||||
class TestInPlaceCompaction:
    """Checks for ``compress_context()`` when ``compression_in_place`` is True."""

    def test_in_place_keeps_same_session_id(self):
        """In-place mode: id unchanged, no child row, no rename, history kept."""
        from hermes_state import SessionDB
        from agent.conversation_compression import compress_context

        with tempfile.TemporaryDirectory() as workdir:
            store = SessionDB(db_path=Path(workdir) / "t.db")
            session_id = "20260619_120000_aaaaaa"
            _seed(store, session_id, "my-research")
            agent = _make_agent(store, session_id, in_place=True)
            agent._last_flushed_db_idx = 5

            transcript = [{"role": "user", "content": f"m{i}"} for i in range(8)]
            compacted, _prompt = compress_context(
                agent, transcript, approx_tokens=100_000, system_message="sys"
            )

            # The agent must still be attached to the id it started with.
            assert agent.session_id == session_id

            # No row may name this session as its parent.
            forked_rows = store._conn.execute(
                "SELECT id FROM sessions WHERE parent_session_id = ?", (session_id,)
            ).fetchall()
            assert forked_rows == []

            # The row is still open and keeps its original title (no "#2").
            session_row = store.get_session(session_id)
            assert session_row["end_reason"] is None
            assert session_row["title"] == "my-research"

            # The seeded messages are still counted under the same id.
            assert session_row["message_count"] >= 8

            # Rotation zeroes the flush cursor for its fresh row; in-place
            # mode keeps writing to the same row, so zero here would mean
            # the rotation-only reset leaked into this path.
            assert agent._last_flushed_db_idx != 0

            # The stubbed compressor's two-message transcript came back.
            assert len(compacted) == 2

    def test_in_place_alternation_preserved(self):
        """The compacted list must not introduce consecutive same-role messages."""
        from hermes_state import SessionDB
        from agent.conversation_compression import compress_context

        with tempfile.TemporaryDirectory() as workdir:
            store = SessionDB(db_path=Path(workdir) / "t.db")
            session_id = "20260619_120500_cccccc"
            _seed(store, session_id, "alt")
            agent = _make_agent(store, session_id, in_place=True)

            transcript = [{"role": "user", "content": f"m{i}"} for i in range(8)]
            compacted, _ = compress_context(
                agent, transcript, approx_tokens=100_000, system_message="sys"
            )

            # Compare each non-system role with its successor.
            speaker_seq = [
                msg["role"] for msg in compacted if msg.get("role") != "system"
            ]
            assert all(
                left != right for left, right in zip(speaker_seq, speaker_seq[1:])
            )
|
||||
|
||||
|
||||
class TestRotationStillDefault:
    """Checks that session rotation still happens when the flag is False."""

    def test_rotation_when_flag_off(self):
        """Regression guard: flag off => legacy rotation is unchanged."""
        from hermes_state import SessionDB
        from agent.conversation_compression import compress_context

        with tempfile.TemporaryDirectory() as workdir:
            store = SessionDB(db_path=Path(workdir) / "t.db")
            original_id = "20260619_130000_bbbbbb"
            _seed(store, original_id, "my-research")
            agent = _make_agent(store, original_id, in_place=False)
            agent._last_flushed_db_idx = 5

            transcript = [{"role": "user", "content": f"m{i}"} for i in range(8)]
            compress_context(
                agent, transcript, approx_tokens=100_000, system_message="sys"
            )

            # The agent now carries a different session id.
            assert agent.session_id != original_id

            # The original row was closed with the "compression" reason.
            ended_row = store.get_session(original_id)
            assert ended_row["end_reason"] == "compression"

            # Exactly one continuation row points back at the original,
            # and its title carries the next number in the lineage.
            continuations = store._conn.execute(
                "SELECT id, title FROM sessions WHERE parent_session_id = ?", (original_id,)
            ).fetchall()
            assert len(continuations) == 1
            assert continuations[0]["title"] == "my-research #2"

            # The flush cursor restarts at zero for the new row.
            assert agent._last_flushed_db_idx == 0
|
||||
|
||||
|
||||
class TestInPlaceConfigDefault:
    """Checks the shipped default for ``compression.in_place``."""

    def test_flag_defaults_off(self):
        """``DEFAULT_CONFIG["compression"]["in_place"]`` is literally ``False``."""
        from hermes_cli.config import DEFAULT_CONFIG

        compression_defaults = DEFAULT_CONFIG["compression"]
        in_place_default = compression_defaults.get("in_place")
        # Identity check: a missing key (None) or a falsy non-bool must fail.
        assert in_place_default is False
|
||||
Loading…
Add table
Add a link
Reference in a new issue