From 90b6b3d18f0925da267dacee211418d2ecbd4d6f Mon Sep 17 00:00:00 2001 From: Squiddy Date: Mon, 25 May 2026 21:37:06 -0400 Subject: [PATCH] fix(kanban): harden sqlite connection concurrency --- hermes_cli/kanban_db.py | 155 ++++++++++++++++++++--------- tests/hermes_cli/test_kanban_db.py | 16 +++ 2 files changed, 125 insertions(+), 46 deletions(-) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index aa28b07e2db..858abeca6b2 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -982,6 +982,68 @@ CREATE INDEX IF NOT EXISTS idx_notify_task ON kanban_notify_subs(task_ _INITIALIZED_PATHS: set[str] = set() _INIT_LOCK = threading.RLock() _SQLITE_HEADER = b"SQLite format 3\x00" +DEFAULT_BUSY_TIMEOUT_MS = 120_000 + + +def _resolve_busy_timeout_ms() -> int: + """Return the SQLite busy timeout for Kanban connections. + + Kanban is the shared cross-profile dispatch bus, so worker stampedes are + expected. A long busy timeout lets SQLite serialize writers via WAL rather + than surfacing transient ``database is locked`` failures during bursts. + """ + raw = os.environ.get("HERMES_KANBAN_BUSY_TIMEOUT_MS", "").strip() + if raw: + try: + parsed = int(raw) + except ValueError: + parsed = 0 + if parsed > 0: + return parsed + return DEFAULT_BUSY_TIMEOUT_MS + + +def _sqlite_connect(path: Path) -> sqlite3.Connection: + """Open a Kanban SQLite connection with consistent lock waiting.""" + busy_timeout_ms = _resolve_busy_timeout_ms() + conn = sqlite3.connect( + str(path), + isolation_level=None, + timeout=busy_timeout_ms / 1000.0, + ) + # ``sqlite3.connect(timeout=...)`` normally maps to busy_timeout, but set + # the PRAGMA explicitly so it is observable and survives future wrapper + # changes. Parameter binding is not supported for PRAGMA assignments. + conn.execute(f"PRAGMA busy_timeout={busy_timeout_ms}") + return conn + + +@contextlib.contextmanager +def _cross_process_init_lock(path: Path): + """Serialize first-connect WAL/schema/integrity setup across processes. + + ``_INIT_LOCK`` only protects threads inside one Python process. During a + dispatcher burst, many worker processes can all hit a fresh/legacy board at + once and each process has an empty ``_INITIALIZED_PATHS`` cache. This file + lock keeps header validation, integrity probing, WAL activation, and + additive migrations single-file/single-writer across the whole host while + leaving normal post-init DB usage concurrent under SQLite WAL. + """ + path.parent.mkdir(parents=True, exist_ok=True) + lock_path = path.with_name(path.name + ".init.lock") + handle = lock_path.open("a+b") + try: + if not _IS_WINDOWS: + import fcntl + fcntl.flock(handle.fileno(), fcntl.LOCK_EX) + yield + finally: + try: + if not _IS_WINDOWS: + import fcntl + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + finally: + handle.close() def _looks_like_tls_record_at(data: bytes, offset: int) -> bool: @@ -1142,7 +1204,7 @@ def _guard_existing_db_is_healthy(path: Path) -> None: return reason: Optional[str] = None try: - probe = sqlite3.connect(str(resolved), timeout=5, isolation_level=None) + probe = _sqlite_connect(resolved) try: row = probe.execute("PRAGMA integrity_check").fetchone() finally: @@ -1188,51 +1250,52 @@ def connect( else: path = kanban_db_path(board=board) path.parent.mkdir(parents=True, exist_ok=True) - # Cheap byte-level check first — catches the #29507 TLS-overwrite shape - # and other invalid-header cases without opening a sqlite connection. - _validate_sqlite_header(path) - # Full integrity probe — catches corruption past the header (malformed - # pages, broken internal metadata). Cached per-path after first success - # via _INITIALIZED_PATHS so it only runs once per process per path. - _guard_existing_db_is_healthy(path) - resolved = str(path.resolve()) - conn = sqlite3.connect(str(path), isolation_level=None, timeout=30) - try: - conn.row_factory = sqlite3.Row - with _INIT_LOCK: - # WAL activation can take an exclusive lock while SQLite creates the - # sidecar files for a fresh database. Keep it in the same process-local - # critical section as schema initialization so concurrent gateway - # startup threads do not race before _INITIALIZED_PATHS is populated. - # WAL doesn't work on network filesystems (NFS/SMB/FUSE). Shared helper - # falls back to DELETE with one WARNING so kanban stays usable there. - # See hermes_state._WAL_INCOMPAT_MARKERS for detection logic. - from hermes_state import apply_wal_with_fallback - apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})") - # FULL (was NORMAL): fsync before each checkpoint to narrow the - # crash window that can leave a b-tree page header torn. - conn.execute("PRAGMA synchronous=FULL") - conn.execute("PRAGMA wal_autocheckpoint=100") - conn.execute("PRAGMA foreign_keys=ON") - # Zero freed pages so a later torn write cannot expose stale - # cell content; persisted in the DB header for new DBs. - conn.execute("PRAGMA secure_delete=ON") - # Surface corrupt cells as read errors instead of silent - # wrong-data returns. - conn.execute("PRAGMA cell_size_check=ON") - needs_init = resolved not in _INITIALIZED_PATHS - if needs_init: - # Idempotent: runs CREATE TABLE IF NOT EXISTS + the additive - # migrations. Cached so subsequent connect() calls in the same - # process are cheap. The lock prevents same-process dispatcher - # threads from racing through the additive ALTER TABLE pass with - # stale PRAGMA snapshots during gateway startup. - conn.executescript(SCHEMA_SQL) - _migrate_add_optional_columns(conn) - _INITIALIZED_PATHS.add(resolved) - except Exception: - conn.close() - raise + with _cross_process_init_lock(path): + # Cheap byte-level check first — catches the #29507 TLS-overwrite shape + # and other invalid-header cases without opening a sqlite connection. + _validate_sqlite_header(path) + # Full integrity probe — catches corruption past the header (malformed + # pages, broken internal metadata). Cached per-path after first success + # via _INITIALIZED_PATHS so it only runs once per process per path. + _guard_existing_db_is_healthy(path) + resolved = str(path.resolve()) + conn = _sqlite_connect(path) + try: + conn.row_factory = sqlite3.Row + with _INIT_LOCK: + # WAL activation can take an exclusive lock while SQLite creates the + # sidecar files for a fresh database. Keep it in the same process-local + # critical section as schema initialization so concurrent gateway + # startup threads do not race before _INITIALIZED_PATHS is populated. + # WAL doesn't work on network filesystems (NFS/SMB/FUSE). Shared helper + # falls back to DELETE with one WARNING so kanban stays usable there. + # See hermes_state._WAL_INCOMPAT_MARKERS for detection logic. + from hermes_state import apply_wal_with_fallback + apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})") + # FULL (was NORMAL): fsync before each checkpoint to narrow the + # crash window that can leave a b-tree page header torn. + conn.execute("PRAGMA synchronous=FULL") + conn.execute("PRAGMA wal_autocheckpoint=100") + conn.execute("PRAGMA foreign_keys=ON") + # Zero freed pages so a later torn write cannot expose stale + # cell content; persisted in the DB header for new DBs. + conn.execute("PRAGMA secure_delete=ON") + # Surface corrupt cells as read errors instead of silent + # wrong-data returns. + conn.execute("PRAGMA cell_size_check=ON") + needs_init = resolved not in _INITIALIZED_PATHS + if needs_init: + # Idempotent: runs CREATE TABLE IF NOT EXISTS + the additive + # migrations. Cached so subsequent connect() calls in the same + # process are cheap. The lock prevents same-process dispatcher + # threads from racing through the additive ALTER TABLE pass with + # stale PRAGMA snapshots during gateway startup. + conn.executescript(SCHEMA_SQL) + _migrate_add_optional_columns(conn) + _INITIALIZED_PATHS.add(resolved) + except Exception: + conn.close() + raise return conn diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index b9e9a7ee9c1..92ac3b7a4a6 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -49,6 +49,22 @@ def test_init_creates_expected_tables(kanban_home): assert {"tasks", "task_links", "task_comments", "task_events"} <= names +def test_connect_honors_kanban_busy_timeout_env(kanban_home, monkeypatch): + """All kanban connections should use the explicit busy-timeout knob. + + A worker stampede should wait for SQLite's writer lock instead of failing + immediately with ``database is locked`` during first-connect/WAL/schema + setup. The timeout must be queryable via PRAGMA so CLI, gateway, and tool + connections behave the same way. + """ + monkeypatch.setenv("HERMES_KANBAN_BUSY_TIMEOUT_MS", "123456") + + with kb.connect() as conn: + row = conn.execute("PRAGMA busy_timeout").fetchone() + + assert row[0] == 123456 + + def test_connect_rejects_tls_record_in_sqlite_header(tmp_path, monkeypatch): """Kanban should classify TLS-looking page-0 clobbers before WAL setup.""" home = tmp_path / ".hermes"