From dfa561092a69b611e2ca84881ff0bb96f5360984 Mon Sep 17 00:00:00 2001 From: Sahil Saghir <218421507+Sahil-SS9@users.noreply.github.com> Date: Fri, 12 Jun 2026 14:16:47 +0100 Subject: [PATCH] fix(kanban): machine-global singleton lock for the embedded dispatcher (#41448) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The gateway's embedded dispatcher has no guard against more than one dispatcher running concurrently. dispatch_in_gateway defaults to true, so a second gateway for the same profile (a restart race where the old process is slow to exit) — or any deployment that runs multiple profile gateways with the default — starts a second dispatcher loop. As #41448 describes, concurrent dispatchers each run release_stale_claims() against the same boards, double reclaim frequency, and re-dispatch slow workers before they finish. In practice they also corrupt the shared kanban SQLite DBs under concurrent write load. Add _acquire_singleton_lock(): an exclusive, non-blocking fcntl.flock at the machine-global kanban root (kanban_home()/kanban/.dispatcher.lock — the board is shared across profiles by design, so this serialises every gateway, not just one profile). The first gateway to start its dispatcher holds the lock for its process lifetime; any other gateway finds it contended, logs, and skips dispatching while still running for messaging. Falls back to config-only control on non-POSIX or filesystems without flock. This is more robust than a per-profile guard because the documented model is "one dispatcher sweeps all boards" — the contention is across profiles, not just within one. Closes #41448. Test: lock is exclusive (held, then contended while held, then held again after release). --- gateway/kanban_watchers.py | 58 +++++++++++++++++++++ tests/gateway/test_kanban_watchers_mixin.py | 23 ++++++++ 2 files changed, 81 insertions(+) diff --git a/gateway/kanban_watchers.py b/gateway/kanban_watchers.py index 328cbd7fb5b..b9b224e31fc 100644 --- a/gateway/kanban_watchers.py +++ b/gateway/kanban_watchers.py @@ -23,6 +23,41 @@ from typing import Any, Optional logger = logging.getLogger("gateway.run") +def _acquire_singleton_lock(lock_path) -> "tuple[int | None, str]": + """Take an exclusive, non-blocking advisory lock for the sole dispatcher. + + Only one gateway process machine-wide may run the embedded kanban + dispatcher: concurrent dispatchers each sweep every board and corrupt the + shared kanban SQLite DBs (and double reclaim frequency). The + ``dispatch_in_gateway`` config flag is the primary control; this lock is the + backstop that survives config drift and same-profile restart races. + + Returns ``(fd, "held")`` on success — the caller keeps the fd for the + process lifetime. ``(None, "contended")`` when another process holds it + (caller must NOT dispatch). ``(None, "unavailable")`` when locking cannot be + performed (non-POSIX, or a filesystem without flock) — caller falls back to + config-only control. + """ + try: + import fcntl + except ImportError: + return None, "unavailable" + try: + Path(lock_path).parent.mkdir(parents=True, exist_ok=True) + fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT, 0o644) + except OSError: + return None, "unavailable" + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError: + os.close(fd) + return None, "contended" + except OSError: + os.close(fd) + return None, "unavailable" + return fd, "held" + + class GatewayKanbanWatchersMixin: """Kanban watcher / notifier / dispatcher loops for GatewayRunner.""" @@ -606,6 +641,29 @@ class GatewayKanbanWatchersMixin: logger.warning("kanban dispatcher: kanban_db not importable; dispatcher disabled") return + # Single-dispatcher backstop. dispatch_in_gateway defaults to true, so a + # new profile gateway (or a same-profile restart race) can silently + # start a second dispatcher; concurrent dispatchers corrupt the shared + # kanban SQLite DBs. The lock lives at the machine-global kanban root + # (shared across profiles by design), so it serialises ALL gateways. + self._kanban_dispatcher_lock_fd = None + _lock_path = _kb.kanban_home() / "kanban" / ".dispatcher.lock" + _lock_fd, _lock_state = _acquire_singleton_lock(_lock_path) + if _lock_state == "contended": + logger.info( + "kanban dispatcher: another gateway already holds the dispatcher " + "lock (%s); this gateway will NOT dispatch.", _lock_path, + ) + return + if _lock_state == "held": + self._kanban_dispatcher_lock_fd = _lock_fd # hold for process lifetime + logger.info("kanban dispatcher: holding singleton dispatcher lock (%s)", _lock_path) + else: + logger.warning( + "kanban dispatcher: advisory lock unavailable at %s; proceeding " + "on config control alone.", _lock_path, + ) + try: interval = float(kanban_cfg.get("dispatch_interval_seconds", 60) or 60) except (ValueError, TypeError): diff --git a/tests/gateway/test_kanban_watchers_mixin.py b/tests/gateway/test_kanban_watchers_mixin.py index e4666e15255..a0ca76d4988 100644 --- a/tests/gateway/test_kanban_watchers_mixin.py +++ b/tests/gateway/test_kanban_watchers_mixin.py @@ -43,3 +43,26 @@ def test_watcher_loops_are_coroutines(): # The two long-running watchers are async loops. assert inspect.iscoroutinefunction(GatewayKanbanWatchersMixin._kanban_notifier_watcher) assert inspect.iscoroutinefunction(GatewayKanbanWatchersMixin._kanban_dispatcher_watcher) + + +def test_singleton_dispatcher_lock_is_exclusive(tmp_path): + """Only one holder of the dispatcher lock at a time — the backstop that + stops concurrent dispatchers corrupting the shared kanban SQLite DBs.""" + import os + + from gateway.kanban_watchers import _acquire_singleton_lock + + lock = tmp_path / "kanban" / ".dispatcher.lock" + + fd1, st1 = _acquire_singleton_lock(lock) + assert st1 == "held" and fd1 is not None + + # A second acquire while the first is held must be refused, not granted. + fd2, st2 = _acquire_singleton_lock(lock) + assert st2 == "contended" and fd2 is None + + # Releasing the first lets a fresh acquire succeed (lock is reusable). + os.close(fd1) + fd3, st3 = _acquire_singleton_lock(lock) + assert st3 == "held" and fd3 is not None + os.close(fd3)