diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 808f64ba8a8..721403892c9 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1240,6 +1240,91 @@ def _cross_process_init_lock(path: Path): handle.close() +@contextlib.contextmanager +def _dispatch_tick_lock(db_path: Path): + """Non-blocking single-writer guard around one dispatcher tick. + + Yields ``True`` when this process holds the board's dispatch lock and + may proceed with the tick, or ``False`` when another process already + holds it (the caller should skip the tick this round). + + Motivation (issue #35240): a ``hermes gateway run --replace`` / + ``gateway restart`` invoked from a shell on a systemd/launchd host can + leave an orphan gateway whose dispatcher escapes the service cgroup, + survives ``systemctl restart``, and becomes a *second* long-lived + writer on the same ``kanban.db``. Two dispatchers that each believe + they own the file both pass SQLite ``busy_timeout`` and then race on + WAL frames — the documented root cause of multi-writer corruption. + The startup guard (``_guard_supervised_gateway_conflict``) blocks the + common way an orphan is born, but this lock is the defense-in-depth + that prevents two dispatchers from ever writing concurrently + *regardless of how the second one got there*. + + The lock is **non-blocking** on purpose: the gateway's async watcher + must never stall on a held lock. A losing dispatcher simply skips its + tick (the winner is making progress on the same board), and tries + again next interval. + + Board-scoped: the lock file is a ``.dispatch.lock`` sibling of the + board's ``kanban.db``, so unrelated boards tick independently. On + platforms without ``fcntl``/``msvcrt`` the guard degrades to a no-op + (yields ``True``) — single-writer enforcement is best-effort and the + orphan-dispatcher scenario is specific to POSIX service managers. + """ + lock_path = db_path.with_name(db_path.name + ".dispatch.lock") + handle = None + acquired = False + try: + lock_path.parent.mkdir(parents=True, exist_ok=True) + handle = lock_path.open("a+b") + if _IS_WINDOWS: + try: + import msvcrt + + handle.seek(0) + locking = getattr(msvcrt, "locking") + # LK_NBLCK = non-blocking exclusive byte-range lock. + nb_lock = getattr(msvcrt, "LK_NBLCK") + locking(handle.fileno(), nb_lock, 1) + acquired = True + except (OSError, AttributeError): + acquired = False + else: + try: + import fcntl + + fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + acquired = True + except (BlockingIOError, OSError): + acquired = False + except OSError: + # Could not even open the lock file (permissions, read-only FS). + # Degrade to a no-op so a probe failure never blocks dispatch. + acquired = True + handle = None + try: + yield acquired + finally: + if handle is not None: + try: + if acquired: + if _IS_WINDOWS: + import msvcrt + + handle.seek(0) + locking = getattr(msvcrt, "locking") + unlock_mode = getattr(msvcrt, "LK_UNLCK") + locking(handle.fileno(), unlock_mode, 1) + else: + import fcntl + + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + except (OSError, AttributeError): + pass + finally: + handle.close() + + def _looks_like_tls_record_at(data: bytes, offset: int) -> bool: """Return True for a TLS record header at ``data[offset:]``.""" if len(data) < offset + 5: @@ -5157,6 +5242,12 @@ class DispatchResult: (EX_TEMPFAIL sentinel exit) and were released back to ``ready`` WITHOUT counting a failure. These never trip the circuit breaker — a long quota window just makes the task bounce cheaply until the window clears.""" + skipped_locked: bool = False + """True when this tick was skipped because another process already held + the board's dispatch lock (issue #35240). A losing dispatcher does no + DB writes this tick — the lock holder is making progress on the same + board. This is the steady-state signal that a single-writer guard is + actively preventing two dispatchers from racing on ``kanban.db``.""" # Bounded registry of recently-reaped worker child exits, populated by the @@ -6352,6 +6443,72 @@ def dispatch_once( board: Optional[str] = None, default_assignee: Optional[str] = None, max_in_progress_per_profile: Optional[int] = None, +) -> DispatchResult: + """Run one dispatcher tick under the board's single-writer lock. + + Thin wrapper around :func:`_dispatch_once_locked`. It acquires a + non-blocking, board-scoped dispatch lock (issue #35240) so that two + dispatchers pointed at the same ``kanban.db`` — e.g. the service- + managed gateway and a shell-spawned orphan that escaped the service + cgroup — can never run a reclaim/spawn/write tick concurrently and + race on WAL frames. The losing dispatcher returns an empty + ``DispatchResult`` with ``skipped_locked=True`` and does no DB writes; + the holder is already making progress on the same board. + + The lock is keyed off the board's resolved DB path, so unrelated + boards tick in parallel. See :func:`_dispatch_tick_lock` for the + cross-process / cross-platform mechanics. + """ + try: + db_path = kanban_db_path(board=board) + except Exception: + # Path resolution should never fail, but if it somehow does we + # must not lose the tick — fall through to an unguarded dispatch + # rather than dropping work. + return _dispatch_once_locked( + conn, + spawn_fn=spawn_fn, + ttl_seconds=ttl_seconds, + dry_run=dry_run, + max_spawn=max_spawn, + max_in_progress=max_in_progress, + failure_limit=failure_limit, + stale_timeout_seconds=stale_timeout_seconds, + board=board, + default_assignee=default_assignee, + max_in_progress_per_profile=max_in_progress_per_profile, + ) + with _dispatch_tick_lock(db_path) as held: + if not held: + return DispatchResult(skipped_locked=True) + return _dispatch_once_locked( + conn, + spawn_fn=spawn_fn, + ttl_seconds=ttl_seconds, + dry_run=dry_run, + max_spawn=max_spawn, + max_in_progress=max_in_progress, + failure_limit=failure_limit, + stale_timeout_seconds=stale_timeout_seconds, + board=board, + default_assignee=default_assignee, + max_in_progress_per_profile=max_in_progress_per_profile, + ) + + +def _dispatch_once_locked( + conn: sqlite3.Connection, + *, + spawn_fn=None, + ttl_seconds: Optional[int] = None, + dry_run: bool = False, + max_spawn: Optional[int] = None, + max_in_progress: Optional[int] = None, + failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT, + stale_timeout_seconds: int = 0, + board: Optional[str] = None, + default_assignee: Optional[str] = None, + max_in_progress_per_profile: Optional[int] = None, ) -> DispatchResult: """Run one dispatcher tick. diff --git a/tests/hermes_cli/test_kanban_dispatch_lock.py b/tests/hermes_cli/test_kanban_dispatch_lock.py new file mode 100644 index 00000000000..6acbf2ac216 --- /dev/null +++ b/tests/hermes_cli/test_kanban_dispatch_lock.py @@ -0,0 +1,103 @@ +"""Tests for the kanban dispatcher single-writer lock (issue #35240). + +A ``hermes gateway run --replace`` / ``gateway restart`` from a shell on a +systemd/launchd host can leave an orphan dispatcher that escapes the +service cgroup, survives ``systemctl restart``, and becomes a second +long-lived writer on the same ``kanban.db`` — the documented root cause of +multi-writer SQLite WAL corruption. ``dispatch_once`` now wraps each tick in +a non-blocking, board-scoped dispatch lock so two dispatchers can never run +a reclaim/spawn/write tick concurrently. The losing dispatcher returns an +empty ``DispatchResult`` with ``skipped_locked=True`` and does no DB writes. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from hermes_cli import kanban_db as kb + + +@pytest.fixture +def kanban_home(tmp_path, monkeypatch): + home = tmp_path / ".hermes" + home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(home)) + monkeypatch.setenv("HERMES_KANBAN_HOME", str(home)) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + db_path = kb.kanban_db_path(board="default") + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + kb.init_db() + return home + + +@pytest.fixture +def conn(kanban_home): + with kb.connect() as c: + yield c + + +def test_uncontended_tick_runs_and_is_not_skipped(conn): + """With no other holder, a tick runs normally and skipped_locked is False.""" + kb.create_task(conn, title="t", assignee="w") + result = kb.dispatch_once(conn) + assert result.skipped_locked is False + + +def test_held_lock_skips_the_tick_without_writes(conn): + """While another holder owns the board lock, dispatch_once must skip and + must NOT invoke spawn_fn (no DB writes happen on a skipped tick).""" + kb.create_task(conn, title="t", assignee="w") + db_path = kb.kanban_db_path(board="default") + + spawn_calls: list = [] + + def spy_spawn(task, workspace_path, board=None): + spawn_calls.append(getattr(task, "id", task)) + return 999999 + + # Hold the lock, then attempt a contended tick. + with kb._dispatch_tick_lock(db_path) as held: + assert held is True # we genuinely acquired it + result = kb.dispatch_once(conn, spawn_fn=spy_spawn) + + assert result.skipped_locked is True + assert result.spawned == [] + assert spawn_calls == [], "spawn_fn must not run while the tick is locked out" + + +def test_lock_releases_so_next_tick_runs(conn): + """After the holder releases, the next tick is no longer skipped.""" + kb.create_task(conn, title="t", assignee="w") + db_path = kb.kanban_db_path(board="default") + + with kb._dispatch_tick_lock(db_path) as held: + assert held is True + assert kb.dispatch_once(conn).skipped_locked is True + + # Lock released — a fresh tick proceeds. + assert kb.dispatch_once(conn).skipped_locked is False + + +def test_lock_is_board_scoped(conn): + """Holding board A's dispatch lock must not block a tick on board B — + distinct boards have distinct DB files and tick independently.""" + db_default = kb.kanban_db_path(board="default") + db_other = db_default.with_name("other-board-kanban.db") + + # Two different lock files → both acquirable simultaneously. + with kb._dispatch_tick_lock(db_default) as held_a: + assert held_a is True + with kb._dispatch_tick_lock(db_other) as held_b: + assert held_b is True, "a lock on a different board must be independent" + + +def test_reentrant_same_path_lock_is_exclusive(conn): + """A second acquisition of the SAME board's lock from a sibling context + must report not-held (the flock is exclusive within the host).""" + db_path = kb.kanban_db_path(board="default") + with kb._dispatch_tick_lock(db_path) as held_a: + assert held_a is True + with kb._dispatch_tick_lock(db_path) as held_b: + assert held_b is False, "same-board lock must be exclusive"