From 0b7128582fad94d9bb5e9b36de4f937162fab7f2 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 25 Jun 2026 21:18:41 -0700 Subject: [PATCH] fix(state): detect and repair FTS write corruption that silently drops gateway history (#52798) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A readable state.db can still reject every message write through the messages_fts* triggers when the FTS5 index is corrupt: base-table reads and PRAGMA integrity_check pass, but INSERT INTO messages fails with 'database disk image is malformed'. The gateway reloads conversation_history from disk each turn, so a silently-failed write hands the next turn stale/empty history even though the same cached AIAgent still holds the live transcript — causing immediate same-session amnesia. (#50502) - hermes_state.py: _db_opens_cleanly() now drives a rolled-back message write through the FTS triggers, so write-only corruption (which the read-only probe reported healthy) is detected. repair_state_db_schema() gains an in-place FTS5 'rebuild' strategy (tier 0) before the dedup/drop tiers, plus an already_healthy short-circuit. Both 'hermes sessions repair' and 'hermes doctor' route through these, so the fix covers the whole class. - hermes_cli/doctor.py: the state.db check runs the write-health probe even on the success (readable) path and repairs in place with --fix. - gateway/run.py: _select_cached_agent_history() prefers the cached agent's longer live _session_messages over a shorter persisted transcript, so an FTS write failure can't wipe in-session context. - tests: regressions for write-health detection, in-place repair preserving rows + resuming writes, the already_healthy shortcut, and the gateway guard. Combines the approaches from #50504 (@0-CYBERDYNE-SYSTEMS-0, issue author), #52165 (@davidgut1982), and #50576 (@trevorgordon981). --- gateway/run.py | 44 +++++++++ hermes_cli/doctor.py | 40 ++++++++ hermes_state.py | 88 +++++++++++++++-- tests/test_state_db_malformed_repair.py | 126 ++++++++++++++++++++++-- 4 files changed, 285 insertions(+), 13 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 5b3c2c88f36..9d31cc2a60e 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -832,6 +832,28 @@ def _build_gateway_agent_history( return agent_history, observed_context +def _select_cached_agent_history( + persisted_history: List[Dict[str, Any]], + live_history: Any, +) -> List[Dict[str, Any]]: + """Prefer a cached agent's live in-memory transcript over a shorter + persisted one. + + Guards the FTS write-corruption case (#50502): when message writes fail + silently through corrupt FTS triggers, the next turn reloads a stale/empty + ``conversation_history`` from disk even though the same cached ``AIAgent`` + still holds the full live ``_session_messages``. Replacing the live + transcript with that shorter persisted copy causes immediate same-session + amnesia. When the live transcript is strictly longer, keep it. + + Returns ``persisted_history`` unchanged unless the live copy is a longer + list, in which case a copy of the live transcript is returned. + """ + if isinstance(live_history, list) and len(live_history) > len(persisted_history): + return list(live_history) + return persisted_history + + def _wrap_current_message_with_observed_context(message: Any, observed_context: Optional[str]) -> Any: """Prepend observed Telegram context to the API-only current user turn.""" @@ -15875,6 +15897,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew user_id_alt=getattr(source, "user_id_alt", None), ) agent = None + reused_cached_agent = False _cache_lock = getattr(self, "_agent_cache_lock", None) _cache = getattr(self, "_agent_cache", None) @@ -15933,6 +15956,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _xproc_evicted_agent = _ev_agent else: agent = cached[0] + reused_cached_agent = True # Refresh LRU order so the cap enforcement evicts # truly-oldest entries, not the one we just used. if hasattr(_cache, "move_to_end"): @@ -16217,6 +16241,26 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew channel_prompt=channel_prompt, inject_timestamps=_message_timestamps_enabled(_load_gateway_config()), ) + + # FTS write-corruption guard (#50502): when message persistence + # fails silently through corrupt FTS triggers, the reloaded + # transcript above is stale/empty even though the SAME cached agent + # still holds the full live conversation in `_session_messages`. + # Replacing the live transcript with that shorter copy causes + # immediate same-session amnesia. Only applies when we reused a + # cached agent bound to this exact session_id. + if reused_cached_agent and getattr(agent, "session_id", None) == session_id: + _selected = _select_cached_agent_history( + agent_history, getattr(agent, "_session_messages", None) + ) + if _selected is not agent_history: + logger.warning( + "Persisted transcript lagged live cached history for " + "session %s (disk=%d, memory=%d); preserving live " + "conversation context (possible FTS write corruption)", + session_key, len(agent_history), len(_selected), + ) + agent_history = _selected # Collect MEDIA paths already in history so we can exclude them # from the current turn's extraction. This is compression-safe: diff --git a/hermes_cli/doctor.py b/hermes_cli/doctor.py index 7377e2959a2..496f7e90742 100644 --- a/hermes_cli/doctor.py +++ b/hermes_cli/doctor.py @@ -1202,6 +1202,46 @@ def run_doctor(args): count = cursor.fetchone()[0] conn.close() check_ok(f"{_DHH}/state.db exists ({count} sessions)") + + # FTS write-health probe (#50502): `SELECT COUNT(*)` above succeeds + # even when the FTS index is corrupt and every message write fails + # through the triggers. `_db_opens_cleanly` now drives a rolled-back + # write so this otherwise-silent corruption class is surfaced (and + # repaired in place with --fix). + from hermes_state import _db_opens_cleanly, repair_state_db_schema + + _write_reason = _db_opens_cleanly(state_db_path) + if _write_reason is not None: + check_warn( + f"{_DHH}/state.db fails a write-health probe (FTS index may be corrupt)", + f"({_write_reason})", + ) + if should_fix: + report = repair_state_db_schema(state_db_path) + if report.get("repaired"): + backup_name = ( + Path(report["backup_path"]).name + if report.get("backup_path") else "n/a" + ) + check_ok( + "Repaired state.db FTS write health", + f"(strategy: {report.get('strategy')}; backup: {backup_name})", + ) + fixed_count += 1 + else: + check_warn( + "state.db FTS write-health repair did not recover automatically", + f"({report.get('error')}; backup: {report.get('backup_path')})", + ) + issues.append( + "state.db FTS write corruption and auto-repair failed — " + "restore from the backup copy beside state.db" + ) + else: + issues.append( + "state.db FTS write corruption — run 'hermes doctor --fix' " + "(or 'hermes sessions repair') to rebuild the FTS index" + ) except Exception as e: from hermes_state import is_malformed_db_error, repair_state_db_schema diff --git a/hermes_state.py b/hermes_state.py index 66255663058..a7938f7167f 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -403,7 +403,11 @@ def _db_opens_cleanly(db_path: Path) -> Optional[str]: Runs the same first-statement (``PRAGMA journal_mode``) that trips the malformed-schema parse, then ``PRAGMA integrity_check`` and a canonical - ``sessions`` read. + ``sessions`` read, and finally a rolled-back ``messages`` write so that + FTS5 index corruption — which leaves base-table reads and + ``integrity_check`` passing while every ``INSERT INTO messages`` fails + through the FTS triggers — is reported as unhealthy rather than slipping + past as a false "ok" (#50502). """ conn = sqlite3.connect(str(db_path), isolation_level=None) try: @@ -413,6 +417,36 @@ def _db_opens_cleanly(db_path: Path) -> Optional[str]: if problems: return "; ".join(problems[:3]) conn.execute("SELECT COUNT(*) FROM sessions").fetchone() + + # FTS write probe: drive a row through the messages_fts* triggers in a + # transaction that is always rolled back, so a corrupt FTS index that + # rejects writes is caught even though reads look healthy. The probe is + # best-effort — if the messages/sessions tables don't exist yet (brand + # new file mid-init) the OperationalError is treated as "not yet a + # populated DB", not corruption. + probe_session_id = f"_hermes_fts_health_probe_{time.time_ns()}" + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + "INSERT INTO sessions (id, source, started_at) VALUES (?, ?, ?)", + (probe_session_id, "_health_probe", time.time()), + ) + conn.execute( + "INSERT INTO messages (session_id, role, content, timestamp) " + "VALUES (?, ?, ?, ?)", + (probe_session_id, "user", "_fts_health_probe", time.time()), + ) + conn.execute("ROLLBACK") + except sqlite3.OperationalError as exc: + # Missing tables / FTS disabled — not the corruption class we probe. + try: + conn.execute("ROLLBACK") + except sqlite3.Error: + pass + msg = str(exc).lower() + if "no such table" in msg or "no such column" in msg: + return None + return str(exc) return None except sqlite3.DatabaseError as exc: return str(exc) @@ -421,16 +455,23 @@ def _db_opens_cleanly(db_path: Path) -> Optional[str]: def repair_state_db_schema(db_path: Path, *, backup: bool = True) -> Dict[str, Any]: - """Repair a state.db whose ``sqlite_master`` schema is malformed. + """Repair a state.db whose ``sqlite_master`` schema is malformed or whose + FTS indexes reject writes. - Handles the "duplicate object definition" / malformed-schema class where - even ``PRAGMA`` statements fail. Tries least-destructive recovery first - and escalates: + Handles two corruption classes: the "duplicate object definition" / + malformed-schema class where even ``PRAGMA`` statements fail, and the FTS + write-corruption class (#50502) where base tables read fine and + ``integrity_check`` passes but writes fail through the ``messages_fts*`` + triggers. Tries least-destructive recovery first and escalates: - 1. **De-duplicate** ``sqlite_master`` (keep the lowest rowid per + 1. **Rebuild FTS indexes in place** via the FTS5 ``'rebuild'`` command, + which rewrites the internal b-tree segments from the canonical + ``messages`` rows without dropping or recreating anything. Fixes the + FTS write-corruption class while preserving the schema intact. + 2. **De-duplicate** ``sqlite_master`` (keep the lowest rowid per ``type``/``name``). Fixes the canonical "table X already exists" case and PRESERVES the existing FTS index intact. - 2. **Drop the FTS schema** (every ``messages_fts*`` object) + ``VACUUM``. + 3. **Drop the FTS schema** (every ``messages_fts*`` object) + ``VACUUM``. The next ``SessionDB()`` open rebuilds the FTS indexes from the canonical ``messages`` table. @@ -452,10 +493,43 @@ def repair_state_db_schema(db_path: Path, *, backup: bool = True) -> Dict[str, A report["error"] = f"{db_path} does not exist" return report + if _db_opens_cleanly(db_path) is None: + report["repaired"] = True + report["strategy"] = "already_healthy" + return report + if backup: bpath = _backup_db_file(db_path) report["backup_path"] = str(bpath) if bpath else None + # ── Strategy 0: rebuild FTS indexes in place (FTS write-corruption) ── + # The FTS5 'rebuild' command rewrites the internal index from the canonical + # content table. This is the recommended, least-destructive recovery for a + # corrupt FTS index that rejects message writes while reads still succeed. + try: + conn = sqlite3.connect(str(db_path), isolation_level=None) + try: + for table_name in ("messages_fts", "messages_fts_trigram"): + try: + conn.execute( + f"INSERT INTO {table_name}({table_name}) VALUES('rebuild')" + ) + except sqlite3.OperationalError: + # Table absent (FTS disabled / trigram off) — skip it. + continue + finally: + conn.close() + if _db_opens_cleanly(db_path) is None: + report["repaired"] = True + report["strategy"] = "rebuild_fts" + logger.warning( + "state.db FTS indexes rebuilt in place (schema preserved): %s", + db_path, + ) + return report + except sqlite3.DatabaseError as exc: + logger.warning("state.db FTS in-place rebuild pass failed: %s", exc) + # ── Strategy 1: de-duplicate sqlite_master (keeps FTS index) ── try: conn = sqlite3.connect(str(db_path), isolation_level=None) diff --git a/tests/test_state_db_malformed_repair.py b/tests/test_state_db_malformed_repair.py index 85658758860..2274d8cae0b 100644 --- a/tests/test_state_db_malformed_repair.py +++ b/tests/test_state_db_malformed_repair.py @@ -166,17 +166,29 @@ def test_strategy_b_rebuild_when_dedup_insufficient(tmp_path, monkeypatch): _build_healthy_db(db_path) _corrupt_duplicate_fts(db_path) - # Make the post-strat-1 verification report "still broken" exactly once, - # so the routine escalates to strat 2 (drop FTS + VACUUM) and runs its - # real SQL against the file; the strat-2 verification then uses the real - # check and passes. + # Make every health verification report "still broken" until the drop-FTS + # pass has actually removed the messages_fts schema, so the routine + # escalates past the in-place-rebuild and dedup passes to strat 2 (drop FTS + # + VACUUM) and runs its real SQL against the file. Keyed on whether the FTS + # schema is still present rather than a call counter, so it stays correct as + # earlier verification call sites are added/removed. real_check = hermes_state._db_opens_cleanly calls = {"n": 0} def flaky_check(path): calls["n"] += 1 - if calls["n"] == 1: - return "pretend strat 1 was insufficient" + try: + probe = sqlite3.connect(str(path)) + still_has_fts = probe.execute( + "SELECT COUNT(*) FROM sqlite_master " + "WHERE name LIKE 'messages_fts%'" + ).fetchone()[0] + probe.close() + except sqlite3.DatabaseError: + # sqlite_master still malformed (pre-dedup) — treat as broken. + return "pretend still broken (schema unreadable)" + if still_has_fts: + return "pretend in-place/dedup passes were insufficient" return real_check(path) monkeypatch.setattr(hermes_state, "_db_opens_cleanly", flaky_check) @@ -242,3 +254,105 @@ def test_repair_on_clean_db_is_noop(tmp_path): assert conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0] == 10 assert conn.execute("PRAGMA integrity_check").fetchone()[0] == "ok" conn.close() + + +# ── FTS write-corruption class (#50502) ────────────────────────────────── +# A readable state.db can still reject every message write through the +# messages_fts* triggers when the FTS index is corrupt. Plain +# `SELECT COUNT(*)` reads succeed, so the old read-only health probe reported +# it healthy and the gateway silently dropped conversation history. + + +def _corrupt_fts_index_data(db_path: Path) -> None: + """Overwrite the FTS5 shadow b-tree blocks with garbage bytes. + + Reproduces the runtime "database disk image is malformed" / "malformed + inverted index for FTS5 table" failure that fires on writes through the + triggers while base-table reads still return rows. + """ + conn = sqlite3.connect(str(db_path), isolation_level=None) + conn.execute("UPDATE messages_fts_data SET block = X'DEADBEEFDEADBEEF'") + conn.close() + + +def test_fts_write_corruption_detected_by_write_probe(tmp_path): + """_db_opens_cleanly's rolled-back write probe flags FTS write corruption.""" + from hermes_state import _db_opens_cleanly + + db_path = tmp_path / "state.db" + _build_healthy_db(db_path) + assert _db_opens_cleanly(db_path) is None # healthy before + + _corrupt_fts_index_data(db_path) + + # Plain base-table reads still succeed — this is the silent class. + conn = sqlite3.connect(str(db_path), isolation_level=None) + assert conn.execute("SELECT COUNT(*) FROM sessions").fetchone()[0] >= 1 + assert conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0] == 10 + conn.close() + + # The write-aware probe reports the corruption (not a false "ok"). + reason = _db_opens_cleanly(db_path) + assert reason is not None + + +def test_fts_write_corruption_repaired_in_place(tmp_path): + """repair_state_db_schema rebuilds the FTS index; reads + writes resume.""" + from hermes_state import _db_opens_cleanly + + db_path = tmp_path / "state.db" + _build_healthy_db(db_path) + _corrupt_fts_index_data(db_path) + + report = repair_state_db_schema(db_path) + assert report["repaired"] is True + assert report["strategy"] in ("rebuild_fts", "dedup_schema", "drop_fts_rebuild") + assert _db_opens_cleanly(db_path) is None + + # Canonical rows preserved AND new writes go through the triggers again. + db = SessionDB(db_path=db_path) + try: + assert db._conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0] == 10 + sid = db._conn.execute("SELECT id FROM sessions LIMIT 1").fetchone()[0] + db.append_message(sid, role="user", content="post repair pizza message") + assert db._conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0] == 11 + hits = db._conn.execute( + "SELECT COUNT(*) FROM messages_fts WHERE messages_fts MATCH 'pizza'" + ).fetchone()[0] + assert hits >= 5 + finally: + db.close() + + +def test_repair_noop_db_uses_already_healthy_shortcut(tmp_path): + """A healthy DB returns the cheap already_healthy strategy, no surgery.""" + db_path = tmp_path / "state.db" + _build_healthy_db(db_path) + report = repair_state_db_schema(db_path, backup=False) + assert report["repaired"] is True + assert report["strategy"] == "already_healthy" + + +def test_select_cached_agent_history_prefers_longer_live_transcript(): + """Gateway guard keeps the live transcript when persisted history lags.""" + from gateway.run import _select_cached_agent_history + + persisted = [{"role": "user", "content": "only one"}] + live = [ + {"role": "user", "content": "one"}, + {"role": "assistant", "content": "two"}, + {"role": "user", "content": "three"}, + ] + # Persisted lags (FTS write failed) → keep the longer live copy. + out = _select_cached_agent_history(persisted, live) + assert out == live + assert out is not live # returns a copy, not the live list + + # Persisted is current/longer → leave it untouched (identity preserved). + longer_persisted = live + [{"role": "assistant", "content": "four"}] + out2 = _select_cached_agent_history(longer_persisted, live) + assert out2 is longer_persisted + + # No live transcript / not a list → no-op. + assert _select_cached_agent_history(persisted, None) is persisted + assert _select_cached_agent_history(persisted, "nope") is persisted