fix(compression): in-place compaction is non-destructive (soft-archive, not delete)

Teknium review: keeping one durable session id must NOT come at the cost of
destroying history. The prior in-place implementation used replace_messages,
which hard-DELETEs the pre-compaction turns (they also drop out of the FTS
index) — same id, but the original conversation is gone with no recovery path
and the summary becomes the only record. Rotation today is non-destructive
(the old session's full transcript survives under the old id); in-place must
match that durability contract, not weaken it.

Fix: compact in place by SOFT-ARCHIVING, reusing the existing messages.active
flag (the /undo soft-delete mechanic), instead of deleting:

- New SessionDB.archive_and_compact(session_id, compacted): in one atomic
  write, UPDATE messages SET active=0 on the live turns, then insert the
  compacted set as fresh active=1 rows. Nothing is deleted.
- The insert loop is extracted into a shared _insert_message_rows() helper so
  archive_and_compact and replace_messages don't duplicate the 60-line
  column/encoding block (extend-don't-duplicate).
- Agent in-place branch calls archive_and_compact instead of replace_messages.

Durability outcome (proven by test + E2E across repeated compactions):
- Live context load (get_messages_as_conversation / get_messages) filters
  active=1, so a resume reloads ONLY the compacted set — compaction still
  shrinks the live session.
- The pre-compaction turns stay on disk at active=0, recoverable via
  get_messages(include_inactive=True) / restore_rewound.
- They remain FTS-searchable: the messages_fts* triggers index on INSERT and
  remove on DELETE only — they do NOT key on active, and active=0 is a
  content-preserving UPDATE. session_search still finds them.
- Verified across TWO successive compactions: the 1st compaction's originals
  are still recoverable + searchable after the 2nd (answers the "no recovery
  path after the next compaction" concern directly).

message_count now reflects the LIVE (active/compacted) count, matching the
live load. replace_messages keeps its DELETE semantics (still correct for
/retry, /undo) and gains a docstring note pointing compaction at the
non-destructive method.

Tests: test_in_place_keeps_same_session_id strengthened to assert the 8
seeded originals survive at active=0 alongside the 2 compacted rows AND stay
FTS-searchable. Mutation check: swapping archive_and_compact back to a hard
DELETE fails the test, so the non-destructive contract is bound. 285
hermes_state + in-place tests green; rotation/persistence/compress-command/cli
suites green; ruff clean.
This commit is contained in:
kshitijk4poor 2026-06-19 20:53:30 +05:30 committed by Teknium
parent 4f9485a95d
commit 4663456996
3 changed files with 168 additions and 91 deletions

View file

@ -530,18 +530,20 @@ def compress_context(
# renumber, no contextvar/env/logging re-sync. The session's
# id, title, cwd, /goal, and gateway routing all stay put.
#
# Durable replace: the persisted transcript MUST become the
# compacted set, not "original history + summary". `compressed`
# already carries the surviving tail (current-turn messages the
# compressor kept via protect_last_n), so we DON'T pre-flush
# here — a flush would INSERT current-turn rows that the
# replace_messages DELETE immediately discards (wasted writes).
# Atomically replace ALL rows with `compressed` so a resume
# reloads the compacted transcript (lossy by design — the
# pre-compaction turns are summarized away). Without this the
# row keeps the full history and compaction never durably
# shrinks anything (the next turn just re-compacts). See #38763.
agent._session_db.replace_messages(agent.session_id, compressed)
# Durable, NON-DESTRUCTIVE replace: soft-archive the
# pre-compaction turns (active=0, kept on disk + FTS-searchable +
# recoverable) and insert `compressed` as the new live (active=1)
# set, atomically. `compressed` already carries the surviving
# tail (current-turn messages the compressor kept via
# protect_last_n), so we DON'T pre-flush here — a flush would
# INSERT current-turn rows that archive_and_compact would then
# archive alongside the rest (harmless but wasted writes). The
# live-context load filters active=1, so a resume reloads ONLY
# the compacted set; the original turns remain under the SAME id
# for search/recovery (Teknium review — keep one durable id
# WITHOUT destroying history, unlike a hard replace_messages).
# See #38763.
agent._session_db.archive_and_compact(agent.session_id, compressed)
# Reset the flush identity set so the next turn's appends are
# diffed against the COMPACTED transcript: the compacted dicts
# are passed as conversation_history next turn and skipped by

View file

@ -2585,12 +2585,97 @@ class SessionDB:
return self._execute_write(_do)
def _insert_message_rows(self, conn, session_id: str, messages: List[Dict[str, Any]]) -> tuple[int, int]:
"""Insert *messages* as fresh active rows for *session_id*.
Shared by :meth:`replace_messages` (delete-then-insert) and
:meth:`archive_and_compact` (soft-archive-then-insert). Runs inside the
caller's write transaction (takes the live ``conn``). Returns
``(inserted_count, tool_call_count)``. Does NOT touch sessions.* counters
the caller owns that, since the two flows reconcile counts differently.
"""
now_ts = time.time()
inserted = 0
tool_calls_total = 0
for msg in messages:
role = msg.get("role", "unknown")
tool_calls = msg.get("tool_calls")
message_timestamp = now_ts
if msg.get("timestamp") is not None:
try:
ts_value = msg.get("timestamp")
if hasattr(ts_value, "timestamp"):
message_timestamp = float(ts_value.timestamp())
else:
message_timestamp = float(ts_value)
except (TypeError, ValueError):
logger.debug("Ignoring invalid explicit message timestamp: %r", msg.get("timestamp"))
reasoning_details = msg.get("reasoning_details") if role == "assistant" else None
codex_reasoning_items = (
msg.get("codex_reasoning_items") if role == "assistant" else None
)
codex_message_items = (
msg.get("codex_message_items") if role == "assistant" else None
)
reasoning_details_json = (
json.dumps(reasoning_details) if reasoning_details else None
)
codex_items_json = (
json.dumps(codex_reasoning_items) if codex_reasoning_items else None
)
codex_message_items_json = (
json.dumps(codex_message_items) if codex_message_items else None
)
tool_calls_json = json.dumps(tool_calls) if tool_calls else None
# Accept either `platform_message_id` (new explicit name) or
# `message_id` (yuanbao's existing convention on message dicts).
platform_msg_id = (
msg.get("platform_message_id") or msg.get("message_id")
)
conn.execute(
"""INSERT INTO messages (session_id, role, content, tool_call_id,
tool_calls, tool_name, timestamp, token_count, finish_reason,
reasoning, reasoning_content, reasoning_details, codex_reasoning_items,
codex_message_items, platform_message_id, observed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
role,
self._encode_content(msg.get("content")),
msg.get("tool_call_id"),
tool_calls_json,
msg.get("tool_name"),
message_timestamp,
msg.get("token_count"),
msg.get("finish_reason"),
msg.get("reasoning") if role == "assistant" else None,
msg.get("reasoning_content") if role == "assistant" else None,
reasoning_details_json,
codex_items_json,
codex_message_items_json,
platform_msg_id,
1 if msg.get("observed") else 0,
),
)
inserted += 1
if tool_calls is not None:
tool_calls_total += (
len(tool_calls) if isinstance(tool_calls, list) else 1
)
now_ts = max(now_ts + 1e-6, message_timestamp + 1e-6)
return inserted, tool_calls_total
def replace_messages(self, session_id: str, messages: List[Dict[str, Any]]) -> None:
"""Atomically replace every message for a session.
Used by transcript-rewrite flows such as /retry, /undo, and /compress.
The delete + reinsert sequence must commit as one transaction so a
mid-rewrite failure does not leave SQLite with a partial transcript.
DESTRUCTIVE: the prior rows are DELETEd (and drop out of the FTS index).
For compaction that must preserve the pre-compaction transcript under
the same id, use :meth:`archive_and_compact` instead.
"""
def _do(conn):
@ -2601,79 +2686,9 @@ class SessionDB:
"UPDATE sessions SET message_count = 0, tool_call_count = 0 WHERE id = ?",
(session_id,),
)
now_ts = time.time()
total_messages = 0
total_tool_calls = 0
for msg in messages:
role = msg.get("role", "unknown")
tool_calls = msg.get("tool_calls")
message_timestamp = now_ts
if msg.get("timestamp") is not None:
try:
ts_value = msg.get("timestamp")
if hasattr(ts_value, "timestamp"):
message_timestamp = float(ts_value.timestamp())
else:
message_timestamp = float(ts_value)
except (TypeError, ValueError):
logger.debug("Ignoring invalid explicit message timestamp: %r", msg.get("timestamp"))
reasoning_details = msg.get("reasoning_details") if role == "assistant" else None
codex_reasoning_items = (
msg.get("codex_reasoning_items") if role == "assistant" else None
)
codex_message_items = (
msg.get("codex_message_items") if role == "assistant" else None
)
reasoning_details_json = (
json.dumps(reasoning_details) if reasoning_details else None
)
codex_items_json = (
json.dumps(codex_reasoning_items) if codex_reasoning_items else None
)
codex_message_items_json = (
json.dumps(codex_message_items) if codex_message_items else None
)
tool_calls_json = json.dumps(tool_calls) if tool_calls else None
# Accept either `platform_message_id` (new explicit name) or
# `message_id` (yuanbao's existing convention on message dicts).
platform_msg_id = (
msg.get("platform_message_id") or msg.get("message_id")
)
conn.execute(
"""INSERT INTO messages (session_id, role, content, tool_call_id,
tool_calls, tool_name, timestamp, token_count, finish_reason,
reasoning, reasoning_content, reasoning_details, codex_reasoning_items,
codex_message_items, platform_message_id, observed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
role,
self._encode_content(msg.get("content")),
msg.get("tool_call_id"),
tool_calls_json,
msg.get("tool_name"),
message_timestamp,
msg.get("token_count"),
msg.get("finish_reason"),
msg.get("reasoning") if role == "assistant" else None,
msg.get("reasoning_content") if role == "assistant" else None,
reasoning_details_json,
codex_items_json,
codex_message_items_json,
platform_msg_id,
1 if msg.get("observed") else 0,
),
)
total_messages += 1
if tool_calls is not None:
total_tool_calls += (
len(tool_calls) if isinstance(tool_calls, list) else 1
)
now_ts = max(now_ts + 1e-6, message_timestamp + 1e-6)
total_messages, total_tool_calls = self._insert_message_rows(
conn, session_id, messages
)
conn.execute(
"UPDATE sessions SET message_count = ?, tool_call_count = ? WHERE id = ?",
(total_messages, total_tool_calls, session_id),
@ -2681,6 +2696,49 @@ class SessionDB:
self._execute_write(_do)
def archive_and_compact(
self, session_id: str, compacted_messages: List[Dict[str, Any]]
) -> int:
"""Non-destructive in-place compaction for a single durable session id.
Soft-archives every currently-active message (``active = 0``) and
inserts *compacted_messages* as fresh active rows atomically, in one
write transaction. The conversation keeps ONE session id for life
(#38763) WITHOUT destroying history:
- The live-context load (:meth:`get_messages_as_conversation`,
:meth:`get_messages`) filters ``active = 1`` by default, so the model
reloads ONLY the compacted set.
- The archived pre-compaction turns stay on disk and remain
FTS-searchable (the ``messages_fts*`` triggers index on INSERT / drop
on DELETE and do NOT key on ``active``; flipping to ``active = 0`` is a
content-preserving UPDATE), and are recoverable via
``get_messages(..., include_inactive=True)`` / ``restore_rewound``.
This is the durability-preserving alternative to :meth:`replace_messages`
for compaction. ``message_count`` is set to the ACTIVE (compacted) count,
matching what the live load returns. Returns the new active count.
"""
def _do(conn):
conn.execute(
"UPDATE messages SET active = 0 WHERE session_id = ? AND active = 1",
(session_id,),
)
inserted, tool_calls_total = self._insert_message_rows(
conn, session_id, compacted_messages
)
# message_count / tool_call_count reflect the LIVE (active) set —
# the archived rows are still on disk but not part of the live count.
conn.execute(
"UPDATE sessions SET message_count = ?, tool_call_count = ? WHERE id = ?",
(inserted, tool_calls_total, session_id),
)
return inserted
return self._execute_write(_do)
def get_messages(
self, session_id: str, include_inactive: bool = False
) -> List[Dict[str, Any]]:

View file

@ -87,24 +87,41 @@ class TestInPlaceCompaction:
row = db.get_session(sid)
assert row["end_reason"] is None
assert row["title"] == "my-research"
# DURABLE REPLACE (the core invariant): the persisted transcript is
# now the COMPACTED set, not "full history + summary". A resume must
# reload the compacted transcript so compaction actually shrinks the
# session and doesn't immediately re-compact (#38763).
# DURABLE, NON-DESTRUCTIVE compaction (the core invariant, per
# Teknium's review): the LIVE context is the compacted set, but the
# pre-compaction turns are PRESERVED on disk (active=0), not deleted
# — searchable + recoverable under the SAME id. A resume reloads the
# compacted set so compaction actually shrinks the live session and
# doesn't immediately re-compact (#38763).
reloaded = db.get_messages_as_conversation(sid)
assert len(reloaded) == 2
assert [m.get("content") for m in reloaded] == [
"[CONTEXT COMPACTION] summary of prior turns",
"recent reply",
]
assert row["message_count"] == 2
assert row["message_count"] == 2 # live (active) count
# NON-DESTRUCTIVE: the 8 seeded originals survive at active=0
# alongside the 2 compacted rows — nothing was DELETEd.
all_rows = db.get_messages(sid, include_inactive=True)
assert len(all_rows) == 10
archived = [m for m in all_rows if not m.get("active", 1)]
assert len(archived) == 8
# The originals remain FTS-searchable (active=0 is a content-
# preserving UPDATE; the fts triggers don't key on active).
hit = db._conn.execute(
"SELECT 1 FROM messages_fts f JOIN messages m ON m.id = f.rowid "
"WHERE m.session_id = ? AND messages_fts MATCH 'msg' AND m.active = 0 "
"LIMIT 1",
(sid,),
).fetchone()
assert hit is not None
# Flush identity/cursor reset so next-turn appends diff against the
# compacted transcript (rebuilds the identity set on next flush).
assert agent._last_flushed_db_idx == 0
assert agent._flushed_db_message_ids == set()
# Rotation-independent in-place signal set for the gateway.
assert agent._last_compaction_in_place is True
# Transcript actually shrank.
# Live transcript actually shrank.
assert len(compressed) == 2
def test_in_place_alternation_preserved(self):