mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
fix(kanban): harden sqlite connection concurrency
This commit is contained in:
parent
3ad46933d3
commit
90b6b3d18f
2 changed files with 125 additions and 46 deletions
|
|
@ -982,6 +982,68 @@ CREATE INDEX IF NOT EXISTS idx_notify_task ON kanban_notify_subs(task_
|
|||
_INITIALIZED_PATHS: set[str] = set()
|
||||
_INIT_LOCK = threading.RLock()
|
||||
_SQLITE_HEADER = b"SQLite format 3\x00"
|
||||
DEFAULT_BUSY_TIMEOUT_MS = 120_000
|
||||
|
||||
|
||||
def _resolve_busy_timeout_ms() -> int:
|
||||
"""Return the SQLite busy timeout for Kanban connections.
|
||||
|
||||
Kanban is the shared cross-profile dispatch bus, so worker stampedes are
|
||||
expected. A long busy timeout lets SQLite serialize writers via WAL rather
|
||||
than surfacing transient ``database is locked`` failures during bursts.
|
||||
"""
|
||||
raw = os.environ.get("HERMES_KANBAN_BUSY_TIMEOUT_MS", "").strip()
|
||||
if raw:
|
||||
try:
|
||||
parsed = int(raw)
|
||||
except ValueError:
|
||||
parsed = 0
|
||||
if parsed > 0:
|
||||
return parsed
|
||||
return DEFAULT_BUSY_TIMEOUT_MS
|
||||
|
||||
|
||||
def _sqlite_connect(path: Path) -> sqlite3.Connection:
|
||||
"""Open a Kanban SQLite connection with consistent lock waiting."""
|
||||
busy_timeout_ms = _resolve_busy_timeout_ms()
|
||||
conn = sqlite3.connect(
|
||||
str(path),
|
||||
isolation_level=None,
|
||||
timeout=busy_timeout_ms / 1000.0,
|
||||
)
|
||||
# ``sqlite3.connect(timeout=...)`` normally maps to busy_timeout, but set
|
||||
# the PRAGMA explicitly so it is observable and survives future wrapper
|
||||
# changes. Parameter binding is not supported for PRAGMA assignments.
|
||||
conn.execute(f"PRAGMA busy_timeout={busy_timeout_ms}")
|
||||
return conn
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _cross_process_init_lock(path: Path):
|
||||
"""Serialize first-connect WAL/schema/integrity setup across processes.
|
||||
|
||||
``_INIT_LOCK`` only protects threads inside one Python process. During a
|
||||
dispatcher burst, many worker processes can all hit a fresh/legacy board at
|
||||
once and each process has an empty ``_INITIALIZED_PATHS`` cache. This file
|
||||
lock keeps header validation, integrity probing, WAL activation, and
|
||||
additive migrations single-file/single-writer across the whole host while
|
||||
leaving normal post-init DB usage concurrent under SQLite WAL.
|
||||
"""
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
lock_path = path.with_name(path.name + ".init.lock")
|
||||
handle = lock_path.open("a+b")
|
||||
try:
|
||||
if not _IS_WINDOWS:
|
||||
import fcntl
|
||||
fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
|
||||
yield
|
||||
finally:
|
||||
try:
|
||||
if not _IS_WINDOWS:
|
||||
import fcntl
|
||||
fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
|
||||
finally:
|
||||
handle.close()
|
||||
|
||||
|
||||
def _looks_like_tls_record_at(data: bytes, offset: int) -> bool:
|
||||
|
|
@ -1142,7 +1204,7 @@ def _guard_existing_db_is_healthy(path: Path) -> None:
|
|||
return
|
||||
reason: Optional[str] = None
|
||||
try:
|
||||
probe = sqlite3.connect(str(resolved), timeout=5, isolation_level=None)
|
||||
probe = _sqlite_connect(resolved)
|
||||
try:
|
||||
row = probe.execute("PRAGMA integrity_check").fetchone()
|
||||
finally:
|
||||
|
|
@ -1188,51 +1250,52 @@ def connect(
|
|||
else:
|
||||
path = kanban_db_path(board=board)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
# Cheap byte-level check first — catches the #29507 TLS-overwrite shape
|
||||
# and other invalid-header cases without opening a sqlite connection.
|
||||
_validate_sqlite_header(path)
|
||||
# Full integrity probe — catches corruption past the header (malformed
|
||||
# pages, broken internal metadata). Cached per-path after first success
|
||||
# via _INITIALIZED_PATHS so it only runs once per process per path.
|
||||
_guard_existing_db_is_healthy(path)
|
||||
resolved = str(path.resolve())
|
||||
conn = sqlite3.connect(str(path), isolation_level=None, timeout=30)
|
||||
try:
|
||||
conn.row_factory = sqlite3.Row
|
||||
with _INIT_LOCK:
|
||||
# WAL activation can take an exclusive lock while SQLite creates the
|
||||
# sidecar files for a fresh database. Keep it in the same process-local
|
||||
# critical section as schema initialization so concurrent gateway
|
||||
# startup threads do not race before _INITIALIZED_PATHS is populated.
|
||||
# WAL doesn't work on network filesystems (NFS/SMB/FUSE). Shared helper
|
||||
# falls back to DELETE with one WARNING so kanban stays usable there.
|
||||
# See hermes_state._WAL_INCOMPAT_MARKERS for detection logic.
|
||||
from hermes_state import apply_wal_with_fallback
|
||||
apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})")
|
||||
# FULL (was NORMAL): fsync before each checkpoint to narrow the
|
||||
# crash window that can leave a b-tree page header torn.
|
||||
conn.execute("PRAGMA synchronous=FULL")
|
||||
conn.execute("PRAGMA wal_autocheckpoint=100")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
# Zero freed pages so a later torn write cannot expose stale
|
||||
# cell content; persisted in the DB header for new DBs.
|
||||
conn.execute("PRAGMA secure_delete=ON")
|
||||
# Surface corrupt cells as read errors instead of silent
|
||||
# wrong-data returns.
|
||||
conn.execute("PRAGMA cell_size_check=ON")
|
||||
needs_init = resolved not in _INITIALIZED_PATHS
|
||||
if needs_init:
|
||||
# Idempotent: runs CREATE TABLE IF NOT EXISTS + the additive
|
||||
# migrations. Cached so subsequent connect() calls in the same
|
||||
# process are cheap. The lock prevents same-process dispatcher
|
||||
# threads from racing through the additive ALTER TABLE pass with
|
||||
# stale PRAGMA snapshots during gateway startup.
|
||||
conn.executescript(SCHEMA_SQL)
|
||||
_migrate_add_optional_columns(conn)
|
||||
_INITIALIZED_PATHS.add(resolved)
|
||||
except Exception:
|
||||
conn.close()
|
||||
raise
|
||||
with _cross_process_init_lock(path):
|
||||
# Cheap byte-level check first — catches the #29507 TLS-overwrite shape
|
||||
# and other invalid-header cases without opening a sqlite connection.
|
||||
_validate_sqlite_header(path)
|
||||
# Full integrity probe — catches corruption past the header (malformed
|
||||
# pages, broken internal metadata). Cached per-path after first success
|
||||
# via _INITIALIZED_PATHS so it only runs once per process per path.
|
||||
_guard_existing_db_is_healthy(path)
|
||||
resolved = str(path.resolve())
|
||||
conn = _sqlite_connect(path)
|
||||
try:
|
||||
conn.row_factory = sqlite3.Row
|
||||
with _INIT_LOCK:
|
||||
# WAL activation can take an exclusive lock while SQLite creates the
|
||||
# sidecar files for a fresh database. Keep it in the same process-local
|
||||
# critical section as schema initialization so concurrent gateway
|
||||
# startup threads do not race before _INITIALIZED_PATHS is populated.
|
||||
# WAL doesn't work on network filesystems (NFS/SMB/FUSE). Shared helper
|
||||
# falls back to DELETE with one WARNING so kanban stays usable there.
|
||||
# See hermes_state._WAL_INCOMPAT_MARKERS for detection logic.
|
||||
from hermes_state import apply_wal_with_fallback
|
||||
apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})")
|
||||
# FULL (was NORMAL): fsync before each checkpoint to narrow the
|
||||
# crash window that can leave a b-tree page header torn.
|
||||
conn.execute("PRAGMA synchronous=FULL")
|
||||
conn.execute("PRAGMA wal_autocheckpoint=100")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
# Zero freed pages so a later torn write cannot expose stale
|
||||
# cell content; persisted in the DB header for new DBs.
|
||||
conn.execute("PRAGMA secure_delete=ON")
|
||||
# Surface corrupt cells as read errors instead of silent
|
||||
# wrong-data returns.
|
||||
conn.execute("PRAGMA cell_size_check=ON")
|
||||
needs_init = resolved not in _INITIALIZED_PATHS
|
||||
if needs_init:
|
||||
# Idempotent: runs CREATE TABLE IF NOT EXISTS + the additive
|
||||
# migrations. Cached so subsequent connect() calls in the same
|
||||
# process are cheap. The lock prevents same-process dispatcher
|
||||
# threads from racing through the additive ALTER TABLE pass with
|
||||
# stale PRAGMA snapshots during gateway startup.
|
||||
conn.executescript(SCHEMA_SQL)
|
||||
_migrate_add_optional_columns(conn)
|
||||
_INITIALIZED_PATHS.add(resolved)
|
||||
except Exception:
|
||||
conn.close()
|
||||
raise
|
||||
return conn
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -49,6 +49,22 @@ def test_init_creates_expected_tables(kanban_home):
|
|||
assert {"tasks", "task_links", "task_comments", "task_events"} <= names
|
||||
|
||||
|
||||
def test_connect_honors_kanban_busy_timeout_env(kanban_home, monkeypatch):
|
||||
"""All kanban connections should use the explicit busy-timeout knob.
|
||||
|
||||
A worker stampede should wait for SQLite's writer lock instead of failing
|
||||
immediately with ``database is locked`` during first-connect/WAL/schema
|
||||
setup. The timeout must be queryable via PRAGMA so CLI, gateway, and tool
|
||||
connections behave the same way.
|
||||
"""
|
||||
monkeypatch.setenv("HERMES_KANBAN_BUSY_TIMEOUT_MS", "123456")
|
||||
|
||||
with kb.connect() as conn:
|
||||
row = conn.execute("PRAGMA busy_timeout").fetchone()
|
||||
|
||||
assert row[0] == 123456
|
||||
|
||||
|
||||
def test_connect_rejects_tls_record_in_sqlite_header(tmp_path, monkeypatch):
|
||||
"""Kanban should classify TLS-looking page-0 clobbers before WAL setup."""
|
||||
home = tmp_path / ".hermes"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue