mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-23 10:42:00 +00:00
fix(kanban): bound the cross-process init lock so connect() can't hang forever (#50353)
connect() wrapped its entire body in an unbounded blocking flock(LOCK_EX) on every call (_cross_process_init_lock). A single process stalled inside the critical section — or a stale lock held by a wedged worker — blocked every other connect(), including the long-lived gateway dispatcher's next-tick connect, forever. No timeout, no traceback, no recovery: the board silently stopped being worked until a manual restart (issue #36644). Two fixes: 1. Fast-path skip: once THIS process has initialized a path, the expensive first-open work (header validation, integrity probe, schema + additive migrations) is already cached in _INITIALIZED_PATHS. The steady-state connect has nothing for the cross-process lock to protect, so it now opens the connection (WAL + pragmas) under only the cheap in-process _INIT_LOCK and never touches the file lock. This removes the lock from the dispatcher's hot path entirely — a stalled external 'hermes kanban list' can no longer block ticks. 2. Bounded acquire: even on first-init, _cross_process_init_lock now retries a non-blocking acquire up to a 10s deadline, then logs a WARNING and proceeds WITHOUT the cross-process lock. Safe because the in-process _INIT_LOCK still serializes same-process threads and the init work is idempotent (CREATE TABLE IF NOT EXISTS + additive migrations) — worst case is redundant work, not corruption. A bounded 'proceed anyway' beats an unbounded hang. Windows path switched LK_LOCK -> LK_NBLCK (non-blocking) to match. Closes #36644.
This commit is contained in:
parent
9630ec6c19
commit
84ba83b09a
3 changed files with 193 additions and 22 deletions
|
|
@ -1183,6 +1183,14 @@ _INIT_LOCK = threading.RLock()
|
|||
_SQLITE_HEADER = b"SQLite format 3\x00"
|
||||
DEFAULT_BUSY_TIMEOUT_MS = 120_000
|
||||
|
||||
# Bounded acquire for the cross-process init lock (#36644). The original bare
|
||||
# blocking flock had no timeout, so a wedged holder blocked the dispatcher's
|
||||
# next-tick connect forever. We retry a non-blocking acquire up to this
|
||||
# deadline, polling at this interval, then proceed without the cross-process
|
||||
# lock (the in-process _INIT_LOCK + idempotent init remain the backstop).
|
||||
_INIT_LOCK_TIMEOUT_SECONDS = 10.0
|
||||
_INIT_LOCK_POLL_SECONDS = 0.05
|
||||
|
||||
|
||||
def _resolve_busy_timeout_ms() -> int:
|
||||
"""Return the SQLite busy timeout for Kanban connections.
|
||||
|
|
@ -1227,41 +1235,76 @@ def _cross_process_init_lock(path: Path):
|
|||
lock keeps header validation, integrity probing, WAL activation, and
|
||||
additive migrations single-file/single-writer across the whole host while
|
||||
leaving normal post-init DB usage concurrent under SQLite WAL.
|
||||
|
||||
The acquire is **bounded** (issue #36644): the original bare blocking
|
||||
``flock(LOCK_EX)`` had no timeout, so a single process stalled inside the
|
||||
critical section (or a stale lock held by a wedged worker) blocked every
|
||||
other ``connect()`` — including the long-lived gateway dispatcher's
|
||||
next-tick connect — forever, with no traceback and no recovery short of a
|
||||
restart. We now retry a non-blocking acquire up to a deadline; on timeout
|
||||
we log a WARNING and proceed WITHOUT the cross-process lock. That is safe:
|
||||
the in-process ``_INIT_LOCK`` still serializes same-process threads, and
|
||||
the init work itself is idempotent (``CREATE TABLE IF NOT EXISTS`` +
|
||||
additive migrations), so the worst case of two processes racing first-init
|
||||
is redundant work, not corruption. A bounded "proceed anyway" beats an
|
||||
unbounded hang that silently stops the board.
|
||||
"""
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
lock_path = path.with_name(path.name + ".init.lock")
|
||||
handle = lock_path.open("a+b")
|
||||
acquired = False
|
||||
try:
|
||||
deadline = time.monotonic() + _INIT_LOCK_TIMEOUT_SECONDS
|
||||
if _IS_WINDOWS:
|
||||
import msvcrt
|
||||
|
||||
# Lock a single byte in the sidecar file. ``msvcrt.locking`` starts
|
||||
# at the current file position, so seek explicitly before both
|
||||
# lock and unlock. The file is opened in append/read binary mode so
|
||||
# it always exists but the byte-range lock is the synchronization
|
||||
# primitive; no payload needs to be written.
|
||||
handle.seek(0)
|
||||
locking = getattr(msvcrt, "locking")
|
||||
lock_mode = getattr(msvcrt, "LK_LOCK")
|
||||
locking(handle.fileno(), lock_mode, 1)
|
||||
nb_lock = getattr(msvcrt, "LK_NBLCK")
|
||||
while True:
|
||||
try:
|
||||
handle.seek(0)
|
||||
locking(handle.fileno(), nb_lock, 1)
|
||||
acquired = True
|
||||
break
|
||||
except OSError:
|
||||
if time.monotonic() >= deadline:
|
||||
break
|
||||
time.sleep(_INIT_LOCK_POLL_SECONDS)
|
||||
else:
|
||||
import fcntl
|
||||
|
||||
fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
|
||||
while True:
|
||||
try:
|
||||
fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
acquired = True
|
||||
break
|
||||
except (BlockingIOError, OSError):
|
||||
if time.monotonic() >= deadline:
|
||||
break
|
||||
time.sleep(_INIT_LOCK_POLL_SECONDS)
|
||||
if not acquired:
|
||||
_log.warning(
|
||||
"kanban init lock for %s not acquired within %.0fs — proceeding "
|
||||
"without the cross-process lock (in-process lock + idempotent "
|
||||
"init are the correctness backstop). A stuck holder is no longer "
|
||||
"able to block this connect indefinitely (#36644).",
|
||||
lock_path, _INIT_LOCK_TIMEOUT_SECONDS,
|
||||
)
|
||||
yield
|
||||
finally:
|
||||
try:
|
||||
if _IS_WINDOWS:
|
||||
import msvcrt
|
||||
if acquired:
|
||||
if _IS_WINDOWS:
|
||||
import msvcrt
|
||||
|
||||
handle.seek(0)
|
||||
locking = getattr(msvcrt, "locking")
|
||||
unlock_mode = getattr(msvcrt, "LK_UNLCK")
|
||||
locking(handle.fileno(), unlock_mode, 1)
|
||||
else:
|
||||
import fcntl
|
||||
handle.seek(0)
|
||||
locking = getattr(msvcrt, "locking")
|
||||
unlock_mode = getattr(msvcrt, "LK_UNLCK")
|
||||
locking(handle.fileno(), unlock_mode, 1)
|
||||
else:
|
||||
import fcntl
|
||||
|
||||
fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
|
||||
fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
|
||||
finally:
|
||||
handle.close()
|
||||
|
||||
|
|
@ -1561,6 +1604,35 @@ def connect(
|
|||
else:
|
||||
path = kanban_db_path(board=board)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Fast path: once THIS process has initialized this path, the expensive
|
||||
# first-open work (header validation, integrity probe, schema + additive
|
||||
# migrations) is already done and cached in _INITIALIZED_PATHS. Acquiring
|
||||
# the cross-process init lock on every connect is what let a single stalled
|
||||
# holder (e.g. an external `hermes kanban list` mid-integrity-probe) block
|
||||
# the long-lived gateway dispatcher's next-tick connect() forever — an
|
||||
# unbounded flock with no timeout, no LOCK_NB, no recovery (#36644). On the
|
||||
# steady-state path there is nothing for the cross-process lock to protect
|
||||
# (no schema/migration writes run), so skip it entirely and just open the
|
||||
# connection with WAL/pragmas under the cheap in-process _INIT_LOCK.
|
||||
resolved = str(path.resolve())
|
||||
if resolved in _INITIALIZED_PATHS:
|
||||
conn = _sqlite_connect(path)
|
||||
try:
|
||||
conn.row_factory = sqlite3.Row
|
||||
with _INIT_LOCK:
|
||||
from hermes_state import apply_wal_with_fallback
|
||||
apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})")
|
||||
conn.execute("PRAGMA synchronous=FULL")
|
||||
conn.execute("PRAGMA wal_autocheckpoint=100")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
conn.execute("PRAGMA secure_delete=ON")
|
||||
conn.execute("PRAGMA cell_size_check=ON")
|
||||
except Exception:
|
||||
conn.close()
|
||||
raise
|
||||
return conn
|
||||
|
||||
with _cross_process_init_lock(path):
|
||||
# Cheap byte-level check first — catches the #29507 TLS-overwrite shape
|
||||
# and other invalid-header cases without opening a sqlite connection.
|
||||
|
|
|
|||
|
|
@ -79,10 +79,15 @@ def test_connect_honors_kanban_busy_timeout_env(kanban_home, monkeypatch):
|
|||
|
||||
|
||||
def test_cross_process_init_lock_uses_windows_byte_range_lock(tmp_path, monkeypatch):
|
||||
"""Windows must use a real process lock, not a no-op sidecar open."""
|
||||
"""Windows must use a real (non-blocking) process lock, not a no-op open.
|
||||
|
||||
The init lock acquires with LK_NBLCK in a bounded retry loop (#36644) so a
|
||||
wedged holder can never block connect() forever; a clean acquire takes the
|
||||
lock once and releases it once.
|
||||
"""
|
||||
calls: list[tuple[int, int, int]] = []
|
||||
fake_msvcrt = types.SimpleNamespace(
|
||||
LK_LOCK=1,
|
||||
LK_NBLCK=3,
|
||||
LK_UNLCK=2,
|
||||
locking=lambda fd, mode, nbytes: calls.append((fd, mode, nbytes)),
|
||||
)
|
||||
|
|
@ -91,10 +96,12 @@ def test_cross_process_init_lock_uses_windows_byte_range_lock(tmp_path, monkeypa
|
|||
|
||||
db_path = tmp_path / "kanban.db"
|
||||
with kb._cross_process_init_lock(db_path):
|
||||
assert calls == [(calls[0][0], fake_msvcrt.LK_LOCK, 1)]
|
||||
# Acquired exactly once via the non-blocking byte-range lock.
|
||||
assert [call[1:] for call in calls] == [(fake_msvcrt.LK_NBLCK, 1)]
|
||||
|
||||
# Released once on exit.
|
||||
assert [call[1:] for call in calls] == [
|
||||
(fake_msvcrt.LK_LOCK, 1),
|
||||
(fake_msvcrt.LK_NBLCK, 1),
|
||||
(fake_msvcrt.LK_UNLCK, 1),
|
||||
]
|
||||
|
||||
|
|
|
|||
92
tests/hermes_cli/test_kanban_init_lock_bounded.py
Normal file
92
tests/hermes_cli/test_kanban_init_lock_bounded.py
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
"""Tests for the bounded kanban init lock (issue #36644).
|
||||
|
||||
`connect()` wrapped its entire body in an unbounded blocking `flock(LOCK_EX)`
|
||||
on every call. A single process stalled inside the critical section blocked the
|
||||
long-lived gateway dispatcher's next-tick `connect()` forever — no timeout, no
|
||||
recovery, board silently stops being worked.
|
||||
|
||||
Two fixes, both covered here:
|
||||
1. Fast path: once a path is initialized in this process, `connect()` skips the
|
||||
cross-process init lock entirely (nothing left to serialize), so a held lock
|
||||
cannot block a steady-state connect.
|
||||
2. Bounded acquire: even on first-init, `_cross_process_init_lock` retries a
|
||||
non-blocking acquire up to a deadline, then proceeds (with a WARNING) rather
|
||||
than hanging.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from hermes_cli import kanban_db as kb
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def kanban_home(tmp_path, monkeypatch):
|
||||
home = tmp_path / ".hermes"
|
||||
home.mkdir()
|
||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||
monkeypatch.setenv("HERMES_KANBAN_HOME", str(home))
|
||||
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||
db_path = kb.kanban_db_path(board="default")
|
||||
kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))
|
||||
return home
|
||||
|
||||
|
||||
def _hold_init_lock(db_path: Path):
|
||||
"""Return (start_event, release_event, thread) holding the init lock."""
|
||||
holding = threading.Event()
|
||||
release = threading.Event()
|
||||
|
||||
def _holder():
|
||||
with kb._cross_process_init_lock(db_path):
|
||||
holding.set()
|
||||
release.wait(timeout=10)
|
||||
|
||||
t = threading.Thread(target=_holder, daemon=True)
|
||||
t.start()
|
||||
assert holding.wait(timeout=5), "holder thread never acquired the lock"
|
||||
return release, t
|
||||
|
||||
|
||||
def test_initialized_path_connect_skips_init_lock(kanban_home):
|
||||
"""A connect to an already-initialized path must not block on the init lock."""
|
||||
db_path = kb.kanban_db_path(board="default")
|
||||
# Initialize once.
|
||||
kb.connect().close()
|
||||
assert str(db_path.resolve()) in kb._INITIALIZED_PATHS
|
||||
|
||||
# Hold the init lock; a fast-path connect must return promptly anyway.
|
||||
release, t = _hold_init_lock(db_path)
|
||||
try:
|
||||
start = time.monotonic()
|
||||
kb.connect().close()
|
||||
elapsed = time.monotonic() - start
|
||||
assert elapsed < 1.0, f"fast-path connect blocked on the init lock ({elapsed:.2f}s)"
|
||||
finally:
|
||||
release.set()
|
||||
t.join(timeout=5)
|
||||
|
||||
|
||||
def test_first_init_connect_is_bounded_when_lock_held(kanban_home, monkeypatch):
|
||||
"""First-init connect must time out the cross-process lock and proceed,
|
||||
not hang forever, when another holder owns it."""
|
||||
monkeypatch.setattr(kb, "_INIT_LOCK_TIMEOUT_SECONDS", 0.6)
|
||||
db_path = kb.kanban_db_path(board="default")
|
||||
|
||||
release, t = _hold_init_lock(db_path)
|
||||
try:
|
||||
start = time.monotonic()
|
||||
conn = kb.connect() # path NOT yet initialized — must take the bounded path
|
||||
conn.close()
|
||||
elapsed = time.monotonic() - start
|
||||
# Proceeded within roughly the timeout window (not unbounded).
|
||||
assert 0.4 <= elapsed < 3.0, f"expected bounded ~0.6s acquire, got {elapsed:.2f}s"
|
||||
assert str(db_path.resolve()) in kb._INITIALIZED_PATHS
|
||||
finally:
|
||||
release.set()
|
||||
t.join(timeout=5)
|
||||
Loading…
Add table
Add a link
Reference in a new issue