mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
fix(sqlite): fall back to journal_mode=DELETE on NFS/SMB/FUSE (#22043)
SQLite's WAL mode requires shared-memory (mmap) coordination and fcntl byte-range locks that don't reliably work on network filesystems. Upstream documents this explicitly: https://www.sqlite.org/wal.html#sometimes_queries_return_sqlite_busy_in_wal_mode On NFS / SMB / some FUSE mounts / WSL1, 'PRAGMA journal_mode=WAL' raises 'sqlite3.OperationalError: locking protocol' (SQLITE_PROTOCOL). Before this change, every feature backed by state.db or kanban.db broke silently: - /resume, /title, /history, /branch returned 'Session database not available.' with no cause - gateway logged the init failure at DEBUG (invisible in errors.log) - kanban dispatcher crashed every 60s, driving the known migration race (duplicate column name: consecutive_failures, #21708 / #21374) Changes: - hermes_state.apply_wal_with_fallback(): shared helper that tries WAL and falls back to DELETE on SQLITE_PROTOCOL-style errors with one WARNING explaining why - hermes_state.get_last_init_error() + format_session_db_unavailable(): capture the init failure cause and surface it in user-facing strings (with an NFS/SMB pointer for 'locking protocol') - hermes_cli/kanban_db.connect(): use the shared helper - gateway/run.py: bump SessionDB init failure log DEBUG -> WARNING (matches cli.py's existing correct behavior) - cli.py (4 sites) + gateway/run.py (5 sites): replace bare 'Session database not available.' with format_session_db_unavailable() Tests: 12 new tests in tests/test_hermes_state_wal_fallback.py + 1 new test in tests/hermes_cli/test_kanban_db.py. Existing suites (state, kanban, gateway, cli) remain green for all tests unrelated to pre-existing failures on main. Evidence: real-world user on NFSv3 mount (172.26.224.200:d2dfac12/home, local_lock=none) reporting 'Session database not available.' on /resume; 'locking protocol' appears in 4 distinct log entries across backup, kanban, TUI, and CLI paths in the same session. closes #22032
This commit is contained in:
parent
ae005ec588
commit
2a7047c2ed
10 changed files with 584 additions and 32 deletions
12
cli.py
12
cli.py
|
|
@ -5463,7 +5463,8 @@ class HermesCLI:
|
||||||
return
|
return
|
||||||
|
|
||||||
if not self._session_db:
|
if not self._session_db:
|
||||||
_cprint(" Session database not available.")
|
from hermes_state import format_session_db_unavailable
|
||||||
|
_cprint(f" {format_session_db_unavailable()}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Resolve title or ID
|
# Resolve title or ID
|
||||||
|
|
@ -5574,7 +5575,8 @@ class HermesCLI:
|
||||||
return
|
return
|
||||||
|
|
||||||
if not self._session_db:
|
if not self._session_db:
|
||||||
_cprint(" Session database not available.")
|
from hermes_state import format_session_db_unavailable
|
||||||
|
_cprint(f" {format_session_db_unavailable()}")
|
||||||
return
|
return
|
||||||
|
|
||||||
parts = cmd_original.split(None, 1)
|
parts = cmd_original.split(None, 1)
|
||||||
|
|
@ -6850,7 +6852,8 @@ class HermesCLI:
|
||||||
self._pending_title = new_title
|
self._pending_title = new_title
|
||||||
_cprint(f" Session title queued: {new_title} (will be saved on first message)")
|
_cprint(f" Session title queued: {new_title} (will be saved on first message)")
|
||||||
else:
|
else:
|
||||||
_cprint(" Session database not available.")
|
from hermes_state import format_session_db_unavailable
|
||||||
|
_cprint(f" {format_session_db_unavailable()}")
|
||||||
else:
|
else:
|
||||||
_cprint(" Usage: /title <your session title>")
|
_cprint(" Usage: /title <your session title>")
|
||||||
else:
|
else:
|
||||||
|
|
@ -6865,7 +6868,8 @@ class HermesCLI:
|
||||||
else:
|
else:
|
||||||
_cprint(" No title set. Usage: /title <your session title>")
|
_cprint(" No title set. Usage: /title <your session title>")
|
||||||
else:
|
else:
|
||||||
_cprint(" Session database not available.")
|
from hermes_state import format_session_db_unavailable
|
||||||
|
_cprint(f" {format_session_db_unavailable()}")
|
||||||
elif canonical == "new":
|
elif canonical == "new":
|
||||||
parts = cmd_original.split(maxsplit=1)
|
parts = cmd_original.split(maxsplit=1)
|
||||||
title = parts[1].strip() if len(parts) > 1 else None
|
title = parts[1].strip() if len(parts) > 1 else None
|
||||||
|
|
|
||||||
|
|
@ -312,7 +312,12 @@ class ResponseStore:
|
||||||
self._conn = sqlite3.connect(db_path, check_same_thread=False)
|
self._conn = sqlite3.connect(db_path, check_same_thread=False)
|
||||||
except Exception:
|
except Exception:
|
||||||
self._conn = sqlite3.connect(":memory:", check_same_thread=False)
|
self._conn = sqlite3.connect(":memory:", check_same_thread=False)
|
||||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
# Use shared WAL-fallback helper so response_store.db degrades
|
||||||
|
# gracefully on NFS/SMB/FUSE-mounted HERMES_HOME (same filesystem
|
||||||
|
# issue addressed for state.db/kanban.db — see
|
||||||
|
# hermes_state._WAL_INCOMPAT_MARKERS).
|
||||||
|
from hermes_state import apply_wal_with_fallback
|
||||||
|
apply_wal_with_fallback(self._conn, db_label="response_store.db")
|
||||||
self._conn.execute(
|
self._conn.execute(
|
||||||
"""CREATE TABLE IF NOT EXISTS responses (
|
"""CREATE TABLE IF NOT EXISTS responses (
|
||||||
response_id TEXT PRIMARY KEY,
|
response_id TEXT PRIMARY KEY,
|
||||||
|
|
|
||||||
|
|
@ -1218,7 +1218,13 @@ class GatewayRunner:
|
||||||
from hermes_state import SessionDB
|
from hermes_state import SessionDB
|
||||||
self._session_db = SessionDB()
|
self._session_db = SessionDB()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("SQLite session store not available: %s", e)
|
# WARNING (not DEBUG) so the failure appears in errors.log — matches
|
||||||
|
# cli.py's handling of the same init path. Users hitting NFS-mounted
|
||||||
|
# HERMES_HOME silently lost /resume, /title, /history, /branch, and
|
||||||
|
# session search without this. The underlying cause (usually
|
||||||
|
# "locking protocol" from NFS) is now also captured by
|
||||||
|
# hermes_state.get_last_init_error() for slash-command error strings.
|
||||||
|
logger.warning("SQLite session store not available: %s", e)
|
||||||
|
|
||||||
# Opportunistic state.db maintenance: prune ended sessions older
|
# Opportunistic state.db maintenance: prune ended sessions older
|
||||||
# than sessions.retention_days + optional VACUUM. Tracks last-run
|
# than sessions.retention_days + optional VACUUM. Tracks last-run
|
||||||
|
|
@ -10374,7 +10380,8 @@ class GatewayRunner:
|
||||||
def _disable_telegram_topic_mode_for_chat(self, source: SessionSource) -> str:
|
def _disable_telegram_topic_mode_for_chat(self, source: SessionSource) -> str:
|
||||||
"""Cleanly disable topic mode for a chat via /topic off."""
|
"""Cleanly disable topic mode for a chat via /topic off."""
|
||||||
if not self._session_db:
|
if not self._session_db:
|
||||||
return "Session database not available."
|
from hermes_state import format_session_db_unavailable
|
||||||
|
return format_session_db_unavailable()
|
||||||
chat_id = str(source.chat_id or "")
|
chat_id = str(source.chat_id or "")
|
||||||
if not chat_id:
|
if not chat_id:
|
||||||
return "Could not determine chat ID."
|
return "Could not determine chat ID."
|
||||||
|
|
@ -10412,7 +10419,8 @@ class GatewayRunner:
|
||||||
if source.platform != Platform.TELEGRAM or source.chat_type != "dm":
|
if source.platform != Platform.TELEGRAM or source.chat_type != "dm":
|
||||||
return "The /topic command is only available in Telegram private chats."
|
return "The /topic command is only available in Telegram private chats."
|
||||||
if not self._session_db:
|
if not self._session_db:
|
||||||
return "Session database not available."
|
from hermes_state import format_session_db_unavailable
|
||||||
|
return format_session_db_unavailable()
|
||||||
|
|
||||||
# Authorization: /topic activates multi-session mode and mutates
|
# Authorization: /topic activates multi-session mode and mutates
|
||||||
# SQLite side tables. Unauthorized senders (not in allowlist) must
|
# SQLite side tables. Unauthorized senders (not in allowlist) must
|
||||||
|
|
@ -10626,7 +10634,8 @@ class GatewayRunner:
|
||||||
session_id = session_entry.session_id
|
session_id = session_entry.session_id
|
||||||
|
|
||||||
if not self._session_db:
|
if not self._session_db:
|
||||||
return "Session database not available."
|
from hermes_state import format_session_db_unavailable
|
||||||
|
return format_session_db_unavailable()
|
||||||
|
|
||||||
# Ensure session exists in SQLite DB (it may only exist in session_store
|
# Ensure session exists in SQLite DB (it may only exist in session_store
|
||||||
# if this is the first command in a new session)
|
# if this is the first command in a new session)
|
||||||
|
|
@ -10670,7 +10679,8 @@ class GatewayRunner:
|
||||||
async def _handle_resume_command(self, event: MessageEvent) -> str:
|
async def _handle_resume_command(self, event: MessageEvent) -> str:
|
||||||
"""Handle /resume command — switch to a previously-named session."""
|
"""Handle /resume command — switch to a previously-named session."""
|
||||||
if not self._session_db:
|
if not self._session_db:
|
||||||
return "Session database not available."
|
from hermes_state import format_session_db_unavailable
|
||||||
|
return format_session_db_unavailable()
|
||||||
|
|
||||||
source = event.source
|
source = event.source
|
||||||
session_key = self._session_key_for_source(source)
|
session_key = self._session_key_for_source(source)
|
||||||
|
|
@ -10757,7 +10767,8 @@ class GatewayRunner:
|
||||||
import uuid as _uuid
|
import uuid as _uuid
|
||||||
|
|
||||||
if not self._session_db:
|
if not self._session_db:
|
||||||
return "Session database not available."
|
from hermes_state import format_session_db_unavailable
|
||||||
|
return format_session_db_unavailable()
|
||||||
|
|
||||||
source = event.source
|
source = event.source
|
||||||
session_key = self._session_key_for_source(source)
|
session_key = self._session_key_for_source(source)
|
||||||
|
|
|
||||||
|
|
@ -917,7 +917,11 @@ def connect(
|
||||||
needs_init = resolved not in _INITIALIZED_PATHS
|
needs_init = resolved not in _INITIALIZED_PATHS
|
||||||
conn = sqlite3.connect(str(path), isolation_level=None, timeout=30)
|
conn = sqlite3.connect(str(path), isolation_level=None, timeout=30)
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
conn.execute("PRAGMA journal_mode=WAL")
|
# WAL doesn't work on network filesystems (NFS/SMB/FUSE). Shared helper
|
||||||
|
# falls back to DELETE with one WARNING so kanban stays usable there.
|
||||||
|
# See hermes_state._WAL_INCOMPAT_MARKERS for detection logic.
|
||||||
|
from hermes_state import apply_wal_with_fallback
|
||||||
|
apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})")
|
||||||
conn.execute("PRAGMA synchronous=NORMAL")
|
conn.execute("PRAGMA synchronous=NORMAL")
|
||||||
conn.execute("PRAGMA foreign_keys=ON")
|
conn.execute("PRAGMA foreign_keys=ON")
|
||||||
if needs_init:
|
if needs_init:
|
||||||
|
|
|
||||||
196
hermes_state.py
196
hermes_state.py
|
|
@ -35,6 +35,153 @@ DEFAULT_DB_PATH = get_hermes_home() / "state.db"
|
||||||
|
|
||||||
SCHEMA_VERSION = 11
|
SCHEMA_VERSION = 11
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# WAL-compatibility fallback
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# SQLite's WAL mode requires shared-memory (mmap) coordination and fcntl
|
||||||
|
# byte-range locks that don't reliably work on network filesystems (NFS,
|
||||||
|
# SMB/CIFS, some FUSE mounts, WSL1). Upstream documents this explicitly:
|
||||||
|
# https://www.sqlite.org/wal.html#sometimes_queries_return_sqlite_busy_in_wal_mode
|
||||||
|
#
|
||||||
|
# On those filesystems ``PRAGMA journal_mode=WAL`` raises
|
||||||
|
# ``sqlite3.OperationalError: locking protocol`` (SQLITE_PROTOCOL). If we
|
||||||
|
# propagate that, every feature backed by state.db / kanban.db breaks
|
||||||
|
# silently — /resume, /title, /history, /branch, kanban dispatcher, etc.
|
||||||
|
#
|
||||||
|
# Instead, fall back to ``journal_mode=DELETE`` (the pre-WAL default) which
|
||||||
|
# works on NFS. Concurrency drops — concurrent readers are blocked during
|
||||||
|
# a write — but the feature works.
|
||||||
|
_WAL_INCOMPAT_MARKERS = (
|
||||||
|
"locking protocol", # SQLITE_PROTOCOL on NFS/SMB
|
||||||
|
"not authorized", # Some FUSE mounts block WAL pragma outright
|
||||||
|
"disk i/o error", # Flaky network FS during WAL setup
|
||||||
|
)
|
||||||
|
|
||||||
|
# Last SessionDB() init error, per-process. Surfaced in /resume and
|
||||||
|
# related slash-command error strings so users know WHY the DB is
|
||||||
|
# unavailable instead of getting a bare "Session database not available."
|
||||||
|
# Only SessionDB.__init__ writes to this; kanban_db.connect() failures
|
||||||
|
# do not update it (by design — kanban failures are reported via their
|
||||||
|
# own caller's error handling, not via /resume-style slash commands).
|
||||||
|
_last_init_error: Optional[str] = None
|
||||||
|
_last_init_error_lock = threading.Lock()
|
||||||
|
|
||||||
|
# Paths for which we've already logged a WAL-fallback WARNING. Without
|
||||||
|
# this, kanban_db.connect() (called on every kanban operation — see
|
||||||
|
# hermes_cli/kanban_db.py for ~30 call sites) would re-log the same
|
||||||
|
# filesystem-incompat warning on every connection, filling errors.log.
|
||||||
|
_wal_fallback_warned_paths: set[str] = set()
|
||||||
|
_wal_fallback_warned_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def _set_last_init_error(msg: Optional[str]) -> None:
|
||||||
|
"""Record (or clear) the most recent state.db init failure.
|
||||||
|
|
||||||
|
Thread-safe via _last_init_error_lock. Callers pass a message to
|
||||||
|
record a failure or None to clear. SessionDB.__init__ only calls
|
||||||
|
this to SET on failure — it deliberately does NOT clear on success,
|
||||||
|
because in a multi-threaded caller (e.g. gateway / web_server per-
|
||||||
|
request SessionDB() instantiation), a concurrent successful open
|
||||||
|
racing past a different thread's failure would erase the cause
|
||||||
|
string that thread's /resume handler is about to format. Explicit
|
||||||
|
clears (e.g. test fixtures) are still supported by passing None.
|
||||||
|
"""
|
||||||
|
global _last_init_error
|
||||||
|
with _last_init_error_lock:
|
||||||
|
_last_init_error = msg
|
||||||
|
|
||||||
|
|
||||||
|
def get_last_init_error() -> Optional[str]:
|
||||||
|
"""Return the most recent state.db init failure, if any.
|
||||||
|
|
||||||
|
Slash-command handlers (``/resume``, ``/title``, ``/history``, ``/branch``)
|
||||||
|
call this to surface the underlying cause in their error messages when
|
||||||
|
``_session_db is None``. Returns ``None`` if SessionDB initialized
|
||||||
|
successfully (or hasn't been attempted).
|
||||||
|
"""
|
||||||
|
return _last_init_error
|
||||||
|
|
||||||
|
|
||||||
|
def format_session_db_unavailable(prefix: str = "Session database not available") -> str:
|
||||||
|
"""Format a user-facing 'session DB unavailable' message with cause.
|
||||||
|
|
||||||
|
When ``SessionDB()`` init fails, callers set ``_session_db = None`` and
|
||||||
|
several slash commands (/resume, /title, /history, /branch) previously
|
||||||
|
responded with a bare ``"Session database not available."`` — no
|
||||||
|
indication of WHY. This helper includes the captured cause (typically
|
||||||
|
``"locking protocol"`` from NFS/SMB) and points users at the known
|
||||||
|
culprit so they can fix it themselves.
|
||||||
|
|
||||||
|
Example output:
|
||||||
|
Session database not available: locking protocol (state.db may be
|
||||||
|
on NFS/SMB — see https://www.sqlite.org/wal.html).
|
||||||
|
"""
|
||||||
|
cause = get_last_init_error()
|
||||||
|
if not cause:
|
||||||
|
return f"{prefix}."
|
||||||
|
hint = ""
|
||||||
|
if any(marker in cause.lower() for marker in _WAL_INCOMPAT_MARKERS):
|
||||||
|
hint = " (state.db may be on NFS/SMB/FUSE — see https://www.sqlite.org/wal.html)"
|
||||||
|
return f"{prefix}: {cause}{hint}."
|
||||||
|
|
||||||
|
|
||||||
|
def apply_wal_with_fallback(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
*,
|
||||||
|
db_label: str = "state.db",
|
||||||
|
) -> str:
|
||||||
|
"""Set ``journal_mode=WAL`` on ``conn``, falling back to DELETE on failure.
|
||||||
|
|
||||||
|
Returns the journal mode actually set (``"wal"`` or ``"delete"``).
|
||||||
|
|
||||||
|
On WAL-incompatible filesystems (NFS, SMB, some FUSE), SQLite raises
|
||||||
|
``OperationalError("locking protocol")`` when setting WAL. We fall
|
||||||
|
back to DELETE mode — the pre-WAL default, which works on NFS — and
|
||||||
|
log one WARNING explaining why.
|
||||||
|
|
||||||
|
The WARNING is deduplicated per ``db_label``: repeated connections
|
||||||
|
to the same underlying DB (e.g. kanban_db.connect() which is called
|
||||||
|
on every kanban operation) log once per process, not once per call.
|
||||||
|
Different db_labels log independently, so state.db and kanban.db
|
||||||
|
each get one warning on the same NFS mount.
|
||||||
|
|
||||||
|
Shared by :class:`SessionDB` and ``hermes_cli.kanban_db.connect`` so
|
||||||
|
both databases get identical fallback behavior.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
|
return "wal"
|
||||||
|
except sqlite3.OperationalError as exc:
|
||||||
|
msg = str(exc).lower()
|
||||||
|
if not any(marker in msg for marker in _WAL_INCOMPAT_MARKERS):
|
||||||
|
# Unrelated OperationalError — don't silently swallow.
|
||||||
|
raise
|
||||||
|
_log_wal_fallback_once(db_label, exc)
|
||||||
|
conn.execute("PRAGMA journal_mode=DELETE")
|
||||||
|
return "delete"
|
||||||
|
|
||||||
|
|
||||||
|
def _log_wal_fallback_once(db_label: str, exc: Exception) -> None:
|
||||||
|
"""Log a single WARNING per (process, db_label) about WAL fallback.
|
||||||
|
|
||||||
|
Without this dedup, NFS users running kanban (which opens a fresh
|
||||||
|
connection on every operation — see hermes_cli/kanban_db.py) would
|
||||||
|
fill errors.log with hundreds of identical warnings per hour.
|
||||||
|
"""
|
||||||
|
with _wal_fallback_warned_lock:
|
||||||
|
if db_label in _wal_fallback_warned_paths:
|
||||||
|
return
|
||||||
|
_wal_fallback_warned_paths.add(db_label)
|
||||||
|
logger.warning(
|
||||||
|
"%s: WAL journal_mode unsupported on this filesystem (%s) — "
|
||||||
|
"falling back to journal_mode=DELETE (slower rollback-journal "
|
||||||
|
"mode; reduces concurrency but works on NFS/SMB/FUSE). See "
|
||||||
|
"https://www.sqlite.org/wal.html for details. This warning "
|
||||||
|
"fires once per process per database.",
|
||||||
|
db_label,
|
||||||
|
exc,
|
||||||
|
)
|
||||||
|
|
||||||
SCHEMA_SQL = """
|
SCHEMA_SQL = """
|
||||||
CREATE TABLE IF NOT EXISTS schema_version (
|
CREATE TABLE IF NOT EXISTS schema_version (
|
||||||
version INTEGER NOT NULL
|
version INTEGER NOT NULL
|
||||||
|
|
@ -185,23 +332,40 @@ class SessionDB:
|
||||||
|
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
self._write_count = 0
|
self._write_count = 0
|
||||||
self._conn = sqlite3.connect(
|
try:
|
||||||
str(self.db_path),
|
self._conn = sqlite3.connect(
|
||||||
check_same_thread=False,
|
str(self.db_path),
|
||||||
# Short timeout — application-level retry with random jitter
|
check_same_thread=False,
|
||||||
# handles contention instead of sitting in SQLite's internal
|
# Short timeout — application-level retry with random jitter
|
||||||
# busy handler for up to 30s.
|
# handles contention instead of sitting in SQLite's internal
|
||||||
timeout=1.0,
|
# busy handler for up to 30s.
|
||||||
# Autocommit mode: Python's default isolation_level="" auto-starts
|
timeout=1.0,
|
||||||
# transactions on DML, which conflicts with our explicit
|
# Autocommit mode: Python's default isolation_level=""
|
||||||
# BEGIN IMMEDIATE. None = we manage transactions ourselves.
|
# auto-starts transactions on DML, which conflicts with our
|
||||||
isolation_level=None,
|
# explicit BEGIN IMMEDIATE. None = we manage transactions
|
||||||
)
|
# ourselves.
|
||||||
self._conn.row_factory = sqlite3.Row
|
isolation_level=None,
|
||||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
)
|
||||||
self._conn.execute("PRAGMA foreign_keys=ON")
|
self._conn.row_factory = sqlite3.Row
|
||||||
|
apply_wal_with_fallback(self._conn, db_label="state.db")
|
||||||
|
self._conn.execute("PRAGMA foreign_keys=ON")
|
||||||
|
|
||||||
self._init_schema()
|
self._init_schema()
|
||||||
|
except Exception as exc:
|
||||||
|
# Capture the cause so /resume and friends can surface WHY the
|
||||||
|
# session DB is unavailable instead of a bare "Session database
|
||||||
|
# not available." Callers that catch this exception keep their
|
||||||
|
# existing ``self._session_db = None`` degradation path.
|
||||||
|
#
|
||||||
|
# Note: we deliberately do NOT clear _last_init_error on the
|
||||||
|
# success path (no else branch). In multi-threaded callers
|
||||||
|
# (gateway, web_server per-request SessionDB()), a concurrent
|
||||||
|
# successful open racing past this failure would erase the
|
||||||
|
# cause that another thread's /resume is about to format.
|
||||||
|
# Tests that need to reset the state can call
|
||||||
|
# ``hermes_state._set_last_init_error(None)`` explicitly.
|
||||||
|
_set_last_init_error(f"{type(exc).__name__}: {exc}")
|
||||||
|
raise
|
||||||
|
|
||||||
# ── Core write helper ──
|
# ── Core write helper ──
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -127,7 +127,11 @@ class MemoryStore:
|
||||||
|
|
||||||
def _init_db(self) -> None:
|
def _init_db(self) -> None:
|
||||||
"""Create tables, indexes, and triggers if they do not exist. Enable WAL mode."""
|
"""Create tables, indexes, and triggers if they do not exist. Enable WAL mode."""
|
||||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
# Use the shared WAL-fallback helper so memory_store.db degrades
|
||||||
|
# gracefully on NFS/SMB/FUSE-mounted HERMES_HOME (same issue as
|
||||||
|
# state.db / kanban.db — see hermes_state._WAL_INCOMPAT_MARKERS).
|
||||||
|
from hermes_state import apply_wal_with_fallback
|
||||||
|
apply_wal_with_fallback(self._conn, db_label="memory_store.db (holographic)")
|
||||||
self._conn.executescript(_SCHEMA)
|
self._conn.executescript(_SCHEMA)
|
||||||
# Migrate: add hrr_vector column if missing (safe for existing databases)
|
# Migrate: add hrr_vector column if missing (safe for existing databases)
|
||||||
columns = {row[1] for row in self._conn.execute("PRAGMA table_info(facts)").fetchall()}
|
columns = {row[1] for row in self._conn.execute("PRAGMA table_info(facts)").fetchall()}
|
||||||
|
|
|
||||||
|
|
@ -9869,7 +9869,8 @@ class AIAgent:
|
||||||
)
|
)
|
||||||
elif function_name == "session_search":
|
elif function_name == "session_search":
|
||||||
if not self._session_db:
|
if not self._session_db:
|
||||||
return json.dumps({"success": False, "error": "Session database not available."})
|
from hermes_state import format_session_db_unavailable
|
||||||
|
return json.dumps({"success": False, "error": format_session_db_unavailable()})
|
||||||
from tools.session_search_tool import session_search as _session_search
|
from tools.session_search_tool import session_search as _session_search
|
||||||
return _session_search(
|
return _session_search(
|
||||||
query=function_args.get("query", ""),
|
query=function_args.get("query", ""),
|
||||||
|
|
@ -10492,7 +10493,8 @@ class AIAgent:
|
||||||
self._vprint(f" {_get_cute_tool_message_impl('todo', function_args, tool_duration, result=function_result)}")
|
self._vprint(f" {_get_cute_tool_message_impl('todo', function_args, tool_duration, result=function_result)}")
|
||||||
elif function_name == "session_search":
|
elif function_name == "session_search":
|
||||||
if not self._session_db:
|
if not self._session_db:
|
||||||
function_result = json.dumps({"success": False, "error": "Session database not available."})
|
from hermes_state import format_session_db_unavailable
|
||||||
|
function_result = json.dumps({"success": False, "error": format_session_db_unavailable()})
|
||||||
else:
|
else:
|
||||||
from tools.session_search_tool import session_search as _session_search
|
from tools.session_search_tool import session_search as _session_search
|
||||||
function_result = _session_search(
|
function_result = _session_search(
|
||||||
|
|
|
||||||
|
|
@ -914,3 +914,55 @@ def test_latest_summaries_batch_omits_tasks_without_summary(kanban_home):
|
||||||
assert out == {t1: "alpha", t3: "charlie"}
|
assert out == {t1: "alpha", t3: "charlie"}
|
||||||
# Empty input → empty dict, no SQL syntax error from "IN ()".
|
# Empty input → empty dict, no SQL syntax error from "IN ()".
|
||||||
assert kb.latest_summaries(conn, []) == {}
|
assert kb.latest_summaries(conn, []) == {}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# NFS / network-filesystem fallback (see hermes_state.apply_wal_with_fallback)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_connect_falls_back_to_delete_on_locking_protocol(kanban_home, caplog):
|
||||||
|
"""kanban_db.connect() must handle ``locking protocol`` on NFS/SMB.
|
||||||
|
|
||||||
|
Without this fallback, the gateway's kanban dispatcher crashes every
|
||||||
|
60s and the kanban migration (``consecutive_failures`` ADD COLUMN) is
|
||||||
|
retried forever — which is what the real-world user report shows
|
||||||
|
(see hermes-agent issue #22032).
|
||||||
|
"""
|
||||||
|
import sqlite3 as _sqlite3
|
||||||
|
from unittest.mock import patch as _patch
|
||||||
|
|
||||||
|
# Clear module cache so a fresh connect() is attempted
|
||||||
|
kb._INITIALIZED_PATHS.clear()
|
||||||
|
|
||||||
|
real_connect = _sqlite3.connect
|
||||||
|
|
||||||
|
class _WalBlockingConnection(_sqlite3.Connection):
|
||||||
|
def execute(self, sql, *args, **kwargs): # type: ignore[override]
|
||||||
|
if "journal_mode=wal" in sql.lower().replace(" ", ""):
|
||||||
|
raise _sqlite3.OperationalError("locking protocol")
|
||||||
|
return super().execute(sql, *args, **kwargs)
|
||||||
|
|
||||||
|
def wal_blocking_connect(*args, **kwargs):
|
||||||
|
return real_connect(
|
||||||
|
*args, factory=_WalBlockingConnection, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
with _patch("hermes_cli.kanban_db.sqlite3.connect", side_effect=wal_blocking_connect):
|
||||||
|
with caplog.at_level("WARNING", logger="hermes_state"):
|
||||||
|
conn = kb.connect()
|
||||||
|
|
||||||
|
# One fallback warning, naming kanban.db
|
||||||
|
warnings = [
|
||||||
|
r for r in caplog.records
|
||||||
|
if r.levelname == "WARNING" and "kanban.db" in r.getMessage()
|
||||||
|
]
|
||||||
|
assert len(warnings) >= 1, (
|
||||||
|
f"Expected a kanban.db WARNING, got: {[r.getMessage() for r in caplog.records]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# DB still usable end-to-end — create + list a task
|
||||||
|
t = kb.create_task(conn, title="post-fallback task")
|
||||||
|
tasks = kb.list_tasks(conn)
|
||||||
|
assert any(row.id == t for row in tasks)
|
||||||
|
conn.close()
|
||||||
|
|
|
||||||
305
tests/test_hermes_state_wal_fallback.py
Normal file
305
tests/test_hermes_state_wal_fallback.py
Normal file
|
|
@ -0,0 +1,305 @@
|
||||||
|
"""Tests for the WAL→DELETE journal-mode fallback on NFS / SMB / FUSE.
|
||||||
|
|
||||||
|
When ``PRAGMA journal_mode=WAL`` raises ``OperationalError("locking protocol")``
|
||||||
|
(SQLITE_PROTOCOL — typical on NFS/SMB), Hermes must fall back to
|
||||||
|
``journal_mode=DELETE`` so ``state.db`` / ``kanban.db`` remain usable.
|
||||||
|
|
||||||
|
Without this fallback, users on NFS-mounted ``HERMES_HOME`` silently lose
|
||||||
|
``/resume``, ``/title``, ``/history``, ``/branch``, session search, and the
|
||||||
|
kanban dispatcher — because ``SessionDB()`` init propagates the error and
|
||||||
|
every caller swallows it, leaving ``_session_db = None``.
|
||||||
|
|
||||||
|
See: https://www.sqlite.org/wal.html — "WAL does not work over a network
|
||||||
|
filesystem".
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import hermes_state
|
||||||
|
from hermes_state import (
|
||||||
|
SessionDB,
|
||||||
|
apply_wal_with_fallback,
|
||||||
|
format_session_db_unavailable,
|
||||||
|
get_last_init_error,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ``sqlite3.Connection.execute`` is a C-level slot and can't be monkeypatched
|
||||||
|
# directly (``'sqlite3.Connection' object attribute 'execute' is read-only``).
|
||||||
|
# A factory-built subclass lets us intercept journal_mode=WAL per-test with
|
||||||
|
# its own mutable counter, avoiding the xdist-parallel class-state race.
|
||||||
|
def _make_blocking_factory(reason: str, attempt_counter: list):
|
||||||
|
"""Return a sqlite3.Connection subclass that raises on PRAGMA journal_mode=WAL."""
|
||||||
|
|
||||||
|
class _WalBlockingConnection(sqlite3.Connection):
|
||||||
|
def execute(self, sql, *args, **kwargs): # type: ignore[override]
|
||||||
|
if "journal_mode=wal" in sql.lower().replace(" ", ""):
|
||||||
|
attempt_counter[0] += 1
|
||||||
|
raise sqlite3.OperationalError(reason)
|
||||||
|
return super().execute(sql, *args, **kwargs)
|
||||||
|
|
||||||
|
return _WalBlockingConnection
|
||||||
|
|
||||||
|
|
||||||
|
def _open_blocking(path, reason="locking protocol", **kwargs):
|
||||||
|
"""Open a connection whose WAL pragma raises ``reason``.
|
||||||
|
|
||||||
|
Returns ``(conn, attempt_counter_list)`` so callers can assert how many
|
||||||
|
times WAL was attempted.
|
||||||
|
"""
|
||||||
|
attempts = [0]
|
||||||
|
factory = _make_blocking_factory(reason, attempts)
|
||||||
|
return sqlite3.connect(str(path), factory=factory, **kwargs), attempts
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _reset_last_init_error():
|
||||||
|
"""Reset the module-global last-error before and after each test."""
|
||||||
|
hermes_state._set_last_init_error(None)
|
||||||
|
yield
|
||||||
|
hermes_state._set_last_init_error(None)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _reset_wal_fallback_warned_paths():
|
||||||
|
"""Reset the WAL-fallback warned-paths set so dedup doesn't leak between tests."""
|
||||||
|
hermes_state._wal_fallback_warned_paths.clear()
|
||||||
|
yield
|
||||||
|
hermes_state._wal_fallback_warned_paths.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestApplyWalWithFallback:
|
||||||
|
def test_succeeds_on_local_fs(self, tmp_path):
|
||||||
|
"""Happy path: WAL works on a normal filesystem."""
|
||||||
|
conn = sqlite3.connect(str(tmp_path / "ok.db"), isolation_level=None)
|
||||||
|
mode = apply_wal_with_fallback(conn)
|
||||||
|
assert mode == "wal"
|
||||||
|
cur = conn.execute("PRAGMA journal_mode")
|
||||||
|
assert cur.fetchone()[0].lower() == "wal"
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def test_falls_back_to_delete_on_locking_protocol(self, tmp_path, caplog):
|
||||||
|
"""NFS-style ``locking protocol`` error → DELETE mode + one WARNING."""
|
||||||
|
conn, _ = _open_blocking(tmp_path / "nfs.db", isolation_level=None)
|
||||||
|
with caplog.at_level("WARNING", logger="hermes_state"):
|
||||||
|
mode = apply_wal_with_fallback(conn, db_label="test.db")
|
||||||
|
|
||||||
|
assert mode == "delete"
|
||||||
|
warnings = [r for r in caplog.records if r.levelname == "WARNING"]
|
||||||
|
assert len(warnings) == 1
|
||||||
|
msg = warnings[0].getMessage()
|
||||||
|
assert "test.db" in msg
|
||||||
|
assert "journal_mode=DELETE" in msg
|
||||||
|
assert "locking protocol" in msg
|
||||||
|
|
||||||
|
# Post-fallback the DB is still usable for real writes
|
||||||
|
conn.execute("CREATE TABLE t (x INTEGER)")
|
||||||
|
conn.execute("INSERT INTO t VALUES (1)")
|
||||||
|
assert list(conn.execute("SELECT x FROM t"))[0][0] == 1
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def test_falls_back_on_not_authorized(self, tmp_path):
|
||||||
|
"""Some FUSE mounts block WAL pragma outright ('not authorized')."""
|
||||||
|
conn, _ = _open_blocking(
|
||||||
|
tmp_path / "fuse.db", reason="not authorized", isolation_level=None
|
||||||
|
)
|
||||||
|
mode = apply_wal_with_fallback(conn)
|
||||||
|
assert mode == "delete"
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def test_falls_back_on_disk_io_error(self, tmp_path):
|
||||||
|
"""Flaky network FS → disk I/O error → still fall back."""
|
||||||
|
conn, _ = _open_blocking(
|
||||||
|
tmp_path / "flaky.db", reason="disk I/O error", isolation_level=None
|
||||||
|
)
|
||||||
|
mode = apply_wal_with_fallback(conn)
|
||||||
|
assert mode == "delete"
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def test_reraises_unrelated_operational_error(self, tmp_path):
|
||||||
|
"""Non-WAL-compat errors must NOT be silently swallowed by the fallback."""
|
||||||
|
conn, _ = _open_blocking(
|
||||||
|
tmp_path / "other.db",
|
||||||
|
reason="no such table: nope",
|
||||||
|
isolation_level=None,
|
||||||
|
)
|
||||||
|
with pytest.raises(sqlite3.OperationalError, match="no such table"):
|
||||||
|
apply_wal_with_fallback(conn)
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def test_warning_deduplicated_per_db_label(self, tmp_path, caplog):
|
||||||
|
"""Repeated calls with the same db_label log exactly ONE warning.
|
||||||
|
|
||||||
|
Prevents log spam when NFS users run kanban (which opens a fresh
|
||||||
|
connection on every operation — see hermes_cli/kanban_db.py).
|
||||||
|
Regression guard: the fix for #22032 ran apply_wal_with_fallback()
|
||||||
|
on every kb.connect() call; without dedup, errors.log fills with
|
||||||
|
hundreds of identical warnings per hour.
|
||||||
|
"""
|
||||||
|
with caplog.at_level("WARNING", logger="hermes_state"):
|
||||||
|
# Three separate connections to "the same DB" via the same label
|
||||||
|
for i in range(3):
|
||||||
|
conn, _ = _open_blocking(
|
||||||
|
tmp_path / f"dup-{i}.db", isolation_level=None
|
||||||
|
)
|
||||||
|
mode = apply_wal_with_fallback(conn, db_label="shared.db")
|
||||||
|
assert mode == "delete"
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
# Exactly one warning across all three calls
|
||||||
|
warnings = [
|
||||||
|
r for r in caplog.records
|
||||||
|
if r.levelname == "WARNING" and "shared.db" in r.getMessage()
|
||||||
|
]
|
||||||
|
assert len(warnings) == 1, (
|
||||||
|
f"Expected 1 deduplicated warning, got {len(warnings)}: "
|
||||||
|
f"{[r.getMessage() for r in warnings]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_warning_fires_independently_per_db_label(self, tmp_path, caplog):
|
||||||
|
"""Different db_labels each get their own one warning (not globally dedup'd)."""
|
||||||
|
with caplog.at_level("WARNING", logger="hermes_state"):
|
||||||
|
conn1, _ = _open_blocking(tmp_path / "a.db", isolation_level=None)
|
||||||
|
apply_wal_with_fallback(conn1, db_label="state.db")
|
||||||
|
conn1.close()
|
||||||
|
|
||||||
|
conn2, _ = _open_blocking(tmp_path / "b.db", isolation_level=None)
|
||||||
|
apply_wal_with_fallback(conn2, db_label="kanban.db")
|
||||||
|
conn2.close()
|
||||||
|
|
||||||
|
warnings = [r for r in caplog.records if r.levelname == "WARNING"]
|
||||||
|
labels_warned = {
|
||||||
|
lbl for r in warnings for lbl in ("state.db", "kanban.db")
|
||||||
|
if lbl in r.getMessage()
|
||||||
|
}
|
||||||
|
assert labels_warned == {"state.db", "kanban.db"}, (
|
||||||
|
f"Each db_label should warn once; got {labels_warned}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetLastInitError:
|
||||||
|
def test_none_on_successful_init(self, tmp_path):
|
||||||
|
"""Happy-path SessionDB init does NOT clear a stale error from a prior thread.
|
||||||
|
|
||||||
|
We deliberately don't clear on success so that in multi-threaded
|
||||||
|
callers (gateway / web_server per-request SessionDB()), a concurrent
|
||||||
|
successful open racing past a different thread's failure won't
|
||||||
|
erase the cause string the failing thread's /resume is about to
|
||||||
|
format. The caller or test fixture is responsible for explicitly
|
||||||
|
calling _set_last_init_error(None) to reset.
|
||||||
|
"""
|
||||||
|
# Autouse fixture starts at None — success-path leaves it None
|
||||||
|
db = SessionDB(db_path=tmp_path / "ok.db")
|
||||||
|
try:
|
||||||
|
assert get_last_init_error() is None
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
def test_success_does_not_clear_prior_error(self, tmp_path):
|
||||||
|
"""Thread-safety guard: a successful init must not erase a pre-existing error.
|
||||||
|
|
||||||
|
Simulates the multi-threaded race: thread A fails, records cause;
|
||||||
|
thread B succeeds concurrently. thread A's /resume handler must
|
||||||
|
still see A's cause — not B's None.
|
||||||
|
"""
|
||||||
|
hermes_state._set_last_init_error("OperationalError: locking protocol")
|
||||||
|
# Now a "successful" init happens on another path — must NOT clear
|
||||||
|
db = SessionDB(db_path=tmp_path / "ok2.db")
|
||||||
|
try:
|
||||||
|
assert get_last_init_error() == "OperationalError: locking protocol"
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
def test_captures_cause_on_failed_init(self, tmp_path):
|
||||||
|
"""When SessionDB() raises, the cause is preserved for slash commands.
|
||||||
|
|
||||||
|
Simulates a filesystem where BOTH WAL and DELETE journal modes fail —
|
||||||
|
e.g. a read-only mount where no ``PRAGMA journal_mode=X`` works. The
|
||||||
|
fallback tries DELETE and also gets rejected; the exception bubbles
|
||||||
|
out of ``SessionDB.__init__`` and the cause is captured.
|
||||||
|
"""
|
||||||
|
target = tmp_path / "broken.db"
|
||||||
|
real_connect = sqlite3.connect
|
||||||
|
|
||||||
|
class _BothPragmasFailConnection(sqlite3.Connection):
|
||||||
|
def execute(self, sql, *args, **kwargs): # type: ignore[override]
|
||||||
|
if "journal_mode" in sql.lower():
|
||||||
|
raise sqlite3.OperationalError(
|
||||||
|
"locking protocol: read-only filesystem"
|
||||||
|
)
|
||||||
|
return super().execute(sql, *args, **kwargs)
|
||||||
|
|
||||||
|
def gated_connect(*args, **kwargs):
|
||||||
|
return real_connect(str(target), factory=_BothPragmasFailConnection, **kwargs)
|
||||||
|
|
||||||
|
with patch("hermes_state.sqlite3.connect", side_effect=gated_connect):
|
||||||
|
with pytest.raises(sqlite3.OperationalError):
|
||||||
|
SessionDB(db_path=target)
|
||||||
|
|
||||||
|
cause = get_last_init_error()
|
||||||
|
assert cause is not None
|
||||||
|
assert "OperationalError" in cause
|
||||||
|
assert "locking protocol" in cause
|
||||||
|
|
||||||
|
|
||||||
|
class TestFormatSessionDbUnavailable:
|
||||||
|
def test_bare_message_when_no_cause(self):
|
||||||
|
"""No init error recorded → generic message."""
|
||||||
|
hermes_state._set_last_init_error(None)
|
||||||
|
assert format_session_db_unavailable() == "Session database not available."
|
||||||
|
|
||||||
|
def test_includes_cause(self):
|
||||||
|
"""Cause is surfaced for slash-command error strings."""
|
||||||
|
hermes_state._set_last_init_error("OperationalError: generic SQLite error")
|
||||||
|
msg = format_session_db_unavailable()
|
||||||
|
assert "generic SQLite error" in msg
|
||||||
|
assert msg.startswith("Session database not available:")
|
||||||
|
assert msg.endswith(".")
|
||||||
|
|
||||||
|
def test_adds_nfs_hint_for_locking_protocol(self):
|
||||||
|
"""Locking-protocol cause gets an NFS/SMB pointer for the user."""
|
||||||
|
hermes_state._set_last_init_error("OperationalError: locking protocol")
|
||||||
|
msg = format_session_db_unavailable()
|
||||||
|
assert "locking protocol" in msg
|
||||||
|
assert "NFS/SMB" in msg
|
||||||
|
assert "sqlite.org/wal.html" in msg
|
||||||
|
|
||||||
|
def test_custom_prefix(self):
|
||||||
|
"""Callers can customize the prefix for context-specific messages."""
|
||||||
|
hermes_state._set_last_init_error("OperationalError: locking protocol")
|
||||||
|
msg = format_session_db_unavailable(prefix="Cannot /resume")
|
||||||
|
assert msg.startswith("Cannot /resume:")
|
||||||
|
|
||||||
|
|
||||||
|
class TestSessionDbUsesWalFallback:
|
||||||
|
def test_sessiondb_works_when_wal_unavailable(self, tmp_path):
|
||||||
|
"""E2E: SessionDB initializes and performs a write on a WAL-blocked FS."""
|
||||||
|
target = tmp_path / "nfs_style.db"
|
||||||
|
|
||||||
|
real_connect = sqlite3.connect
|
||||||
|
attempts = [0]
|
||||||
|
factory = _make_blocking_factory("locking protocol", attempts)
|
||||||
|
|
||||||
|
def gated_connect(*args, **kwargs):
|
||||||
|
return real_connect(str(target), factory=factory, **kwargs)
|
||||||
|
|
||||||
|
with patch("hermes_state.sqlite3.connect", side_effect=gated_connect):
|
||||||
|
db = SessionDB(db_path=target)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# WAL was attempted and rejected — fallback kicked in
|
||||||
|
assert attempts[0] >= 1, (
|
||||||
|
"WAL pragma was never executed — check the patch target"
|
||||||
|
)
|
||||||
|
# SessionDB is usable end-to-end: create a session, read it back
|
||||||
|
db.create_session(session_id="s1", source="cli", model="test")
|
||||||
|
sess = db.get_session("s1")
|
||||||
|
assert sess is not None
|
||||||
|
assert sess["source"] == "cli"
|
||||||
|
# No init error was recorded since init succeeded via the fallback
|
||||||
|
assert get_last_init_error() is None
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|
@ -337,7 +337,8 @@ def session_search(
|
||||||
The current session is excluded from results since the agent already has that context.
|
The current session is excluded from results since the agent already has that context.
|
||||||
"""
|
"""
|
||||||
if db is None:
|
if db is None:
|
||||||
return tool_error("Session database not available.", success=False)
|
from hermes_state import format_session_db_unavailable
|
||||||
|
return tool_error(format_session_db_unavailable(), success=False)
|
||||||
|
|
||||||
# Defensive: models (especially open-source) may send non-int limit values
|
# Defensive: models (especially open-source) may send non-int limit values
|
||||||
# (None when JSON null, string "int", or even a type object). Coerce to a
|
# (None when JSON null, string "int", or even a type object). Coerce to a
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue