diff --git a/gateway/kanban_watchers.py b/gateway/kanban_watchers.py index 328cbd7fb5b..b9b224e31fc 100644 --- a/gateway/kanban_watchers.py +++ b/gateway/kanban_watchers.py @@ -23,6 +23,41 @@ from typing import Any, Optional logger = logging.getLogger("gateway.run") +def _acquire_singleton_lock(lock_path) -> "tuple[int | None, str]": + """Take an exclusive, non-blocking advisory lock for the sole dispatcher. + + Only one gateway process machine-wide may run the embedded kanban + dispatcher: concurrent dispatchers each sweep every board and corrupt the + shared kanban SQLite DBs (and double reclaim frequency). The + ``dispatch_in_gateway`` config flag is the primary control; this lock is the + backstop that survives config drift and same-profile restart races. + + Returns ``(fd, "held")`` on success — the caller keeps the fd for the + process lifetime. ``(None, "contended")`` when another process holds it + (caller must NOT dispatch). ``(None, "unavailable")`` when locking cannot be + performed (non-POSIX, or a filesystem without flock) — caller falls back to + config-only control. + """ + try: + import fcntl + except ImportError: + return None, "unavailable" + try: + Path(lock_path).parent.mkdir(parents=True, exist_ok=True) + fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT, 0o644) + except OSError: + return None, "unavailable" + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError: + os.close(fd) + return None, "contended" + except OSError: + os.close(fd) + return None, "unavailable" + return fd, "held" + + class GatewayKanbanWatchersMixin: """Kanban watcher / notifier / dispatcher loops for GatewayRunner.""" @@ -606,6 +641,29 @@ class GatewayKanbanWatchersMixin: logger.warning("kanban dispatcher: kanban_db not importable; dispatcher disabled") return + # Single-dispatcher backstop. 
dispatch_in_gateway defaults to true, so a + # new profile gateway (or a same-profile restart race) can silently + # start a second dispatcher; concurrent dispatchers corrupt the shared + # kanban SQLite DBs. The lock lives at the machine-global kanban root + # (shared across profiles by design), so it serialises ALL gateways. + self._kanban_dispatcher_lock_fd = None + _lock_path = _kb.kanban_home() / "kanban" / ".dispatcher.lock" + _lock_fd, _lock_state = _acquire_singleton_lock(_lock_path) + if _lock_state == "contended": + logger.info( + "kanban dispatcher: another gateway already holds the dispatcher " + "lock (%s); this gateway will NOT dispatch.", _lock_path, + ) + return + if _lock_state == "held": + self._kanban_dispatcher_lock_fd = _lock_fd # hold for process lifetime + logger.info("kanban dispatcher: holding singleton dispatcher lock (%s)", _lock_path) + else: + logger.warning( + "kanban dispatcher: advisory lock unavailable at %s; proceeding " + "on config control alone.", _lock_path, + ) + try: interval = float(kanban_cfg.get("dispatch_interval_seconds", 60) or 60) except (ValueError, TypeError): diff --git a/tests/gateway/test_kanban_watchers_mixin.py b/tests/gateway/test_kanban_watchers_mixin.py index e4666e15255..a0ca76d4988 100644 --- a/tests/gateway/test_kanban_watchers_mixin.py +++ b/tests/gateway/test_kanban_watchers_mixin.py @@ -43,3 +43,26 @@ def test_watcher_loops_are_coroutines(): # The two long-running watchers are async loops. 
def test_singleton_dispatcher_lock_is_exclusive(tmp_path):
    """Only one holder of the dispatcher lock at a time -- the backstop that
    stops concurrent dispatchers corrupting the shared kanban SQLite DBs."""
    import os

    import pytest

    # Without fcntl the helper reports "unavailable" by design, so the
    # exclusivity assertions below cannot hold; skip instead of failing.
    pytest.importorskip("fcntl")

    from gateway.kanban_watchers import _acquire_singleton_lock

    lock = tmp_path / "kanban" / ".dispatcher.lock"
    open_fds = []  # descriptors still to be closed, even if an assert fails

    def acquire():
        fd, state = _acquire_singleton_lock(lock)
        if fd is not None:
            open_fds.append(fd)
        return fd, state

    try:
        fd1, st1 = acquire()
        assert st1 == "held" and fd1 is not None

        # A second acquire while the first is held must be refused, not granted.
        fd2, st2 = acquire()
        assert st2 == "contended" and fd2 is None

        # Releasing the first lets a fresh acquire succeed (lock is reusable).
        os.close(fd1)
        open_fds.remove(fd1)
        fd3, st3 = acquire()
        assert st3 == "held" and fd3 is not None
    finally:
        for fd in open_fds:
            os.close(fd)