diff --git a/hermes_state.py b/hermes_state.py index 5fc312f703c..0c56152456b 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -51,8 +51,7 @@ _BRANCH_CHILD_SQL = ( _COMPRESSION_CHILD_SQL = ( "EXISTS (SELECT 1 FROM sessions p" " WHERE p.id = {a}.parent_session_id" - " AND p.end_reason = 'compression'" - " AND {a}.started_at >= p.ended_at)" + " AND p.end_reason = 'compression')" ) # Rows that surface in pickers: roots + branch children (subagent runs and @@ -2049,7 +2048,6 @@ class SessionDB: JOIN sessions child ON child.id = a.id JOIN sessions parent ON parent.id = child.parent_session_id WHERE parent.end_reason = 'compression' - AND child.started_at >= parent.ended_at ), descendants(id) AS ( SELECT ? @@ -2059,7 +2057,6 @@ class SessionDB: JOIN sessions parent ON parent.id = d.id JOIN sessions child ON child.parent_session_id = parent.id WHERE parent.end_reason = 'compression' - AND child.started_at >= parent.ended_at ), lineage(id) AS ( SELECT id FROM ancestors @@ -2155,37 +2152,64 @@ class SessionDB: def get_compression_tip(self, session_id: str) -> Optional[str]: """Walk the compression-continuation chain forward and return the tip. - A compression continuation is a child session where: - 1. The parent's ``end_reason = 'compression'`` - 2. The child was created AFTER the parent was ended (started_at >= ended_at) + A compression continuation is a child of a session whose + ``end_reason = 'compression'``. Older builds tried to distinguish + continuations from branches/subagents by requiring + ``child.started_at >= parent.ended_at``. That ordering is too brittle: + gateway + compression races can insert the real continuation row before + the parent row's ``ended_at`` is written, while a stale websocket later + creates/reuses a sibling that *does* satisfy the timestamp test. The + visible symptom is brutal: desktop resume follows the stale sibling and + the user's latest messages look "lost" even though they are persisted in + the real continuation chain. - The second condition distinguishes compression continuations from - delegate subagents or branch children, which can also have a - ``parent_session_id`` but were created while the parent was still live. - - Returns the session_id of the latest continuation in the chain, or the - input ``session_id`` if it isn't part of a compression chain (or if the - input itself doesn't exist). + Instead, only follow children of compression-ended parents, exclude + explicit branch/delegate/tool children, and prefer children that are + themselves continuing the compression chain (``end_reason='compression'``) + or still live over stale closed siblings such as ``ws_orphan_reap``. + Returns the latest continuation tip, or the input id when no + continuation exists. """ current = session_id + seen = {current} if current else set() # Bound the walk defensively — compression chains this deep are # pathological and shouldn't happen in practice. 100 = plenty. for _ in range(100): with self._lock: cursor = self._conn.execute( - "SELECT id FROM sessions " - "WHERE parent_session_id = ? " - " AND started_at >= (" - " SELECT ended_at FROM sessions " - " WHERE id = ? AND end_reason = 'compression'" - " ) " - "ORDER BY started_at DESC LIMIT 1", - (current, current), + """ + SELECT child.id + FROM sessions parent + JOIN sessions child ON child.parent_session_id = parent.id + WHERE parent.id = ? + AND parent.end_reason = 'compression' + AND json_extract(COALESCE(child.model_config, '{}'), '$._branched_from') IS NULL + AND json_extract(COALESCE(child.model_config, '{}'), '$._delegate_from') IS NULL + AND COALESCE(child.source, '') != 'tool' + ORDER BY + CASE + WHEN child.end_reason = 'compression' THEN 0 + WHEN child.ended_at IS NULL THEN 1 + ELSE 2 + END, + COALESCE( + (SELECT MAX(m.timestamp) FROM messages m WHERE m.session_id = child.id), + child.started_at + ) DESC, + child.started_at DESC, + child.id DESC + LIMIT 1 + """, + (current,), ) row = cursor.fetchone() if row is None: return current - current = row["id"] + child_id = row["id"] + if not child_id or child_id in seen: + return current + seen.add(child_id) + current = child_id return current def distinct_session_cwds(self, include_archived: bool = False) -> List[Dict[str, Any]]: @@ -2315,10 +2339,12 @@ class SessionDB: # still surfacing old compression roots whose live tip is fresh. # # The CTE seeds from rows the outer WHERE admits (roots + branch - # children), then recursively joins forward through - # compression-continuation edges using the same criteria as - # get_compression_tip (parent.end_reason='compression' AND - # child.started_at >= parent.ended_at). + # children), then recursively joins forward through robust + # compression-continuation edges. Do NOT require + # child.started_at >= parent.ended_at here: real desktop/gateway + # races can insert the continuation row before the parent's + # ended_at is written, while stale websocket siblings may satisfy + # the timestamp test and hijack resume/list projection. outer_where = where_sql id_params: List[Any] = [] if id_needle: @@ -2350,7 +2376,9 @@ class SessionDB: JOIN sessions parent ON parent.id = c.cur_id JOIN sessions child ON child.parent_session_id = c.cur_id WHERE parent.end_reason = 'compression' - AND child.started_at >= parent.ended_at + AND json_extract(COALESCE(child.model_config, '{{}}'), '$._branched_from') IS NULL + AND json_extract(COALESCE(child.model_config, '{{}}'), '$._delegate_from') IS NULL + AND COALESCE(child.source, '') != 'tool' ), chain_max AS ( SELECT diff --git a/tests/hermes_state/test_resolve_resume_session_id.py b/tests/hermes_state/test_resolve_resume_session_id.py index 2737415143d..2f53b1b8a72 100644 --- a/tests/hermes_state/test_resolve_resume_session_id.py +++ b/tests/hermes_state/test_resolve_resume_session_id.py @@ -159,3 +159,51 @@ def test_redirects_from_message_bearing_parent_to_child(db): db.append_message("continued", role="assistant", content="new reply") assert db.resolve_resume_session_id("original") == "continued" + + +def test_compression_tip_handles_pre_ended_real_child_and_ws_orphan_sibling(db): + # Real desktop repro shape from a long GUI session: + # + # root --compression--> real continuation --compression--> live tip + # \ + # `-- stale websocket sibling ended by ws_orphan_reap + # + # The real continuation row can be inserted before root.ended_at is written, + # so the old child.started_at >= parent.ended_at discriminator rejects it and + # follows the stale websocket sibling instead. That makes the GUI look like + # the latest conversation was lost. Resuming root must land on live_tip. + base = int(time.time()) - 10_000 + db.create_session("root", source="tui") + db.append_message("root", role="user", content="pre-compression") + db.end_session("root", "compression") + + db.create_session("real_cont", source="tui", parent_session_id="root") + db.append_message("real_cont", role="user", content="real continuation") + db.end_session("real_cont", "compression") + + db.create_session("ws_orphan", source="tui", parent_session_id="root") + db.append_message("ws_orphan", role="user", content="stale websocket") + db.end_session("ws_orphan", "ws_orphan_reap") + + db.create_session("live_tip", source="tui", parent_session_id="real_cont") + db.append_message("live_tip", role="user", content="latest real turn") + + conn = db._conn + assert conn is not None + conn.execute("UPDATE sessions SET started_at = ?, ended_at = ? WHERE id = 'root'", (base, base + 1000)) + # The real continuation starts before root.ended_at, exactly the race that + # broke the old timestamp-based chain walk. + conn.execute("UPDATE sessions SET started_at = ?, ended_at = ? WHERE id = 'real_cont'", (base + 500, base + 2000)) + conn.execute("UPDATE sessions SET started_at = ?, ended_at = ? WHERE id = 'ws_orphan'", (base + 1000, base + 3000)) + conn.execute("UPDATE sessions SET started_at = ? WHERE id = 'live_tip'", (base + 2000,)) + conn.commit() + + assert db.get_compression_tip("root") == "live_tip" + assert db.resolve_resume_session_id("root") == "live_tip" + + listed = db.list_sessions_rich(limit=10, order_by_last_active=True) + ids = {row["id"] for row in listed} + assert "live_tip" in ids + assert "real_cont" not in ids + assert "ws_orphan" not in ids + diff --git a/tui_gateway/server.py b/tui_gateway/server.py index 05106fd14a8..0d572065d44 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -4926,7 +4926,7 @@ def _(rid, params: dict) -> dict: fetch_limit = max(limit * 2, 200) rows = [ s - for s in db.list_sessions_rich(source=None, limit=fetch_limit) + for s in db.list_sessions_rich(source=None, limit=fetch_limit, order_by_last_active=True) if (s.get("source") or "").strip().lower() not in deny ][:limit] return _ok( @@ -4973,7 +4973,7 @@ def _(rid, params: dict) -> dict: # users (lots of recent ``tool`` rows) don't get a false # "no eligible session" answer. ``session.list`` uses a # similar over-fetch strategy. - rows = db.list_sessions_rich(source=None, limit=200) + rows = db.list_sessions_rich(source=None, limit=200, order_by_last_active=True) for row in rows: src = (row.get("source") or "").strip().lower() if src in deny: