diff --git a/agent/agent_init.py b/agent/agent_init.py index f40288abcff..c1e9bd335b5 100644 --- a/agent/agent_init.py +++ b/agent/agent_init.py @@ -50,7 +50,7 @@ from agent.tool_guardrails import ( from hermes_cli.config import cfg_get from hermes_cli.timeouts import get_provider_request_timeout from hermes_constants import get_hermes_home -from utils import base_url_host_matches +from utils import base_url_host_matches, is_truthy_value # Use the same logger name as run_agent so tests patching ``run_agent.logger`` # capture our warnings. (run_agent.py also does @@ -1344,9 +1344,9 @@ def init_agent( # parent_session_id chain, no `name #N` renumber). See #38763 and # agent/conversation_compression.py. Consumed by compress_context(), not the # compressor, so it rides on the agent. - compression_in_place = str( - _compression_cfg.get("in_place", False) - ).lower() in {"true", "1", "yes"} + compression_in_place = is_truthy_value( + _compression_cfg.get("in_place"), default=False + ) # Read optional explicit context_length override for the auxiliary # compression model. Custom endpoints often cannot report this via diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index 73195be0e6f..42874155f8c 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -335,6 +335,9 @@ def compress_context( # engine session-switch. The conversation keeps one durable id for life, # eliminating the session-rotation bug cluster. Default False during rollout. in_place = bool(getattr(agent, "compression_in_place", False)) + # Set True once the in-place DB write actually completes (the DB block can + # raise and skip it). Surfaced to the gateway via agent._last_compaction_in_place. + compacted_in_place = False logger.info( "context compression started: session=%s messages=%d tokens=~%s model=%s focus=%r", agent.session_id or "none", _pre_msg_count, @@ -520,17 +523,6 @@ def compress_context( # conversation's pre-compaction turns are about to be summarized # away regardless of whether the id rotates). agent.commit_memory_session(messages) - # Flush any un-persisted messages from the current turn *before* - # the rewrite. compress_context() can be called mid-turn - # (auto-compress when context exceeds threshold) at a point when - # _flush_messages_to_session_db() has not yet run. Without this, - # messages generated during the current turn are silently lost - # (#47202). In-place mode flushes to the SAME session; rotation - # mode flushes to the old session before ending it. - try: - agent._flush_messages_to_session_db(messages) - except Exception: - pass # best-effort — don't block compression on a flush error if in_place: # ── In-place compaction: keep the same session_id ────────── @@ -539,32 +531,36 @@ def compress_context( # id, title, cwd, /goal, and gateway routing all stay put. # # Durable replace: the persisted transcript MUST become the - # compacted set, not "original history + summary". The flush - # above wrote any un-persisted current-turn messages onto the - # row; now atomically replace ALL rows with `compressed` so a - # resume reloads the compacted transcript (lossy by design — - # the pre-compaction turns are summarized away). Without this - # the row keeps the full history and compaction never durably + # compacted set, not "original history + summary". `compressed` + # already carries the surviving tail (current-turn messages the + # compressor kept via protect_last_n), so we DON'T pre-flush + # here — a flush would INSERT current-turn rows that the + # replace_messages DELETE immediately discards (wasted writes). + # Atomically replace ALL rows with `compressed` so a resume + # reloads the compacted transcript (lossy by design — the + # pre-compaction turns are summarized away). Without this the + # row keeps the full history and compaction never durably # shrinks anything (the next turn just re-compacts). See #38763. agent._session_db.replace_messages(agent.session_id, compressed) - agent._session_db.update_system_prompt( - agent.session_id, new_system_prompt - ) - # Reset the flush identity/cursor so the next turn's appends are - # diffed against the COMPACTED transcript, not the pre-compaction - # one. _flush_messages_to_session_db rebuilds its identity set - # when _last_flushed_db_idx == 0; the compacted dicts are passed - # as conversation_history next turn and skipped by identity, so - # only genuinely new turn messages get appended (no dup of the - # summary, no resurrection of dropped turns). - agent._last_flushed_db_idx = 0 + # Reset the flush identity set so the next turn's appends are + # diffed against the COMPACTED transcript: the compacted dicts + # are passed as conversation_history next turn and skipped by + # identity, so only genuinely new turn messages get appended + # (no dup of the summary, no resurrection of dropped turns). agent._flushed_db_message_ids = set() # Rotation-independent signal: the conversation was compacted in - # place (id unchanged). The caller / gateway uses this instead of - # an id-change diff to re-baseline transcript handling. + # place (id unchanged). The gateway reads this (NOT an id-change + # diff) to re-baseline transcript handling. compacted_in_place = True else: # ── Rotation (legacy): end this session, fork a continuation ─ + # Flush any un-persisted current-turn messages to the OLD + # session before ending it, so they survive in the preserved + # parent transcript (#47202). (In-place skips this — see above.) + try: + agent._flush_messages_to_session_db(messages) + except Exception: + pass # best-effort — don't block compression on a flush error # Propagate title to the new session with auto-numbering old_title = agent._session_db.get_session_title(agent.session_id) agent._session_db.end_session(agent.session_id, "compression") @@ -609,12 +605,24 @@ def compress_context( agent._session_db.set_session_title(agent.session_id, new_title) except (ValueError, Exception) as e: logger.debug("Could not propagate title on compression: %s", e) - agent._session_db.update_system_prompt(agent.session_id, new_system_prompt) - # Reset flush cursor — new session starts with no messages written - agent._last_flushed_db_idx = 0 + + # Shared post-write steps (both modes target agent.session_id, which + # in-place keeps and rotation has already reassigned to the new id): + # refresh the stored system prompt and reset the flush cursor so the + # next turn re-bases its append diff. + agent._session_db.update_system_prompt(agent.session_id, new_system_prompt) + agent._last_flushed_db_idx = 0 except Exception as e: logger.warning("Session DB compression split failed — new session will NOT be indexed: %s", e) + # Compaction-boundary bookkeeping, computed once. `old_session_id` is only + # bound in the rotation branch; in-place leaves it unset. `_boundary_parent` + # is the id the boundary notifications attribute the prior state to: the old + # id on rotation, the (unchanged) current id in-place. + _old_sid = locals().get("old_session_id") + _is_boundary = bool(_old_sid) or in_place + _boundary_parent = _old_sid or agent.session_id or "" + # Notify the context engine that a compaction boundary occurred. Plugin # engines (e.g. hermes-lcm) use boundary_reason="compression" to preserve # DAG lineage / checkpoint per-session state across the boundary instead of @@ -622,13 +630,11 @@ def compress_context( # ignores kwargs. Fires in BOTH modes: rotation passes old→new ids; in-place # passes the SAME id (the boundary is real even though the id didn't move). try: - _old_sid = locals().get("old_session_id") - _boundary = bool(_old_sid) or in_place - if _boundary and hasattr(agent.context_compressor, "on_session_start"): + if _is_boundary and hasattr(agent.context_compressor, "on_session_start"): agent.context_compressor.on_session_start( agent.session_id or "", boundary_reason="compression", - old_session_id=_old_sid or agent.session_id or "", + old_session_id=_boundary_parent, conversation_id=getattr(agent, "_gateway_session_key", None), ) except Exception as _ce_err: @@ -641,11 +647,10 @@ def compress_context( # parent (the conversation didn't fork, but the buffer must still be told # the transcript was compacted so it doesn't double-count dropped turns). try: - _old_sid = locals().get("old_session_id") - if (_old_sid or in_place) and agent._memory_manager: + if _is_boundary and agent._memory_manager: agent._memory_manager.on_session_switch( agent.session_id or "", - parent_session_id=_old_sid or agent.session_id or "", + parent_session_id=_boundary_parent, reset=False, reason="compression", ) @@ -665,13 +670,12 @@ def compress_context( # the completed old session before its details are lost. In in-place mode # there is no old id (same session); ``in_place=True`` tells hooks the # transcript was compacted on the same id rather than rotated. - _old_sid_for_event = locals().get("old_session_id") if getattr(agent, "event_callback", None): try: agent.event_callback("session:compress", { "platform": agent.platform or "", "session_id": agent.session_id, - "old_session_id": _old_sid_for_event or "", + "old_session_id": _old_sid or "", "in_place": in_place, "compression_count": agent.context_compressor.compression_count, }) @@ -682,7 +686,7 @@ def compress_context( # via a rotation-independent flag. The gateway uses this — NOT an # id-change diff — to re-baseline transcript handling (history_offset=0 + # rewrite on the same id) when compaction happened in place. See #38763. - agent._last_compaction_in_place = bool(locals().get("compacted_in_place", False)) + agent._last_compaction_in_place = compacted_in_place # Keep the post-compression rough estimate for diagnostics, but do not # treat it as provider-reported prompt usage. Schema-heavy rough estimates diff --git a/tests/run_agent/test_in_place_compaction.py b/tests/run_agent/test_in_place_compaction.py index 586b88bdfd3..04248fbb32f 100644 --- a/tests/run_agent/test_in_place_compaction.py +++ b/tests/run_agent/test_in_place_compaction.py @@ -124,6 +124,48 @@ class TestInPlaceCompaction: roles = [m["role"] for m in compressed if m.get("role") != "system"] assert all(roles[i] != roles[i + 1] for i in range(len(roles) - 1)) + def test_in_place_skips_redundant_preflush(self): + """In-place must NOT pre-flush current-turn messages: replace_messages + rewrites the whole row, so a flush would INSERT rows it immediately + deletes (wasted writes). The current-turn tail survives via the + compressor's `compressed` output, not the flush.""" + from hermes_state import SessionDB + from agent.conversation_compression import compress_context + + with tempfile.TemporaryDirectory() as tmp: + db = SessionDB(db_path=Path(tmp) / "t.db") + _seed(db, "ip_flush", "f") + agent = _make_agent(db, "ip_flush", in_place=True) + calls = {"n": 0} + agent._flush_messages_to_session_db = lambda *a, **k: calls.__setitem__( + "n", calls["n"] + 1 + ) + compress_context( + agent, [{"role": "user", "content": "x"}] * 8, + approx_tokens=100_000, system_message="sys", + ) + assert calls["n"] == 0 + + def test_rotation_still_preflushes(self): + """Rotation MUST pre-flush so current-turn messages survive in the + preserved old (parent) session before it is ended (#47202).""" + from hermes_state import SessionDB + from agent.conversation_compression import compress_context + + with tempfile.TemporaryDirectory() as tmp: + db = SessionDB(db_path=Path(tmp) / "t.db") + _seed(db, "rot_flush", "f") + agent = _make_agent(db, "rot_flush", in_place=False) + calls = {"n": 0} + agent._flush_messages_to_session_db = lambda *a, **k: calls.__setitem__( + "n", calls["n"] + 1 + ) + compress_context( + agent, [{"role": "user", "content": "x"}] * 8, + approx_tokens=100_000, system_message="sys", + ) + assert calls["n"] == 1 + class TestRotationStillDefault: def test_rotation_when_flag_off(self):