mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-26 11:12:03 +00:00
fix(state): detect and repair FTS write corruption that silently drops gateway history (#52798)
A readable state.db can still reject every message write through the messages_fts* triggers when the FTS5 index is corrupt: base-table reads and PRAGMA integrity_check pass, but INSERT INTO messages fails with 'database disk image is malformed'. The gateway reloads conversation_history from disk each turn, so a silently-failed write hands the next turn stale/empty history even though the same cached AIAgent still holds the live transcript — causing immediate same-session amnesia. (#50502)

- hermes_state.py: _db_opens_cleanly() now drives a rolled-back message write through the FTS triggers, so write-only corruption (which the read-only probe reported healthy) is detected. repair_state_db_schema() gains an in-place FTS5 'rebuild' strategy (tier 0) before the dedup/drop tiers, plus an already_healthy short-circuit. Both 'hermes sessions repair' and 'hermes doctor' route through these, so the fix covers the whole class.
- hermes_cli/doctor.py: the state.db check runs the write-health probe even on the success (readable) path and repairs in place with --fix.
- gateway/run.py: _select_cached_agent_history() prefers the cached agent's longer live _session_messages over a shorter persisted transcript, so an FTS write failure can't wipe in-session context.
- tests: regressions for write-health detection, in-place repair preserving rows + resuming writes, the already_healthy shortcut, and the gateway guard.

Combines the approaches from #50504 (@0-CYBERDYNE-SYSTEMS-0, issue author), #52165 (@davidgut1982), and #50576 (@trevorgordon981).
This commit is contained in:
parent
85e084d60d
commit
0b7128582f
4 changed files with 285 additions and 13 deletions
|
|
@ -832,6 +832,28 @@ def _build_gateway_agent_history(
|
|||
return agent_history, observed_context
|
||||
|
||||
|
||||
def _select_cached_agent_history(
    persisted_history: List[Dict[str, Any]],
    live_history: Any,
) -> List[Dict[str, Any]]:
    """Choose between the on-disk transcript and a cached agent's live one.

    Protects against the FTS write-corruption failure mode (#50502): if
    message writes fail silently through corrupt FTS triggers, the
    ``conversation_history`` reloaded from disk on the next turn is
    stale/empty while the same cached ``AIAgent`` still holds the complete
    ``_session_messages``. Swapping the live transcript for that shorter
    persisted copy would produce immediate same-session amnesia.

    Args:
        persisted_history: Transcript reloaded from persistent storage.
        live_history: The cached agent's in-memory transcript. May be any
            object; only a ``list`` is ever considered as a replacement.

    Returns:
        A shallow copy of ``live_history`` when it is a list that is strictly
        longer than ``persisted_history``; otherwise ``persisted_history``
        itself (same object, so callers can detect "no change" by identity).
    """
    # Anything that is not a real list (None, a string, ...) cannot stand in
    # for a transcript.
    if not isinstance(live_history, list):
        return persisted_history

    # Equal-length or shorter live copy: the persisted transcript wins.
    if len(live_history) <= len(persisted_history):
        return persisted_history

    # Hand back a fresh list so the caller never aliases the agent's buffer.
    return [*live_history]
|
||||
|
||||
|
||||
def _wrap_current_message_with_observed_context(message: Any, observed_context: Optional[str]) -> Any:
|
||||
"""Prepend observed Telegram context to the API-only current user turn."""
|
||||
|
||||
|
|
@ -15875,6 +15897,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
user_id_alt=getattr(source, "user_id_alt", None),
|
||||
)
|
||||
agent = None
|
||||
reused_cached_agent = False
|
||||
_cache_lock = getattr(self, "_agent_cache_lock", None)
|
||||
_cache = getattr(self, "_agent_cache", None)
|
||||
|
||||
|
|
@ -15933,6 +15956,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
_xproc_evicted_agent = _ev_agent
|
||||
else:
|
||||
agent = cached[0]
|
||||
reused_cached_agent = True
|
||||
# Refresh LRU order so the cap enforcement evicts
|
||||
# truly-oldest entries, not the one we just used.
|
||||
if hasattr(_cache, "move_to_end"):
|
||||
|
|
@ -16217,6 +16241,26 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
channel_prompt=channel_prompt,
|
||||
inject_timestamps=_message_timestamps_enabled(_load_gateway_config()),
|
||||
)
|
||||
|
||||
# FTS write-corruption guard (#50502): when message persistence
|
||||
# fails silently through corrupt FTS triggers, the reloaded
|
||||
# transcript above is stale/empty even though the SAME cached agent
|
||||
# still holds the full live conversation in `_session_messages`.
|
||||
# Replacing the live transcript with that shorter copy causes
|
||||
# immediate same-session amnesia. Only applies when we reused a
|
||||
# cached agent bound to this exact session_id.
|
||||
if reused_cached_agent and getattr(agent, "session_id", None) == session_id:
|
||||
_selected = _select_cached_agent_history(
|
||||
agent_history, getattr(agent, "_session_messages", None)
|
||||
)
|
||||
if _selected is not agent_history:
|
||||
logger.warning(
|
||||
"Persisted transcript lagged live cached history for "
|
||||
"session %s (disk=%d, memory=%d); preserving live "
|
||||
"conversation context (possible FTS write corruption)",
|
||||
session_key, len(agent_history), len(_selected),
|
||||
)
|
||||
agent_history = _selected
|
||||
|
||||
# Collect MEDIA paths already in history so we can exclude them
|
||||
# from the current turn's extraction. This is compression-safe:
|
||||
|
|
|
|||
|
|
@ -1202,6 +1202,46 @@ def run_doctor(args):
|
|||
count = cursor.fetchone()[0]
|
||||
conn.close()
|
||||
check_ok(f"{_DHH}/state.db exists ({count} sessions)")
|
||||
|
||||
# FTS write-health probe (#50502): `SELECT COUNT(*)` above succeeds
|
||||
# even when the FTS index is corrupt and every message write fails
|
||||
# through the triggers. `_db_opens_cleanly` now drives a rolled-back
|
||||
# write so this otherwise-silent corruption class is surfaced (and
|
||||
# repaired in place with --fix).
|
||||
from hermes_state import _db_opens_cleanly, repair_state_db_schema
|
||||
|
||||
_write_reason = _db_opens_cleanly(state_db_path)
|
||||
if _write_reason is not None:
|
||||
check_warn(
|
||||
f"{_DHH}/state.db fails a write-health probe (FTS index may be corrupt)",
|
||||
f"({_write_reason})",
|
||||
)
|
||||
if should_fix:
|
||||
report = repair_state_db_schema(state_db_path)
|
||||
if report.get("repaired"):
|
||||
backup_name = (
|
||||
Path(report["backup_path"]).name
|
||||
if report.get("backup_path") else "n/a"
|
||||
)
|
||||
check_ok(
|
||||
"Repaired state.db FTS write health",
|
||||
f"(strategy: {report.get('strategy')}; backup: {backup_name})",
|
||||
)
|
||||
fixed_count += 1
|
||||
else:
|
||||
check_warn(
|
||||
"state.db FTS write-health repair did not recover automatically",
|
||||
f"({report.get('error')}; backup: {report.get('backup_path')})",
|
||||
)
|
||||
issues.append(
|
||||
"state.db FTS write corruption and auto-repair failed — "
|
||||
"restore from the backup copy beside state.db"
|
||||
)
|
||||
else:
|
||||
issues.append(
|
||||
"state.db FTS write corruption — run 'hermes doctor --fix' "
|
||||
"(or 'hermes sessions repair') to rebuild the FTS index"
|
||||
)
|
||||
except Exception as e:
|
||||
from hermes_state import is_malformed_db_error, repair_state_db_schema
|
||||
|
||||
|
|
|
|||
|
|
@ -403,7 +403,11 @@ def _db_opens_cleanly(db_path: Path) -> Optional[str]:
|
|||
|
||||
Runs the same first-statement (``PRAGMA journal_mode``) that trips the
|
||||
malformed-schema parse, then ``PRAGMA integrity_check`` and a canonical
|
||||
``sessions`` read.
|
||||
``sessions`` read, and finally a rolled-back ``messages`` write so that
|
||||
FTS5 index corruption — which leaves base-table reads and
|
||||
``integrity_check`` passing while every ``INSERT INTO messages`` fails
|
||||
through the FTS triggers — is reported as unhealthy rather than slipping
|
||||
past as a false "ok" (#50502).
|
||||
"""
|
||||
conn = sqlite3.connect(str(db_path), isolation_level=None)
|
||||
try:
|
||||
|
|
@ -413,6 +417,36 @@ def _db_opens_cleanly(db_path: Path) -> Optional[str]:
|
|||
if problems:
|
||||
return "; ".join(problems[:3])
|
||||
conn.execute("SELECT COUNT(*) FROM sessions").fetchone()
|
||||
|
||||
# FTS write probe: drive a row through the messages_fts* triggers in a
|
||||
# transaction that is always rolled back, so a corrupt FTS index that
|
||||
# rejects writes is caught even though reads look healthy. The probe is
|
||||
# best-effort — if the messages/sessions tables don't exist yet (brand
|
||||
# new file mid-init) the OperationalError is treated as "not yet a
|
||||
# populated DB", not corruption.
|
||||
probe_session_id = f"_hermes_fts_health_probe_{time.time_ns()}"
|
||||
try:
|
||||
conn.execute("BEGIN IMMEDIATE")
|
||||
conn.execute(
|
||||
"INSERT INTO sessions (id, source, started_at) VALUES (?, ?, ?)",
|
||||
(probe_session_id, "_health_probe", time.time()),
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO messages (session_id, role, content, timestamp) "
|
||||
"VALUES (?, ?, ?, ?)",
|
||||
(probe_session_id, "user", "_fts_health_probe", time.time()),
|
||||
)
|
||||
conn.execute("ROLLBACK")
|
||||
except sqlite3.OperationalError as exc:
|
||||
# Missing tables / FTS disabled — not the corruption class we probe.
|
||||
try:
|
||||
conn.execute("ROLLBACK")
|
||||
except sqlite3.Error:
|
||||
pass
|
||||
msg = str(exc).lower()
|
||||
if "no such table" in msg or "no such column" in msg:
|
||||
return None
|
||||
return str(exc)
|
||||
return None
|
||||
except sqlite3.DatabaseError as exc:
|
||||
return str(exc)
|
||||
|
|
@ -421,16 +455,23 @@ def _db_opens_cleanly(db_path: Path) -> Optional[str]:
|
|||
|
||||
|
||||
def repair_state_db_schema(db_path: Path, *, backup: bool = True) -> Dict[str, Any]:
|
||||
"""Repair a state.db whose ``sqlite_master`` schema is malformed.
|
||||
"""Repair a state.db whose ``sqlite_master`` schema is malformed or whose
|
||||
FTS indexes reject writes.
|
||||
|
||||
Handles the "duplicate object definition" / malformed-schema class where
|
||||
even ``PRAGMA`` statements fail. Tries least-destructive recovery first
|
||||
and escalates:
|
||||
Handles two corruption classes: the "duplicate object definition" /
|
||||
malformed-schema class where even ``PRAGMA`` statements fail, and the FTS
|
||||
write-corruption class (#50502) where base tables read fine and
|
||||
``integrity_check`` passes but writes fail through the ``messages_fts*``
|
||||
triggers. Tries least-destructive recovery first and escalates:
|
||||
|
||||
1. **De-duplicate** ``sqlite_master`` (keep the lowest rowid per
|
||||
1. **Rebuild FTS indexes in place** via the FTS5 ``'rebuild'`` command,
|
||||
which rewrites the internal b-tree segments from the canonical
|
||||
``messages`` rows without dropping or recreating anything. Fixes the
|
||||
FTS write-corruption class while preserving the schema intact.
|
||||
2. **De-duplicate** ``sqlite_master`` (keep the lowest rowid per
|
||||
``type``/``name``). Fixes the canonical "table X already exists"
|
||||
case and PRESERVES the existing FTS index intact.
|
||||
2. **Drop the FTS schema** (every ``messages_fts*`` object) + ``VACUUM``.
|
||||
3. **Drop the FTS schema** (every ``messages_fts*`` object) + ``VACUUM``.
|
||||
The next ``SessionDB()`` open rebuilds the FTS indexes from the
|
||||
canonical ``messages`` table.
|
||||
|
||||
|
|
@ -452,10 +493,43 @@ def repair_state_db_schema(db_path: Path, *, backup: bool = True) -> Dict[str, A
|
|||
report["error"] = f"{db_path} does not exist"
|
||||
return report
|
||||
|
||||
if _db_opens_cleanly(db_path) is None:
|
||||
report["repaired"] = True
|
||||
report["strategy"] = "already_healthy"
|
||||
return report
|
||||
|
||||
if backup:
|
||||
bpath = _backup_db_file(db_path)
|
||||
report["backup_path"] = str(bpath) if bpath else None
|
||||
|
||||
# ── Strategy 0: rebuild FTS indexes in place (FTS write-corruption) ──
|
||||
# The FTS5 'rebuild' command rewrites the internal index from the canonical
|
||||
# content table. This is the recommended, least-destructive recovery for a
|
||||
# corrupt FTS index that rejects message writes while reads still succeed.
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path), isolation_level=None)
|
||||
try:
|
||||
for table_name in ("messages_fts", "messages_fts_trigram"):
|
||||
try:
|
||||
conn.execute(
|
||||
f"INSERT INTO {table_name}({table_name}) VALUES('rebuild')"
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
# Table absent (FTS disabled / trigram off) — skip it.
|
||||
continue
|
||||
finally:
|
||||
conn.close()
|
||||
if _db_opens_cleanly(db_path) is None:
|
||||
report["repaired"] = True
|
||||
report["strategy"] = "rebuild_fts"
|
||||
logger.warning(
|
||||
"state.db FTS indexes rebuilt in place (schema preserved): %s",
|
||||
db_path,
|
||||
)
|
||||
return report
|
||||
except sqlite3.DatabaseError as exc:
|
||||
logger.warning("state.db FTS in-place rebuild pass failed: %s", exc)
|
||||
|
||||
# ── Strategy 1: de-duplicate sqlite_master (keeps FTS index) ──
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path), isolation_level=None)
|
||||
|
|
|
|||
|
|
@ -166,17 +166,29 @@ def test_strategy_b_rebuild_when_dedup_insufficient(tmp_path, monkeypatch):
|
|||
_build_healthy_db(db_path)
|
||||
_corrupt_duplicate_fts(db_path)
|
||||
|
||||
# Make the post-strat-1 verification report "still broken" exactly once,
|
||||
# so the routine escalates to strat 2 (drop FTS + VACUUM) and runs its
|
||||
# real SQL against the file; the strat-2 verification then uses the real
|
||||
# check and passes.
|
||||
# Make every health verification report "still broken" until the drop-FTS
|
||||
# pass has actually removed the messages_fts schema, so the routine
|
||||
# escalates past the in-place-rebuild and dedup passes to strat 2 (drop FTS
|
||||
# + VACUUM) and runs its real SQL against the file. Keyed on whether the FTS
|
||||
# schema is still present rather than a call counter, so it stays correct as
|
||||
# earlier verification call sites are added/removed.
|
||||
real_check = hermes_state._db_opens_cleanly
|
||||
calls = {"n": 0}
|
||||
|
||||
def flaky_check(path):
|
||||
calls["n"] += 1
|
||||
if calls["n"] == 1:
|
||||
return "pretend strat 1 was insufficient"
|
||||
try:
|
||||
probe = sqlite3.connect(str(path))
|
||||
still_has_fts = probe.execute(
|
||||
"SELECT COUNT(*) FROM sqlite_master "
|
||||
"WHERE name LIKE 'messages_fts%'"
|
||||
).fetchone()[0]
|
||||
probe.close()
|
||||
except sqlite3.DatabaseError:
|
||||
# sqlite_master still malformed (pre-dedup) — treat as broken.
|
||||
return "pretend still broken (schema unreadable)"
|
||||
if still_has_fts:
|
||||
return "pretend in-place/dedup passes were insufficient"
|
||||
return real_check(path)
|
||||
|
||||
monkeypatch.setattr(hermes_state, "_db_opens_cleanly", flaky_check)
|
||||
|
|
@ -242,3 +254,105 @@ def test_repair_on_clean_db_is_noop(tmp_path):
|
|||
assert conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0] == 10
|
||||
assert conn.execute("PRAGMA integrity_check").fetchone()[0] == "ok"
|
||||
conn.close()
|
||||
|
||||
|
||||
# ── FTS write-corruption class (#50502) ──────────────────────────────────
|
||||
# A readable state.db can still reject every message write through the
|
||||
# messages_fts* triggers when the FTS index is corrupt. Plain
|
||||
# `SELECT COUNT(*)` reads succeed, so the old read-only health probe reported
|
||||
# it healthy and the gateway silently dropped conversation history.
|
||||
|
||||
|
||||
def _corrupt_fts_index_data(db_path: Path) -> None:
    """Overwrite the FTS5 shadow b-tree blocks with garbage bytes.

    Reproduces the runtime "database disk image is malformed" / "malformed
    inverted index for FTS5 table" failure that fires on writes through the
    triggers while base-table reads still return rows.

    Args:
        db_path: Path to the SQLite file whose ``messages_fts_data`` rows are
            overwritten.

    Raises:
        sqlite3.Error: If the UPDATE cannot be executed (for example the
            ``messages_fts_data`` table does not exist).
    """
    # isolation_level=None puts the connection in autocommit mode, so the
    # UPDATE takes effect without an explicit commit.
    conn = sqlite3.connect(str(db_path), isolation_level=None)
    try:
        conn.execute("UPDATE messages_fts_data SET block = X'DEADBEEFDEADBEEF'")
    finally:
        # Always release the file handle, even when the UPDATE raises, so a
        # failing fixture does not leave the database file open.
        conn.close()
|
||||
|
||||
|
||||
def test_fts_write_corruption_detected_by_write_probe(tmp_path):
    """The rolled-back write probe in _db_opens_cleanly reports FTS write corruption."""
    from hermes_state import _db_opens_cleanly

    db_path = tmp_path / "state.db"
    _build_healthy_db(db_path)

    # Baseline: the freshly built database must be reported healthy.
    baseline_reason = _db_opens_cleanly(db_path)
    assert baseline_reason is None

    _corrupt_fts_index_data(db_path)

    # The silent class: base-table reads keep working after the corruption.
    reader = sqlite3.connect(str(db_path), isolation_level=None)
    session_rows = reader.execute("SELECT COUNT(*) FROM sessions").fetchone()[0]
    assert session_rows >= 1
    message_rows = reader.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
    assert message_rows == 10
    reader.close()

    # The write-aware probe must surface a reason instead of a false "ok".
    assert _db_opens_cleanly(db_path) is not None
|
||||
|
||||
|
||||
def test_fts_write_corruption_repaired_in_place(tmp_path):
    """After repair_state_db_schema the database is healthy and writable again."""
    from hermes_state import _db_opens_cleanly

    db_path = tmp_path / "state.db"
    _build_healthy_db(db_path)
    _corrupt_fts_index_data(db_path)

    outcome = repair_state_db_schema(db_path)
    assert outcome["repaired"] is True
    # Any repair tier is acceptable as long as the database ends up healthy.
    accepted_strategies = ("rebuild_fts", "dedup_schema", "drop_fts_rebuild")
    assert outcome["strategy"] in accepted_strategies
    assert _db_opens_cleanly(db_path) is None

    # Existing rows survive the repair, and a new message can be appended
    # and found through the FTS index.
    session_db = SessionDB(db_path=db_path)
    try:
        raw = session_db._conn
        assert raw.execute("SELECT COUNT(*) FROM messages").fetchone()[0] == 10

        first_session_id = raw.execute("SELECT id FROM sessions LIMIT 1").fetchone()[0]
        session_db.append_message(
            first_session_id, role="user", content="post repair pizza message"
        )
        assert raw.execute("SELECT COUNT(*) FROM messages").fetchone()[0] == 11

        pizza_hits = raw.execute(
            "SELECT COUNT(*) FROM messages_fts WHERE messages_fts MATCH 'pizza'"
        ).fetchone()[0]
        assert pizza_hits >= 5
    finally:
        session_db.close()
|
||||
|
||||
|
||||
def test_repair_noop_db_uses_already_healthy_shortcut(tmp_path):
    """Repairing a healthy database reports the already_healthy strategy."""
    healthy_db = tmp_path / "state.db"
    _build_healthy_db(healthy_db)

    # backup=False is passed through to the repair call for this no-op case.
    outcome = repair_state_db_schema(healthy_db, backup=False)

    assert outcome["repaired"] is True
    assert outcome["strategy"] == "already_healthy"
|
||||
|
||||
|
||||
def test_select_cached_agent_history_prefers_longer_live_transcript():
    """The gateway guard returns the live transcript only when it is longer."""
    from gateway.run import _select_cached_agent_history

    live = [
        {"role": "user", "content": "one"},
        {"role": "assistant", "content": "two"},
        {"role": "user", "content": "three"},
    ]
    persisted = [{"role": "user", "content": "only one"}]

    # Persisted copy lags behind: the guard returns the live content as a
    # new list, not the live list object itself.
    chosen = _select_cached_agent_history(persisted, live)
    assert chosen == live
    assert chosen is not live

    # Persisted copy is longer: it is returned untouched (same object).
    longer_persisted = live + [{"role": "assistant", "content": "four"}]
    assert _select_cached_agent_history(longer_persisted, live) is longer_persisted

    # A missing or non-list live transcript leaves the persisted copy in place.
    for not_a_transcript in (None, "nope"):
        assert _select_cached_agent_history(persisted, not_a_transcript) is persisted
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue