refactor(compression): tidy in-place compaction path (simplify pass)

Parallel 3-reviewer cleanup of the in-place compaction code. Findings applied:

- perf: in-place mode no longer pre-flushes current-turn messages. The flush
  ran INSERTs that the immediately-following replace_messages(compressed)
  DELETE+reinsert discarded -- pure wasted writes per compaction. The
  current-turn tail survives via the compressor's compressed output
  (protect_last_n), not the flush. Verified no data loss; rotation still
  pre-flushes (its old session row is preserved, so the flush is real there).
- quality: hoist the two shared post-write steps (update_system_prompt +
  _last_flushed_db_idx = 0) below the if/else -- they ran in both branches
  against agent.session_id. Removes the easiest divergence bug.
- quality: compute the compaction-boundary locals (_old_sid, _is_boundary,
  _boundary_parent) ONCE instead of recomputing locals().get('old_session_id')
  and the "_old_sid or agent.session_id or ''" chain three times.
- quality: initialize compacted_in_place up front and assign
  agent._last_compaction_in_place directly, dropping the fragile
  locals().get('compacted_in_place') reflection.
- reuse: parse the in_place config flag with utils.is_truthy_value (the
  project's canonical truthy coerce) instead of a hand-rolled
  str().lower() in {...} (agent_init already imports from utils).

Dropped as false positives / out of scope: gateway getattr of agent internals
(established session_id pattern), dual result-dict carry (mirrors history_offset
etc.), stringly-typed "compression" (codebase-wide convention, no constant).

Behavior-preserving: 7 in-place tests (incl. 2 new flush-guard tests) + 26
rotation/boundary/persistence/command tests green; mutation check confirms the
durable-replace guard still binds (removing replace_messages fails the test);
ruff clean. Added test_in_place_skips_redundant_preflush /
test_rotation_still_preflushes to guard the perf change.
This commit is contained in:
kshitijk4poor 2026-06-19 19:29:26 +05:30 committed by Teknium
parent 1fbf48d4ad
commit 4f9485a95d
3 changed files with 93 additions and 47 deletions

View file

@ -50,7 +50,7 @@ from agent.tool_guardrails import (
from hermes_cli.config import cfg_get
from hermes_cli.timeouts import get_provider_request_timeout
from hermes_constants import get_hermes_home
from utils import base_url_host_matches
from utils import base_url_host_matches, is_truthy_value
# Use the same logger name as run_agent so tests patching ``run_agent.logger``
# capture our warnings. (run_agent.py also does
@ -1344,9 +1344,9 @@ def init_agent(
# parent_session_id chain, no `name #N` renumber). See #38763 and
# agent/conversation_compression.py. Consumed by compress_context(), not the
# compressor, so it rides on the agent.
compression_in_place = str(
_compression_cfg.get("in_place", False)
).lower() in {"true", "1", "yes"}
compression_in_place = is_truthy_value(
_compression_cfg.get("in_place"), default=False
)
# Read optional explicit context_length override for the auxiliary
# compression model. Custom endpoints often cannot report this via

View file

@ -335,6 +335,9 @@ def compress_context(
# engine session-switch. The conversation keeps one durable id for life,
# eliminating the session-rotation bug cluster. Default False during rollout.
in_place = bool(getattr(agent, "compression_in_place", False))
# Set True once the in-place DB write actually completes (the DB block can
# raise and skip it). Surfaced to the gateway via agent._last_compaction_in_place.
compacted_in_place = False
logger.info(
"context compression started: session=%s messages=%d tokens=~%s model=%s focus=%r",
agent.session_id or "none", _pre_msg_count,
@ -520,17 +523,6 @@ def compress_context(
# conversation's pre-compaction turns are about to be summarized
# away regardless of whether the id rotates).
agent.commit_memory_session(messages)
# Flush any un-persisted messages from the current turn *before*
# the rewrite. compress_context() can be called mid-turn
# (auto-compress when context exceeds threshold) at a point when
# _flush_messages_to_session_db() has not yet run. Without this,
# messages generated during the current turn are silently lost
# (#47202). In-place mode flushes to the SAME session; rotation
# mode flushes to the old session before ending it.
try:
agent._flush_messages_to_session_db(messages)
except Exception:
pass # best-effort — don't block compression on a flush error
if in_place:
# ── In-place compaction: keep the same session_id ──────────
@ -539,32 +531,36 @@ def compress_context(
# id, title, cwd, /goal, and gateway routing all stay put.
#
# Durable replace: the persisted transcript MUST become the
# compacted set, not "original history + summary". The flush
# above wrote any un-persisted current-turn messages onto the
# row; now atomically replace ALL rows with `compressed` so a
# resume reloads the compacted transcript (lossy by design —
# the pre-compaction turns are summarized away). Without this
# the row keeps the full history and compaction never durably
# compacted set, not "original history + summary". `compressed`
# already carries the surviving tail (current-turn messages the
# compressor kept via protect_last_n), so we DON'T pre-flush
# here — a flush would INSERT current-turn rows that the
# replace_messages DELETE immediately discards (wasted writes).
# Atomically replace ALL rows with `compressed` so a resume
# reloads the compacted transcript (lossy by design — the
# pre-compaction turns are summarized away). Without this the
# row keeps the full history and compaction never durably
# shrinks anything (the next turn just re-compacts). See #38763.
agent._session_db.replace_messages(agent.session_id, compressed)
agent._session_db.update_system_prompt(
agent.session_id, new_system_prompt
)
# Reset the flush identity/cursor so the next turn's appends are
# diffed against the COMPACTED transcript, not the pre-compaction
# one. _flush_messages_to_session_db rebuilds its identity set
# when _last_flushed_db_idx == 0; the compacted dicts are passed
# as conversation_history next turn and skipped by identity, so
# only genuinely new turn messages get appended (no dup of the
# summary, no resurrection of dropped turns).
agent._last_flushed_db_idx = 0
# Reset the flush identity set so the next turn's appends are
# diffed against the COMPACTED transcript: the compacted dicts
# are passed as conversation_history next turn and skipped by
# identity, so only genuinely new turn messages get appended
# (no dup of the summary, no resurrection of dropped turns).
agent._flushed_db_message_ids = set()
# Rotation-independent signal: the conversation was compacted in
# place (id unchanged). The caller / gateway uses this instead of
# an id-change diff to re-baseline transcript handling.
# place (id unchanged). The gateway reads this (NOT an id-change
# diff) to re-baseline transcript handling.
compacted_in_place = True
else:
# ── Rotation (legacy): end this session, fork a continuation ─
# Flush any un-persisted current-turn messages to the OLD
# session before ending it, so they survive in the preserved
# parent transcript (#47202). (In-place skips this — see above.)
try:
agent._flush_messages_to_session_db(messages)
except Exception:
pass # best-effort — don't block compression on a flush error
# Propagate title to the new session with auto-numbering
old_title = agent._session_db.get_session_title(agent.session_id)
agent._session_db.end_session(agent.session_id, "compression")
@ -609,12 +605,24 @@ def compress_context(
agent._session_db.set_session_title(agent.session_id, new_title)
except (ValueError, Exception) as e:
logger.debug("Could not propagate title on compression: %s", e)
agent._session_db.update_system_prompt(agent.session_id, new_system_prompt)
# Reset flush cursor — new session starts with no messages written
agent._last_flushed_db_idx = 0
# Shared post-write steps (both modes target agent.session_id, which
# in-place keeps and rotation has already reassigned to the new id):
# refresh the stored system prompt and reset the flush cursor so the
# next turn re-bases its append diff.
agent._session_db.update_system_prompt(agent.session_id, new_system_prompt)
agent._last_flushed_db_idx = 0
except Exception as e:
logger.warning("Session DB compression split failed — new session will NOT be indexed: %s", e)
# Compaction-boundary bookkeeping, computed once. `old_session_id` is only
# bound in the rotation branch; in-place leaves it unset. `_boundary_parent`
# is the id the boundary notifications attribute the prior state to: the old
# id on rotation, the (unchanged) current id in-place.
_old_sid = locals().get("old_session_id")
_is_boundary = bool(_old_sid) or in_place
_boundary_parent = _old_sid or agent.session_id or ""
# Notify the context engine that a compaction boundary occurred. Plugin
# engines (e.g. hermes-lcm) use boundary_reason="compression" to preserve
# DAG lineage / checkpoint per-session state across the boundary instead of
@ -622,13 +630,11 @@ def compress_context(
# ignores kwargs. Fires in BOTH modes: rotation passes old→new ids; in-place
# passes the SAME id (the boundary is real even though the id didn't move).
try:
_old_sid = locals().get("old_session_id")
_boundary = bool(_old_sid) or in_place
if _boundary and hasattr(agent.context_compressor, "on_session_start"):
if _is_boundary and hasattr(agent.context_compressor, "on_session_start"):
agent.context_compressor.on_session_start(
agent.session_id or "",
boundary_reason="compression",
old_session_id=_old_sid or agent.session_id or "",
old_session_id=_boundary_parent,
conversation_id=getattr(agent, "_gateway_session_key", None),
)
except Exception as _ce_err:
@ -641,11 +647,10 @@ def compress_context(
# parent (the conversation didn't fork, but the buffer must still be told
# the transcript was compacted so it doesn't double-count dropped turns).
try:
_old_sid = locals().get("old_session_id")
if (_old_sid or in_place) and agent._memory_manager:
if _is_boundary and agent._memory_manager:
agent._memory_manager.on_session_switch(
agent.session_id or "",
parent_session_id=_old_sid or agent.session_id or "",
parent_session_id=_boundary_parent,
reset=False,
reason="compression",
)
@ -665,13 +670,12 @@ def compress_context(
# the completed old session before its details are lost. In in-place mode
# there is no old id (same session); ``in_place=True`` tells hooks the
# transcript was compacted on the same id rather than rotated.
_old_sid_for_event = locals().get("old_session_id")
if getattr(agent, "event_callback", None):
try:
agent.event_callback("session:compress", {
"platform": agent.platform or "",
"session_id": agent.session_id,
"old_session_id": _old_sid_for_event or "",
"old_session_id": _old_sid or "",
"in_place": in_place,
"compression_count": agent.context_compressor.compression_count,
})
@ -682,7 +686,7 @@ def compress_context(
# via a rotation-independent flag. The gateway uses this — NOT an
# id-change diff — to re-baseline transcript handling (history_offset=0 +
# rewrite on the same id) when compaction happened in place. See #38763.
agent._last_compaction_in_place = bool(locals().get("compacted_in_place", False))
agent._last_compaction_in_place = compacted_in_place
# Keep the post-compression rough estimate for diagnostics, but do not
# treat it as provider-reported prompt usage. Schema-heavy rough estimates

View file

@ -124,6 +124,48 @@ class TestInPlaceCompaction:
roles = [m["role"] for m in compressed if m.get("role") != "system"]
assert all(roles[i] != roles[i + 1] for i in range(len(roles) - 1))
def test_in_place_skips_redundant_preflush(self):
"""In-place must NOT pre-flush current-turn messages: replace_messages
rewrites the whole row, so a flush would INSERT rows it immediately
deletes (wasted writes). The current-turn tail survives via the
compressor's `compressed` output, not the flush."""
from hermes_state import SessionDB
from agent.conversation_compression import compress_context
with tempfile.TemporaryDirectory() as tmp:
db = SessionDB(db_path=Path(tmp) / "t.db")
_seed(db, "ip_flush", "f")
agent = _make_agent(db, "ip_flush", in_place=True)
calls = {"n": 0}
agent._flush_messages_to_session_db = lambda *a, **k: calls.__setitem__(
"n", calls["n"] + 1
)
compress_context(
agent, [{"role": "user", "content": "x"}] * 8,
approx_tokens=100_000, system_message="sys",
)
assert calls["n"] == 0
def test_rotation_still_preflushes(self):
"""Rotation MUST pre-flush so current-turn messages survive in the
preserved old (parent) session before it is ended (#47202)."""
from hermes_state import SessionDB
from agent.conversation_compression import compress_context
with tempfile.TemporaryDirectory() as tmp:
db = SessionDB(db_path=Path(tmp) / "t.db")
_seed(db, "rot_flush", "f")
agent = _make_agent(db, "rot_flush", in_place=False)
calls = {"n": 0}
agent._flush_messages_to_session_db = lambda *a, **k: calls.__setitem__(
"n", calls["n"] + 1
)
compress_context(
agent, [{"role": "user", "content": "x"}] * 8,
approx_tokens=100_000, system_message="sys",
)
assert calls["n"] == 1
class TestRotationStillDefault:
def test_rotation_when_flag_off(self):