From ca8126bd5338794e3b51c3e804d70a4932199484 Mon Sep 17 00:00:00 2001
From: psionic73 <157839748+psionic73@users.noreply.github.com>
Date: Mon, 18 May 2026 20:17:42 -0700
Subject: [PATCH] fix(kanban): serialize DB initialization

Guard the WAL/PRAGMA setup and the schema + additive-migration pass in
connect() with a process-local RLock so concurrent threads cannot race
before _INITIALIZED_PATHS is populated. init_db() takes the same lock
while invalidating the cache entry.

Add a regression test that opens the same board from eight threads at
once and checks that the migrated schema is present afterwards.

---
Review notes (text in this section is ignored by git-am):
- Line structure of this patch was restored by hand; the indentation of
  context lines was reconstructed and should be confirmed against
  hermes_cli/kanban_db.py before applying.
- The test now closes its final connection via contextlib.closing() and
  fails if a worker thread is still alive after join(timeout=...).
- The blob id on the test file's "index" line predates those test edits
  and has not been regenerated.

 hermes_cli/kanban_db.py                 | 42 +++++++++++++----------
 tests/hermes_cli/test_kanban_db_init.py | 45 ++++++++++++++++++++++++
 2 files changed, 71 insertions(+), 16 deletions(-)
 create mode 100644 tests/hermes_cli/test_kanban_db_init.py

diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py
index 46a5f7df89d..4bc1641b476 100644
--- a/hermes_cli/kanban_db.py
+++ b/hermes_cli/kanban_db.py
@@ -78,6 +78,7 @@ import secrets
 import sqlite3
 import subprocess
 import sys
+import threading
 import time
 from dataclasses import dataclass, field
 from pathlib import Path
@@ -899,6 +900,7 @@ CREATE INDEX IF NOT EXISTS idx_notify_task ON kanban_notify_subs(task_
 # ---------------------------------------------------------------------------
 
 _INITIALIZED_PATHS: set[str] = set()
+_INIT_LOCK = threading.RLock()
 
 
 def connect(
@@ -930,23 +932,30 @@ def connect(
         path = kanban_db_path(board=board)
     path.parent.mkdir(parents=True, exist_ok=True)
     resolved = str(path.resolve())
-    needs_init = resolved not in _INITIALIZED_PATHS
     conn = sqlite3.connect(str(path), isolation_level=None, timeout=30)
     conn.row_factory = sqlite3.Row
-    # WAL doesn't work on network filesystems (NFS/SMB/FUSE). Shared helper
-    # falls back to DELETE with one WARNING so kanban stays usable there.
-    # See hermes_state._WAL_INCOMPAT_MARKERS for detection logic.
-    from hermes_state import apply_wal_with_fallback
-    apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})")
-    conn.execute("PRAGMA synchronous=NORMAL")
-    conn.execute("PRAGMA foreign_keys=ON")
-    if needs_init:
-        # Idempotent: runs CREATE TABLE IF NOT EXISTS + the additive
-        # migrations. Cached so subsequent connect() calls in the same
-        # process are cheap.
-        conn.executescript(SCHEMA_SQL)
-        _migrate_add_optional_columns(conn)
-        _INITIALIZED_PATHS.add(resolved)
+    with _INIT_LOCK:
+        # WAL activation can take an exclusive lock while SQLite creates the
+        # sidecar files for a fresh database. Keep it in the same process-local
+        # critical section as schema initialization so concurrent gateway
+        # startup threads do not race before _INITIALIZED_PATHS is populated.
+        # WAL doesn't work on network filesystems (NFS/SMB/FUSE). Shared helper
+        # falls back to DELETE with one WARNING so kanban stays usable there.
+        # See hermes_state._WAL_INCOMPAT_MARKERS for detection logic.
+        from hermes_state import apply_wal_with_fallback
+        apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})")
+        conn.execute("PRAGMA synchronous=NORMAL")
+        conn.execute("PRAGMA foreign_keys=ON")
+        needs_init = resolved not in _INITIALIZED_PATHS
+        if needs_init:
+            # Idempotent: runs CREATE TABLE IF NOT EXISTS + the additive
+            # migrations. Cached so subsequent connect() calls in the same
+            # process are cheap. The lock prevents same-process dispatcher
+            # threads from racing through the additive ALTER TABLE pass with
+            # stale PRAGMA snapshots during gateway startup.
+            conn.executescript(SCHEMA_SQL)
+            _migrate_add_optional_columns(conn)
+            _INITIALIZED_PATHS.add(resolved)
     return conn
 
 
@@ -973,7 +982,8 @@ def init_db(
     resolved = str(path.resolve())
     # Clear the cache entry so the underlying connect() re-runs the
     # schema + migration pass unconditionally.
-    _INITIALIZED_PATHS.discard(resolved)
+    with _INIT_LOCK:
+        _INITIALIZED_PATHS.discard(resolved)
     with contextlib.closing(connect(path)):
         pass
     return path
diff --git a/tests/hermes_cli/test_kanban_db_init.py b/tests/hermes_cli/test_kanban_db_init.py
new file mode 100644
index 00000000000..c400b1d90f9
--- /dev/null
+++ b/tests/hermes_cli/test_kanban_db_init.py
@@ -0,0 +1,45 @@
+from __future__ import annotations
+
+import contextlib
+import threading
+from pathlib import Path
+
+from hermes_cli import kanban_db as kb
+
+
+def test_connect_initialization_is_thread_safe(tmp_path, monkeypatch):
+    """Concurrent first connect() calls must all initialize cleanly."""
+    home = tmp_path / ".hermes"
+    home.mkdir()
+    monkeypatch.setenv("HERMES_HOME", str(home))
+    monkeypatch.setattr(Path, "home", lambda: tmp_path)
+
+    db_path = kb.kanban_db_path(board="default")
+    kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))
+
+    errors: list[BaseException] = []
+    barrier = threading.Barrier(8)
+
+    def worker() -> None:
+        try:
+            barrier.wait(timeout=5)
+            conn = kb.connect(board="default")
+            conn.close()
+        except BaseException as exc:  # pragma: no cover - surfaced below
+            errors.append(exc)
+
+    threads = [threading.Thread(target=worker, daemon=True) for _ in range(8)]
+    for thread in threads:
+        thread.start()
+    for thread in threads:
+        thread.join(timeout=10)
+
+    # join(timeout=...) returns silently on timeout; a deadlocked worker
+    # must fail the test instead of leaving ``errors`` empty.
+    assert not any(thread.is_alive() for thread in threads)
+    assert errors == []
+    # sqlite3's connection context manager only commits/rolls back, so
+    # close explicitly to avoid leaking the handle into later tests.
+    with contextlib.closing(kb.connect(board="default")) as conn:
+        cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")}
+    assert "max_retries" in cols