fix(kanban): machine-global singleton lock for the embedded dispatcher (#41448)

The gateway's embedded dispatcher has no guard against more than one dispatcher
running concurrently. dispatch_in_gateway defaults to true, so a second gateway
for the same profile (a restart race where the old process is slow to exit) — or
any deployment that runs multiple profile gateways with the default — starts a
second dispatcher loop. As #41448 describes, concurrent dispatchers each run
release_stale_claims() against the same boards, double reclaim frequency, and
re-dispatch slow workers before they finish. In practice they also corrupt the
shared kanban SQLite DBs under concurrent write load.

Add _acquire_singleton_lock(): an exclusive, non-blocking fcntl.flock at the
machine-global kanban root (kanban_home()/kanban/.dispatcher.lock — the board is
shared across profiles by design, so this serialises every gateway, not just one
profile). The first gateway to start its dispatcher holds the lock for its
process lifetime; any other gateway finds it contended, logs, and skips
dispatching while still running for messaging. Falls back to config-only control
on non-POSIX or filesystems without flock.

This is more robust than a per-profile guard because the documented model is
"one dispatcher sweeps all boards" — the contention is across profiles, not just
within one. Closes #41448.

Test: lock is exclusive (held, then contended while held, then held again after
release).
This commit is contained in:
Sahil Saghir 2026-06-12 14:16:47 +01:00 committed by Teknium
parent a5e06078b2
commit dfa561092a
2 changed files with 81 additions and 0 deletions

View file

@ -23,6 +23,41 @@ from typing import Any, Optional
logger = logging.getLogger("gateway.run")
def _acquire_singleton_lock(lock_path) -> "tuple[int | None, str]":
"""Take an exclusive, non-blocking advisory lock for the sole dispatcher.
Only one gateway process machine-wide may run the embedded kanban
dispatcher: concurrent dispatchers each sweep every board and corrupt the
shared kanban SQLite DBs (and double reclaim frequency). The
``dispatch_in_gateway`` config flag is the primary control; this lock is the
backstop that survives config drift and same-profile restart races.
Returns ``(fd, "held")`` on success the caller keeps the fd for the
process lifetime. ``(None, "contended")`` when another process holds it
(caller must NOT dispatch). ``(None, "unavailable")`` when locking cannot be
performed (non-POSIX, or a filesystem without flock) caller falls back to
config-only control.
"""
try:
import fcntl
except ImportError:
return None, "unavailable"
try:
Path(lock_path).parent.mkdir(parents=True, exist_ok=True)
fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT, 0o644)
except OSError:
return None, "unavailable"
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
os.close(fd)
return None, "contended"
except OSError:
os.close(fd)
return None, "unavailable"
return fd, "held"
class GatewayKanbanWatchersMixin:
"""Kanban watcher / notifier / dispatcher loops for GatewayRunner."""
@ -606,6 +641,29 @@ class GatewayKanbanWatchersMixin:
logger.warning("kanban dispatcher: kanban_db not importable; dispatcher disabled")
return
# Single-dispatcher backstop. dispatch_in_gateway defaults to true, so a
# new profile gateway (or a same-profile restart race) can silently
# start a second dispatcher; concurrent dispatchers corrupt the shared
# kanban SQLite DBs. The lock lives at the machine-global kanban root
# (shared across profiles by design), so it serialises ALL gateways.
self._kanban_dispatcher_lock_fd = None
_lock_path = _kb.kanban_home() / "kanban" / ".dispatcher.lock"
_lock_fd, _lock_state = _acquire_singleton_lock(_lock_path)
if _lock_state == "contended":
logger.info(
"kanban dispatcher: another gateway already holds the dispatcher "
"lock (%s); this gateway will NOT dispatch.", _lock_path,
)
return
if _lock_state == "held":
self._kanban_dispatcher_lock_fd = _lock_fd # hold for process lifetime
logger.info("kanban dispatcher: holding singleton dispatcher lock (%s)", _lock_path)
else:
logger.warning(
"kanban dispatcher: advisory lock unavailable at %s; proceeding "
"on config control alone.", _lock_path,
)
try:
interval = float(kanban_cfg.get("dispatch_interval_seconds", 60) or 60)
except (ValueError, TypeError):

View file

@ -43,3 +43,26 @@ def test_watcher_loops_are_coroutines():
# The two long-running watchers are async loops.
assert inspect.iscoroutinefunction(GatewayKanbanWatchersMixin._kanban_notifier_watcher)
assert inspect.iscoroutinefunction(GatewayKanbanWatchersMixin._kanban_dispatcher_watcher)
def test_singleton_dispatcher_lock_is_exclusive(tmp_path):
"""Only one holder of the dispatcher lock at a time — the backstop that
stops concurrent dispatchers corrupting the shared kanban SQLite DBs."""
import os
from gateway.kanban_watchers import _acquire_singleton_lock
lock = tmp_path / "kanban" / ".dispatcher.lock"
fd1, st1 = _acquire_singleton_lock(lock)
assert st1 == "held" and fd1 is not None
# A second acquire while the first is held must be refused, not granted.
fd2, st2 = _acquire_singleton_lock(lock)
assert st2 == "contended" and fd2 is None
# Releasing the first lets a fresh acquire succeed (lock is reusable).
os.close(fd1)
fd3, st3 = _acquire_singleton_lock(lock)
assert st3 == "held" and fd3 is not None
os.close(fd3)