From 226e9322e16d78466aa0e59ff6453712988fc2bc Mon Sep 17 00:00:00 2001 From: Sahil Saghir <218421507+Sahil-SS9@users.noreply.github.com> Date: Fri, 12 Jun 2026 19:32:19 +0100 Subject: [PATCH] fix(kanban): cross-platform dispatcher lock + explicit release MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two robustness gaps from community review (#44919): 1. Windows dead-path: replaced bespoke fcntl.flock with gateway.status _try_acquire_file_lock / _release_file_lock — already cross-platform (msvcrt on Windows, fcntl on POSIX). Added _release_singleton_lock helper. 2. Lock fd never released: stored handle is now released explicitly in both exit paths — CancelledError handler and normal while-loop exit. Allows in-process stop/restart (tests, embedded use). Also tightened docstrings — 'corrupt the SQLite DBs' is now specific (wal_autocheckpoint=0 + concurrent manual WAL checkpoints can corrupt index pages), matching the module's own concurrency claims. --- gateway/kanban_watchers.py | 70 ++++++++++++++------- tests/gateway/test_kanban_watchers_mixin.py | 21 ++++--- 2 files changed, 58 insertions(+), 33 deletions(-) diff --git a/gateway/kanban_watchers.py b/gateway/kanban_watchers.py index b9b224e31fc..123b8aafdd5 100644 --- a/gateway/kanban_watchers.py +++ b/gateway/kanban_watchers.py @@ -23,39 +23,56 @@ from typing import Any, Optional logger = logging.getLogger("gateway.run") -def _acquire_singleton_lock(lock_path) -> "tuple[int | None, str]": +def _acquire_singleton_lock(lock_path) -> "tuple[Optional[object], str]": """Take an exclusive, non-blocking advisory lock for the sole dispatcher. Only one gateway process machine-wide may run the embedded kanban - dispatcher: concurrent dispatchers each sweep every board and corrupt the - shared kanban SQLite DBs (and double reclaim frequency). The + dispatcher: concurrent dispatchers double the reclaim frequency (each + runs its own ``release_stale_claims`` → promote → dispatch loop), double + claim-attempt events in the event log, and — with ``wal_autocheckpoint=0`` — + concurrent manual WAL checkpoints can corrupt index pages. The ``dispatch_in_gateway`` config flag is the primary control; this lock is the backstop that survives config drift and same-profile restart races. - Returns ``(fd, "held")`` on success — the caller keeps the fd for the - process lifetime. ``(None, "contended")`` when another process holds it - (caller must NOT dispatch). ``(None, "unavailable")`` when locking cannot be - performed (non-POSIX, or a filesystem without flock) — caller falls back to - config-only control. + Delegates to :func:`gateway.status._try_acquire_file_lock` (``fcntl`` on + POSIX, ``msvcrt`` on Windows) so the guard is cross-platform. + + Returns ``(handle, "held")`` on success — the caller keeps the file handle + for the process lifetime and **must** release it via + :func:`_release_singleton_lock` when done. ``(None, "contended")`` when + another process holds the lock (caller must NOT dispatch). ``(None, + "unavailable")`` when locking cannot be performed (non-POSIX filesystem + without flock, or the status.py helpers are unimportable) — caller falls + back to config-only control. """ try: - import fcntl + from gateway.status import _try_acquire_file_lock # deferred; same package except ImportError: return None, "unavailable" try: Path(lock_path).parent.mkdir(parents=True, exist_ok=True) - fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT, 0o644) + handle = open(str(lock_path), "a+") except OSError: return None, "unavailable" - try: - fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - except BlockingIOError: - os.close(fd) + if not _try_acquire_file_lock(handle): + handle.close() return None, "contended" - except OSError: - os.close(fd) - return None, "unavailable" - return fd, "held" + return handle, "held" + + +def _release_singleton_lock(handle) -> None: + """Release a dispatcher singleton lock acquired via :func:`_acquire_singleton_lock`.""" + if handle is None: + return + try: + from gateway.status import _release_file_lock + _release_file_lock(handle) + except Exception: + pass + try: + handle.close() + except Exception: + pass class GatewayKanbanWatchersMixin: @@ -643,12 +660,14 @@ class GatewayKanbanWatchersMixin: # Single-dispatcher backstop. dispatch_in_gateway defaults to true, so a # new profile gateway (or a same-profile restart race) can silently - # start a second dispatcher; concurrent dispatchers corrupt the shared - # kanban SQLite DBs. The lock lives at the machine-global kanban root + # start a second dispatcher; concurrent dispatchers double reclaim + # frequency, double claim-attempt events, and — with + # wal_autocheckpoint=0 — concurrent manual WAL checkpoints can corrupt + # index pages. The lock lives at the machine-global kanban root # (shared across profiles by design), so it serialises ALL gateways. - self._kanban_dispatcher_lock_fd = None + self._kanban_dispatcher_lock_handle = None _lock_path = _kb.kanban_home() / "kanban" / ".dispatcher.lock" - _lock_fd, _lock_state = _acquire_singleton_lock(_lock_path) + _lock_handle, _lock_state = _acquire_singleton_lock(_lock_path) if _lock_state == "contended": logger.info( "kanban dispatcher: another gateway already holds the dispatcher " @@ -656,7 +675,7 @@ class GatewayKanbanWatchersMixin: ) return if _lock_state == "held": - self._kanban_dispatcher_lock_fd = _lock_fd # hold for process lifetime + self._kanban_dispatcher_lock_handle = _lock_handle # hold for process lifetime logger.info("kanban dispatcher: holding singleton dispatcher lock (%s)", _lock_path) else: logger.warning( @@ -1110,6 +1129,8 @@ class GatewayKanbanWatchersMixin: last_warn_at = now except asyncio.CancelledError: logger.debug("kanban dispatcher: cancelled") + _release_singleton_lock(self._kanban_dispatcher_lock_handle) + self._kanban_dispatcher_lock_handle = None raise except Exception: logger.exception("kanban dispatcher: unexpected watcher error") @@ -1120,3 +1141,6 @@ class GatewayKanbanWatchersMixin: while slept < interval and self._running: await asyncio.sleep(min(1.0, interval - slept)) slept += 1.0 + + _release_singleton_lock(self._kanban_dispatcher_lock_handle) + self._kanban_dispatcher_lock_handle = None diff --git a/tests/gateway/test_kanban_watchers_mixin.py b/tests/gateway/test_kanban_watchers_mixin.py index a0ca76d4988..061b528e79e 100644 --- a/tests/gateway/test_kanban_watchers_mixin.py +++ b/tests/gateway/test_kanban_watchers_mixin.py @@ -47,22 +47,23 @@ def test_watcher_loops_are_coroutines(): def test_singleton_dispatcher_lock_is_exclusive(tmp_path): """Only one holder of the dispatcher lock at a time — the backstop that - stops concurrent dispatchers corrupting the shared kanban SQLite DBs.""" + stops concurrent dispatchers double reclaiming and corrupting shared + kanban SQLite index pages under wal_autocheckpoint=0.""" import os - from gateway.kanban_watchers import _acquire_singleton_lock + from gateway.kanban_watchers import _acquire_singleton_lock, _release_singleton_lock lock = tmp_path / "kanban" / ".dispatcher.lock" - fd1, st1 = _acquire_singleton_lock(lock) - assert st1 == "held" and fd1 is not None + h1, st1 = _acquire_singleton_lock(lock) + assert st1 == "held" and h1 is not None # A second acquire while the first is held must be refused, not granted. - fd2, st2 = _acquire_singleton_lock(lock) - assert st2 == "contended" and fd2 is None + h2, st2 = _acquire_singleton_lock(lock) + assert st2 == "contended" and h2 is None # Releasing the first lets a fresh acquire succeed (lock is reusable). - os.close(fd1) - fd3, st3 = _acquire_singleton_lock(lock) - assert st3 == "held" and fd3 is not None - os.close(fd3) + _release_singleton_lock(h1) + h3, st3 = _acquire_singleton_lock(lock) + assert st3 == "held" and h3 is not None + _release_singleton_lock(h3)