diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index 42874155f8c..610f0ac5ac6 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -530,18 +530,20 @@ def compress_context( # renumber, no contextvar/env/logging re-sync. The session's # id, title, cwd, /goal, and gateway routing all stay put. # - # Durable replace: the persisted transcript MUST become the - # compacted set, not "original history + summary". `compressed` - # already carries the surviving tail (current-turn messages the - # compressor kept via protect_last_n), so we DON'T pre-flush - # here — a flush would INSERT current-turn rows that the - # replace_messages DELETE immediately discards (wasted writes). - # Atomically replace ALL rows with `compressed` so a resume - # reloads the compacted transcript (lossy by design — the - # pre-compaction turns are summarized away). Without this the - # row keeps the full history and compaction never durably - # shrinks anything (the next turn just re-compacts). See #38763. - agent._session_db.replace_messages(agent.session_id, compressed) + # Durable, NON-DESTRUCTIVE replace: soft-archive the + # pre-compaction turns (active=0, kept on disk + FTS-searchable + + # recoverable) and insert `compressed` as the new live (active=1) + # set, atomically. `compressed` already carries the surviving + # tail (current-turn messages the compressor kept via + # protect_last_n), so we DON'T pre-flush here — a flush would + # INSERT current-turn rows that archive_and_compact would then + # archive alongside the rest (harmless but wasted writes). The + # live-context load filters active=1, so a resume reloads ONLY + # the compacted set; the original turns remain under the SAME id + # for search/recovery (Teknium review — keep one durable id + # WITHOUT destroying history, unlike a hard replace_messages). + # See #38763. 
def _insert_message_rows(self, conn, session_id: str, messages: List[Dict[str, Any]]) -> tuple[int, int]:
    """Insert *messages* as new rows of the ``messages`` table for *session_id*.

    Shared row-writing helper for the transcript-rewrite flows. It executes
    directly on *conn*, so it takes part in whatever write transaction the
    caller already has open, and it never touches the ``sessions`` counters
    (the caller reconciles those).

    Args:
        conn: Live DB connection; only ``conn.execute`` is used.
        session_id: Value stored in the ``session_id`` column of every row.
        messages: Message dicts. Keys read: ``role``, ``content``,
            ``tool_call_id``, ``tool_calls``, ``tool_name``, ``timestamp``,
            ``token_count``, ``finish_reason``, ``reasoning``,
            ``reasoning_content``, ``reasoning_details``,
            ``codex_reasoning_items``, ``codex_message_items``,
            ``platform_message_id`` / ``message_id`` and ``observed``.

    Returns:
        ``(inserted_count, tool_call_count)``. A list-valued ``tool_calls``
        contributes its length; any other non-``None`` value counts as one.

    Notes:
        * The INSERT does not name the ``active`` column, so new rows get
          whatever the column default is (presumably ``1`` -- confirm
          against the schema).
        * An explicit ``timestamp`` may be a number, a numeric string, or an
          object exposing ``.timestamp()``. Values that cannot be converted,
          overflow during conversion, or are not finite (``nan`` / ``inf``)
          are ignored and the running clock is used instead. Without that
          guard a single ``inf`` would push the running clock to ``inf`` for
          every later message, and an overflowing value would abort the
          whole write.
    """
    import math  # local import: only needed for the timestamp sanity check

    now_ts = time.time()
    inserted = 0
    tool_calls_total = 0

    for msg in messages:
        role = msg.get("role", "unknown")
        is_assistant = role == "assistant"
        tool_calls = msg.get("tool_calls")

        # Default to the running clock; only a valid explicit timestamp
        # overrides it.
        message_timestamp = now_ts
        raw_ts = msg.get("timestamp")
        if raw_ts is not None:
            try:
                if hasattr(raw_ts, "timestamp"):
                    parsed_ts = float(raw_ts.timestamp())
                else:
                    parsed_ts = float(raw_ts)
            except (TypeError, ValueError, OverflowError, OSError):
                # OverflowError: e.g. float(10**400) or an out-of-range
                # datetime; OSError: platform mktime() failure on naive
                # datetimes far in the past/future.
                parsed_ts = None
            if parsed_ts is not None and math.isfinite(parsed_ts):
                message_timestamp = parsed_ts
            else:
                logger.debug("Ignoring invalid explicit message timestamp: %r", raw_ts)

        # Reasoning / codex payloads are persisted for assistant rows only.
        reasoning_details = msg.get("reasoning_details") if is_assistant else None
        codex_reasoning_items = (
            msg.get("codex_reasoning_items") if is_assistant else None
        )
        codex_message_items = (
            msg.get("codex_message_items") if is_assistant else None
        )

        # Falsy payloads (None, empty list/dict) are stored as NULL rather
        # than as an empty JSON document.
        reasoning_details_json = (
            json.dumps(reasoning_details) if reasoning_details else None
        )
        codex_items_json = (
            json.dumps(codex_reasoning_items) if codex_reasoning_items else None
        )
        codex_message_items_json = (
            json.dumps(codex_message_items) if codex_message_items else None
        )
        tool_calls_json = json.dumps(tool_calls) if tool_calls else None

        # Accept either `platform_message_id` (new explicit name) or
        # `message_id` (yuanbao's existing convention on message dicts).
        platform_msg_id = (
            msg.get("platform_message_id") or msg.get("message_id")
        )

        conn.execute(
            """INSERT INTO messages (session_id, role, content, tool_call_id,
               tool_calls, tool_name, timestamp, token_count, finish_reason,
               reasoning, reasoning_content, reasoning_details, codex_reasoning_items,
               codex_message_items, platform_message_id, observed)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                session_id,
                role,
                self._encode_content(msg.get("content")),
                msg.get("tool_call_id"),
                tool_calls_json,
                msg.get("tool_name"),
                message_timestamp,
                msg.get("token_count"),
                msg.get("finish_reason"),
                msg.get("reasoning") if is_assistant else None,
                msg.get("reasoning_content") if is_assistant else None,
                reasoning_details_json,
                codex_items_json,
                codex_message_items_json,
                platform_msg_id,
                1 if msg.get("observed") else 0,
            ),
        )

        inserted += 1
        if tool_calls is not None:
            tool_calls_total += (
                len(tool_calls) if isinstance(tool_calls, list) else 1
            )

        # Keep the fallback clock strictly ahead of everything written so
        # far, so messages without an explicit timestamp retain their order.
        now_ts = max(now_ts + 1e-6, message_timestamp + 1e-6)

    return inserted, tool_calls_total
""" def _do(conn): @@ -2601,79 +2686,9 @@ class SessionDB: "UPDATE sessions SET message_count = 0, tool_call_count = 0 WHERE id = ?", (session_id,), ) - - now_ts = time.time() - total_messages = 0 - total_tool_calls = 0 - for msg in messages: - role = msg.get("role", "unknown") - tool_calls = msg.get("tool_calls") - message_timestamp = now_ts - if msg.get("timestamp") is not None: - try: - ts_value = msg.get("timestamp") - if hasattr(ts_value, "timestamp"): - message_timestamp = float(ts_value.timestamp()) - else: - message_timestamp = float(ts_value) - except (TypeError, ValueError): - logger.debug("Ignoring invalid explicit message timestamp: %r", msg.get("timestamp")) - reasoning_details = msg.get("reasoning_details") if role == "assistant" else None - codex_reasoning_items = ( - msg.get("codex_reasoning_items") if role == "assistant" else None - ) - codex_message_items = ( - msg.get("codex_message_items") if role == "assistant" else None - ) - - reasoning_details_json = ( - json.dumps(reasoning_details) if reasoning_details else None - ) - codex_items_json = ( - json.dumps(codex_reasoning_items) if codex_reasoning_items else None - ) - codex_message_items_json = ( - json.dumps(codex_message_items) if codex_message_items else None - ) - tool_calls_json = json.dumps(tool_calls) if tool_calls else None - # Accept either `platform_message_id` (new explicit name) or - # `message_id` (yuanbao's existing convention on message dicts). 
def archive_and_compact(
    self, session_id: str, compacted_messages: List[Dict[str, Any]]
) -> int:
    """Swap a session's live transcript for *compacted_messages* without deleting rows.

    Inside a single ``_execute_write`` callback this method:

    1. sets ``active = 0`` on every row of *session_id* that currently has
       ``active = 1`` (nothing is DELETEd);
    2. inserts *compacted_messages* through ``_insert_message_rows``;
    3. overwrites ``sessions.message_count`` / ``sessions.tool_call_count``
       with the counts of the rows just inserted, so the counters describe
       only the new set, not the archived rows.

    Returns:
        The number of rows inserted (the value ``_execute_write`` hands back
        from the callback).

    NOTE(review): this relies on behaviour that lives outside this method --
    newly inserted rows defaulting to ``active = 1``, readers filtering on
    ``active = 1``, and ``_execute_write`` running the callback as one
    transaction. Confirm against the schema and those helpers.
    NOTE(review): rows deactivated here are marked exactly like any row that
    was already ``active = 0`` before the call; if another feature (e.g. a
    rewind/restore flow) re-activates inactive rows, check that it cannot
    resurrect archived pre-compaction turns next to the compacted set.
    """
    archive_sql = "UPDATE messages SET active = 0 WHERE session_id = ? AND active = 1"
    counters_sql = "UPDATE sessions SET message_count = ?, tool_call_count = ? WHERE id = ?"

    def _archive_then_insert(conn):
        # Retire the current live rows first so the rows written next are
        # the only active ones for this session.
        conn.execute(archive_sql, (session_id,))

        counts = self._insert_message_rows(conn, session_id, compacted_messages)
        live_count, live_tool_calls = counts

        # Session counters track the freshly inserted set only.
        conn.execute(counters_sql, (live_count, live_tool_calls, session_id))
        return live_count

    return self._execute_write(_archive_then_insert)
+ # DURABLE, NON-DESTRUCTIVE compaction (the core invariant, per + # Teknium's review): the LIVE context is the compacted set, but the + # pre-compaction turns are PRESERVED on disk (active=0), not deleted + # — searchable + recoverable under the SAME id. A resume reloads the + # compacted set so compaction actually shrinks the live session and + # doesn't immediately re-compact (#38763). reloaded = db.get_messages_as_conversation(sid) assert len(reloaded) == 2 assert [m.get("content") for m in reloaded] == [ "[CONTEXT COMPACTION] summary of prior turns", "recent reply", ] - assert row["message_count"] == 2 + assert row["message_count"] == 2 # live (active) count + # NON-DESTRUCTIVE: the 8 seeded originals survive at active=0 + # alongside the 2 compacted rows — nothing was DELETEd. + all_rows = db.get_messages(sid, include_inactive=True) + assert len(all_rows) == 10 + archived = [m for m in all_rows if not m.get("active", 1)] + assert len(archived) == 8 + # The originals remain FTS-searchable (active=0 is a content- + # preserving UPDATE; the fts triggers don't key on active). + hit = db._conn.execute( + "SELECT 1 FROM messages_fts f JOIN messages m ON m.id = f.rowid " + "WHERE m.session_id = ? AND messages_fts MATCH 'msg' AND m.active = 0 " + "LIMIT 1", + (sid,), + ).fetchone() + assert hit is not None # Flush identity/cursor reset so next-turn appends diff against the # compacted transcript (rebuilds the identity set on next flush). assert agent._last_flushed_db_idx == 0 assert agent._flushed_db_message_ids == set() # Rotation-independent in-place signal set for the gateway. assert agent._last_compaction_in_place is True - # Transcript actually shrank. + # Live transcript actually shrank. assert len(compressed) == 2 def test_in_place_alternation_preserved(self):