mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
fix(kanban): serialize DB initialization
This commit is contained in:
parent
917e51858d
commit
ca8126bd53
2 changed files with 64 additions and 16 deletions
|
|
@ -78,6 +78,7 @@ import secrets
|
|||
import sqlite3
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
|
@ -899,6 +900,7 @@ CREATE INDEX IF NOT EXISTS idx_notify_task ON kanban_notify_subs(task_
|
|||
# ---------------------------------------------------------------------------
|
||||
|
||||
_INITIALIZED_PATHS: set[str] = set()
|
||||
_INIT_LOCK = threading.RLock()
|
||||
|
||||
|
||||
def connect(
|
||||
|
|
@ -930,23 +932,30 @@ def connect(
|
|||
path = kanban_db_path(board=board)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
resolved = str(path.resolve())
|
||||
needs_init = resolved not in _INITIALIZED_PATHS
|
||||
conn = sqlite3.connect(str(path), isolation_level=None, timeout=30)
|
||||
conn.row_factory = sqlite3.Row
|
||||
# WAL doesn't work on network filesystems (NFS/SMB/FUSE). Shared helper
|
||||
# falls back to DELETE with one WARNING so kanban stays usable there.
|
||||
# See hermes_state._WAL_INCOMPAT_MARKERS for detection logic.
|
||||
from hermes_state import apply_wal_with_fallback
|
||||
apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})")
|
||||
conn.execute("PRAGMA synchronous=NORMAL")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
if needs_init:
|
||||
# Idempotent: runs CREATE TABLE IF NOT EXISTS + the additive
|
||||
# migrations. Cached so subsequent connect() calls in the same
|
||||
# process are cheap.
|
||||
conn.executescript(SCHEMA_SQL)
|
||||
_migrate_add_optional_columns(conn)
|
||||
_INITIALIZED_PATHS.add(resolved)
|
||||
with _INIT_LOCK:
|
||||
# WAL activation can take an exclusive lock while SQLite creates the
|
||||
# sidecar files for a fresh database. Keep it in the same process-local
|
||||
# critical section as schema initialization so concurrent gateway
|
||||
# startup threads do not race before _INITIALIZED_PATHS is populated.
|
||||
# WAL doesn't work on network filesystems (NFS/SMB/FUSE). Shared helper
|
||||
# falls back to DELETE with one WARNING so kanban stays usable there.
|
||||
# See hermes_state._WAL_INCOMPAT_MARKERS for detection logic.
|
||||
from hermes_state import apply_wal_with_fallback
|
||||
apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})")
|
||||
conn.execute("PRAGMA synchronous=NORMAL")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
needs_init = resolved not in _INITIALIZED_PATHS
|
||||
if needs_init:
|
||||
# Idempotent: runs CREATE TABLE IF NOT EXISTS + the additive
|
||||
# migrations. Cached so subsequent connect() calls in the same
|
||||
# process are cheap. The lock prevents same-process dispatcher
|
||||
# threads from racing through the additive ALTER TABLE pass with
|
||||
# stale PRAGMA snapshots during gateway startup.
|
||||
conn.executescript(SCHEMA_SQL)
|
||||
_migrate_add_optional_columns(conn)
|
||||
_INITIALIZED_PATHS.add(resolved)
|
||||
return conn
|
||||
|
||||
|
||||
|
|
@ -973,7 +982,8 @@ def init_db(
|
|||
resolved = str(path.resolve())
|
||||
# Clear the cache entry so the underlying connect() re-runs the
|
||||
# schema + migration pass unconditionally.
|
||||
_INITIALIZED_PATHS.discard(resolved)
|
||||
with _INIT_LOCK:
|
||||
_INITIALIZED_PATHS.discard(resolved)
|
||||
with contextlib.closing(connect(path)):
|
||||
pass
|
||||
return path
|
||||
|
|
|
|||
38
tests/hermes_cli/test_kanban_db_init.py
Normal file
38
tests/hermes_cli/test_kanban_db_init.py
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
from hermes_cli import kanban_db as kb
|
||||
|
||||
|
||||
def test_connect_initialization_is_thread_safe(tmp_path, monkeypatch):
|
||||
home = tmp_path / ".hermes"
|
||||
home.mkdir()
|
||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||
|
||||
db_path = kb.kanban_db_path(board="default")
|
||||
kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))
|
||||
|
||||
errors: list[BaseException] = []
|
||||
barrier = threading.Barrier(8)
|
||||
|
||||
def worker() -> None:
|
||||
try:
|
||||
barrier.wait(timeout=5)
|
||||
conn = kb.connect(board="default")
|
||||
conn.close()
|
||||
except BaseException as exc: # pragma: no cover - surfaced below
|
||||
errors.append(exc)
|
||||
|
||||
threads = [threading.Thread(target=worker) for _ in range(8)]
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join(timeout=10)
|
||||
|
||||
assert errors == []
|
||||
with kb.connect(board="default") as conn:
|
||||
cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")}
|
||||
assert "max_retries" in cols
|
||||
Loading…
Add table
Add a link
Reference in a new issue