mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-23 10:42:00 +00:00
fix(kanban): single-writer dispatch lock to prevent orphan-dispatcher DB corruption (#50331)
A shell-launched 'hermes gateway run --replace' / 'gateway restart' on a systemd/launchd host can leave an orphan gateway whose kanban dispatcher escapes the service cgroup, survives 'systemctl restart', and becomes a second long-lived writer on the shared kanban.db. Two dispatchers that each believe they own the file both pass SQLite busy_timeout and then race on WAL frames — the documented root cause of multi-writer corruption (issue #35240). The existing _guard_supervised_gateway_conflict startup guard blocks the common way an orphan is born, but does nothing once a second dispatcher already exists. This adds the defense-in-depth: dispatch_once now wraps every tick in a non-blocking, board-scoped flock (_dispatch_tick_lock). A losing dispatcher returns DispatchResult(skipped_locked=True) and does zero DB writes this tick — so two dispatchers can never run a reclaim/spawn/write sequence concurrently regardless of how the second one got there. - Non-blocking (LOCK_NB): never stalls the gateway's async watcher. - Board-scoped: lock file is a .dispatch.lock sibling of each board's kanban.db, so unrelated boards tick in parallel. - POSIX + Windows (fcntl / msvcrt LK_NBLCK), no-op degrade where neither exists — mirrors the existing _cross_process_init_lock pattern. Verified with a real two-process orphan repro: while a separate process holds the lock, dispatch_once skips; after release it runs.
This commit is contained in:
parent
587b5b9ac2
commit
e581740aa1
2 changed files with 260 additions and 0 deletions
|
|
@ -1240,6 +1240,91 @@ def _cross_process_init_lock(path: Path):
|
|||
handle.close()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _dispatch_tick_lock(db_path: Path):
|
||||
"""Non-blocking single-writer guard around one dispatcher tick.
|
||||
|
||||
Yields ``True`` when this process holds the board's dispatch lock and
|
||||
may proceed with the tick, or ``False`` when another process already
|
||||
holds it (the caller should skip the tick this round).
|
||||
|
||||
Motivation (issue #35240): a ``hermes gateway run --replace`` /
|
||||
``gateway restart`` invoked from a shell on a systemd/launchd host can
|
||||
leave an orphan gateway whose dispatcher escapes the service cgroup,
|
||||
survives ``systemctl restart``, and becomes a *second* long-lived
|
||||
writer on the same ``kanban.db``. Two dispatchers that each believe
|
||||
they own the file both pass SQLite ``busy_timeout`` and then race on
|
||||
WAL frames — the documented root cause of multi-writer corruption.
|
||||
The startup guard (``_guard_supervised_gateway_conflict``) blocks the
|
||||
common way an orphan is born, but this lock is the defense-in-depth
|
||||
that prevents two dispatchers from ever writing concurrently
|
||||
*regardless of how the second one got there*.
|
||||
|
||||
The lock is **non-blocking** on purpose: the gateway's async watcher
|
||||
must never stall on a held lock. A losing dispatcher simply skips its
|
||||
tick (the winner is making progress on the same board), and tries
|
||||
again next interval.
|
||||
|
||||
Board-scoped: the lock file is a ``.dispatch.lock`` sibling of the
|
||||
board's ``kanban.db``, so unrelated boards tick independently. On
|
||||
platforms without ``fcntl``/``msvcrt`` the guard degrades to a no-op
|
||||
(yields ``True``) — single-writer enforcement is best-effort and the
|
||||
orphan-dispatcher scenario is specific to POSIX service managers.
|
||||
"""
|
||||
lock_path = db_path.with_name(db_path.name + ".dispatch.lock")
|
||||
handle = None
|
||||
acquired = False
|
||||
try:
|
||||
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
handle = lock_path.open("a+b")
|
||||
if _IS_WINDOWS:
|
||||
try:
|
||||
import msvcrt
|
||||
|
||||
handle.seek(0)
|
||||
locking = getattr(msvcrt, "locking")
|
||||
# LK_NBLCK = non-blocking exclusive byte-range lock.
|
||||
nb_lock = getattr(msvcrt, "LK_NBLCK")
|
||||
locking(handle.fileno(), nb_lock, 1)
|
||||
acquired = True
|
||||
except (OSError, AttributeError):
|
||||
acquired = False
|
||||
else:
|
||||
try:
|
||||
import fcntl
|
||||
|
||||
fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
acquired = True
|
||||
except (BlockingIOError, OSError):
|
||||
acquired = False
|
||||
except OSError:
|
||||
# Could not even open the lock file (permissions, read-only FS).
|
||||
# Degrade to a no-op so a probe failure never blocks dispatch.
|
||||
acquired = True
|
||||
handle = None
|
||||
try:
|
||||
yield acquired
|
||||
finally:
|
||||
if handle is not None:
|
||||
try:
|
||||
if acquired:
|
||||
if _IS_WINDOWS:
|
||||
import msvcrt
|
||||
|
||||
handle.seek(0)
|
||||
locking = getattr(msvcrt, "locking")
|
||||
unlock_mode = getattr(msvcrt, "LK_UNLCK")
|
||||
locking(handle.fileno(), unlock_mode, 1)
|
||||
else:
|
||||
import fcntl
|
||||
|
||||
fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
|
||||
except (OSError, AttributeError):
|
||||
pass
|
||||
finally:
|
||||
handle.close()
|
||||
|
||||
|
||||
def _looks_like_tls_record_at(data: bytes, offset: int) -> bool:
|
||||
"""Return True for a TLS record header at ``data[offset:]``."""
|
||||
if len(data) < offset + 5:
|
||||
|
|
@ -5157,6 +5242,12 @@ class DispatchResult:
|
|||
(EX_TEMPFAIL sentinel exit) and were released back to ``ready`` WITHOUT
|
||||
counting a failure. These never trip the circuit breaker — a long quota
|
||||
window just makes the task bounce cheaply until the window clears."""
|
||||
skipped_locked: bool = False
|
||||
"""True when this tick was skipped because another process already held
|
||||
the board's dispatch lock (issue #35240). A losing dispatcher does no
|
||||
DB writes this tick — the lock holder is making progress on the same
|
||||
board. This is the steady-state signal that a single-writer guard is
|
||||
actively preventing two dispatchers from racing on ``kanban.db``."""
|
||||
|
||||
|
||||
# Bounded registry of recently-reaped worker child exits, populated by the
|
||||
|
|
@ -6352,6 +6443,72 @@ def dispatch_once(
|
|||
board: Optional[str] = None,
|
||||
default_assignee: Optional[str] = None,
|
||||
max_in_progress_per_profile: Optional[int] = None,
|
||||
) -> DispatchResult:
|
||||
"""Run one dispatcher tick under the board's single-writer lock.
|
||||
|
||||
Thin wrapper around :func:`_dispatch_once_locked`. It acquires a
|
||||
non-blocking, board-scoped dispatch lock (issue #35240) so that two
|
||||
dispatchers pointed at the same ``kanban.db`` — e.g. the service-
|
||||
managed gateway and a shell-spawned orphan that escaped the service
|
||||
cgroup — can never run a reclaim/spawn/write tick concurrently and
|
||||
race on WAL frames. The losing dispatcher returns an empty
|
||||
``DispatchResult`` with ``skipped_locked=True`` and does no DB writes;
|
||||
the holder is already making progress on the same board.
|
||||
|
||||
The lock is keyed off the board's resolved DB path, so unrelated
|
||||
boards tick in parallel. See :func:`_dispatch_tick_lock` for the
|
||||
cross-process / cross-platform mechanics.
|
||||
"""
|
||||
try:
|
||||
db_path = kanban_db_path(board=board)
|
||||
except Exception:
|
||||
# Path resolution should never fail, but if it somehow does we
|
||||
# must not lose the tick — fall through to an unguarded dispatch
|
||||
# rather than dropping work.
|
||||
return _dispatch_once_locked(
|
||||
conn,
|
||||
spawn_fn=spawn_fn,
|
||||
ttl_seconds=ttl_seconds,
|
||||
dry_run=dry_run,
|
||||
max_spawn=max_spawn,
|
||||
max_in_progress=max_in_progress,
|
||||
failure_limit=failure_limit,
|
||||
stale_timeout_seconds=stale_timeout_seconds,
|
||||
board=board,
|
||||
default_assignee=default_assignee,
|
||||
max_in_progress_per_profile=max_in_progress_per_profile,
|
||||
)
|
||||
with _dispatch_tick_lock(db_path) as held:
|
||||
if not held:
|
||||
return DispatchResult(skipped_locked=True)
|
||||
return _dispatch_once_locked(
|
||||
conn,
|
||||
spawn_fn=spawn_fn,
|
||||
ttl_seconds=ttl_seconds,
|
||||
dry_run=dry_run,
|
||||
max_spawn=max_spawn,
|
||||
max_in_progress=max_in_progress,
|
||||
failure_limit=failure_limit,
|
||||
stale_timeout_seconds=stale_timeout_seconds,
|
||||
board=board,
|
||||
default_assignee=default_assignee,
|
||||
max_in_progress_per_profile=max_in_progress_per_profile,
|
||||
)
|
||||
|
||||
|
||||
def _dispatch_once_locked(
|
||||
conn: sqlite3.Connection,
|
||||
*,
|
||||
spawn_fn=None,
|
||||
ttl_seconds: Optional[int] = None,
|
||||
dry_run: bool = False,
|
||||
max_spawn: Optional[int] = None,
|
||||
max_in_progress: Optional[int] = None,
|
||||
failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
|
||||
stale_timeout_seconds: int = 0,
|
||||
board: Optional[str] = None,
|
||||
default_assignee: Optional[str] = None,
|
||||
max_in_progress_per_profile: Optional[int] = None,
|
||||
) -> DispatchResult:
|
||||
"""Run one dispatcher tick.
|
||||
|
||||
|
|
|
|||
103
tests/hermes_cli/test_kanban_dispatch_lock.py
Normal file
103
tests/hermes_cli/test_kanban_dispatch_lock.py
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
"""Tests for the kanban dispatcher single-writer lock (issue #35240).
|
||||
|
||||
A ``hermes gateway run --replace`` / ``gateway restart`` from a shell on a
|
||||
systemd/launchd host can leave an orphan dispatcher that escapes the
|
||||
service cgroup, survives ``systemctl restart``, and becomes a second
|
||||
long-lived writer on the same ``kanban.db`` — the documented root cause of
|
||||
multi-writer SQLite WAL corruption. ``dispatch_once`` now wraps each tick in
|
||||
a non-blocking, board-scoped dispatch lock so two dispatchers can never run
|
||||
a reclaim/spawn/write tick concurrently. The losing dispatcher returns an
|
||||
empty ``DispatchResult`` with ``skipped_locked=True`` and does no DB writes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from hermes_cli import kanban_db as kb
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def kanban_home(tmp_path, monkeypatch):
|
||||
home = tmp_path / ".hermes"
|
||||
home.mkdir()
|
||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||
monkeypatch.setenv("HERMES_KANBAN_HOME", str(home))
|
||||
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||
db_path = kb.kanban_db_path(board="default")
|
||||
kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))
|
||||
kb.init_db()
|
||||
return home
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def conn(kanban_home):
|
||||
with kb.connect() as c:
|
||||
yield c
|
||||
|
||||
|
||||
def test_uncontended_tick_runs_and_is_not_skipped(conn):
|
||||
"""With no other holder, a tick runs normally and skipped_locked is False."""
|
||||
kb.create_task(conn, title="t", assignee="w")
|
||||
result = kb.dispatch_once(conn)
|
||||
assert result.skipped_locked is False
|
||||
|
||||
|
||||
def test_held_lock_skips_the_tick_without_writes(conn):
|
||||
"""While another holder owns the board lock, dispatch_once must skip and
|
||||
must NOT invoke spawn_fn (no DB writes happen on a skipped tick)."""
|
||||
kb.create_task(conn, title="t", assignee="w")
|
||||
db_path = kb.kanban_db_path(board="default")
|
||||
|
||||
spawn_calls: list = []
|
||||
|
||||
def spy_spawn(task, workspace_path, board=None):
|
||||
spawn_calls.append(getattr(task, "id", task))
|
||||
return 999999
|
||||
|
||||
# Hold the lock, then attempt a contended tick.
|
||||
with kb._dispatch_tick_lock(db_path) as held:
|
||||
assert held is True # we genuinely acquired it
|
||||
result = kb.dispatch_once(conn, spawn_fn=spy_spawn)
|
||||
|
||||
assert result.skipped_locked is True
|
||||
assert result.spawned == []
|
||||
assert spawn_calls == [], "spawn_fn must not run while the tick is locked out"
|
||||
|
||||
|
||||
def test_lock_releases_so_next_tick_runs(conn):
|
||||
"""After the holder releases, the next tick is no longer skipped."""
|
||||
kb.create_task(conn, title="t", assignee="w")
|
||||
db_path = kb.kanban_db_path(board="default")
|
||||
|
||||
with kb._dispatch_tick_lock(db_path) as held:
|
||||
assert held is True
|
||||
assert kb.dispatch_once(conn).skipped_locked is True
|
||||
|
||||
# Lock released — a fresh tick proceeds.
|
||||
assert kb.dispatch_once(conn).skipped_locked is False
|
||||
|
||||
|
||||
def test_lock_is_board_scoped(conn):
|
||||
"""Holding board A's dispatch lock must not block a tick on board B —
|
||||
distinct boards have distinct DB files and tick independently."""
|
||||
db_default = kb.kanban_db_path(board="default")
|
||||
db_other = db_default.with_name("other-board-kanban.db")
|
||||
|
||||
# Two different lock files → both acquirable simultaneously.
|
||||
with kb._dispatch_tick_lock(db_default) as held_a:
|
||||
assert held_a is True
|
||||
with kb._dispatch_tick_lock(db_other) as held_b:
|
||||
assert held_b is True, "a lock on a different board must be independent"
|
||||
|
||||
|
||||
def test_reentrant_same_path_lock_is_exclusive(conn):
|
||||
"""A second acquisition of the SAME board's lock from a sibling context
|
||||
must report not-held (the flock is exclusive within the host)."""
|
||||
db_path = kb.kanban_db_path(board="default")
|
||||
with kb._dispatch_tick_lock(db_path) as held_a:
|
||||
assert held_a is True
|
||||
with kb._dispatch_tick_lock(db_path) as held_b:
|
||||
assert held_b is False, "same-board lock must be exclusive"
|
||||
Loading…
Add table
Add a link
Reference in a new issue