mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-21 10:22:18 +00:00
fix(kanban): cross-platform dispatcher lock + explicit release
Two robustness gaps from community review (#44919): 1. Windows dead-path: replaced bespoke fcntl.flock with gateway.status _try_acquire_file_lock / _release_file_lock — already cross-platform (msvcrt on Windows, fcntl on POSIX). Added _release_singleton_lock helper. 2. Lock fd never released: stored handle is now released explicitly in both exit paths — CancelledError handler and normal while-loop exit. Allows in-process stop/restart (tests, embedded use). Also tightened docstrings — 'corrupt the SQLite DBs' is now specific (wal_autocheckpoint=0 + concurrent manual WAL checkpoints can corrupt index pages), matching the module's own concurrency claims.
This commit is contained in:
parent
dfa561092a
commit
226e9322e1
2 changed files with 58 additions and 33 deletions
|
|
@ -23,39 +23,56 @@ from typing import Any, Optional
|
|||
logger = logging.getLogger("gateway.run")
|
||||
|
||||
|
||||
def _acquire_singleton_lock(lock_path) -> "tuple[int | None, str]":
|
||||
def _acquire_singleton_lock(lock_path) -> "tuple[Optional[object], str]":
|
||||
"""Take an exclusive, non-blocking advisory lock for the sole dispatcher.
|
||||
|
||||
Only one gateway process machine-wide may run the embedded kanban
|
||||
dispatcher: concurrent dispatchers each sweep every board and corrupt the
|
||||
shared kanban SQLite DBs (and double reclaim frequency). The
|
||||
dispatcher: concurrent dispatchers double the reclaim frequency (each
|
||||
runs its own ``release_stale_claims`` → promote → dispatch loop), double
|
||||
claim-attempt events in the event log, and — with ``wal_autocheckpoint=0`` —
|
||||
concurrent manual WAL checkpoints can corrupt index pages. The
|
||||
``dispatch_in_gateway`` config flag is the primary control; this lock is the
|
||||
backstop that survives config drift and same-profile restart races.
|
||||
|
||||
Returns ``(fd, "held")`` on success — the caller keeps the fd for the
|
||||
process lifetime. ``(None, "contended")`` when another process holds it
|
||||
(caller must NOT dispatch). ``(None, "unavailable")`` when locking cannot be
|
||||
performed (non-POSIX, or a filesystem without flock) — caller falls back to
|
||||
config-only control.
|
||||
Delegates to :func:`gateway.status._try_acquire_file_lock` (``fcntl`` on
|
||||
POSIX, ``msvcrt`` on Windows) so the guard is cross-platform.
|
||||
|
||||
Returns ``(handle, "held")`` on success — the caller keeps the file handle
|
||||
for the process lifetime and **must** release it via
|
||||
:func:`_release_singleton_lock` when done. ``(None, "contended")`` when
|
||||
another process holds the lock (caller must NOT dispatch). ``(None,
|
||||
"unavailable")`` when locking cannot be attempted (the lock file cannot be
|
||||
created or opened, or the status.py helpers are unimportable) — caller falls
|
||||
back to config-only control.
|
||||
"""
|
||||
try:
|
||||
import fcntl
|
||||
from gateway.status import _try_acquire_file_lock # deferred; same package
|
||||
except ImportError:
|
||||
return None, "unavailable"
|
||||
try:
|
||||
Path(lock_path).parent.mkdir(parents=True, exist_ok=True)
|
||||
fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT, 0o644)
|
||||
handle = open(str(lock_path), "a+")
|
||||
except OSError:
|
||||
return None, "unavailable"
|
||||
try:
|
||||
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
except BlockingIOError:
|
||||
os.close(fd)
|
||||
if not _try_acquire_file_lock(handle):
|
||||
handle.close()
|
||||
return None, "contended"
|
||||
except OSError:
|
||||
os.close(fd)
|
||||
return None, "unavailable"
|
||||
return fd, "held"
|
||||
return handle, "held"
|
||||
|
||||
|
||||
def _release_singleton_lock(handle) -> None:
    """Release a dispatcher singleton lock and close its file handle.

    Counterpart to :func:`_acquire_singleton_lock`. The release is
    best-effort and never raises, so it is safe to call from cancellation and
    shutdown paths; failures are logged at debug level instead of being
    silently discarded.

    Args:
        handle: The file handle returned by :func:`_acquire_singleton_lock`
            together with the ``"held"`` state, or ``None`` (in which case
            this is a no-op).
    """
    if handle is None:
        return
    # Same logger object as the module-level ``logger`` ("gateway.run").
    log = logging.getLogger("gateway.run")
    try:
        from gateway.status import _release_file_lock  # deferred; same package

        _release_file_lock(handle)
    except Exception:
        # Covers both an unimportable helper and a failed unlock; either way
        # we still go on to close the handle below.
        log.debug(
            "kanban dispatcher: releasing singleton lock failed", exc_info=True
        )
    try:
        handle.close()
    except Exception:
        log.debug(
            "kanban dispatcher: closing singleton lock handle failed",
            exc_info=True,
        )
|
||||
|
||||
|
||||
class GatewayKanbanWatchersMixin:
|
||||
|
|
@ -643,12 +660,14 @@ class GatewayKanbanWatchersMixin:
|
|||
|
||||
# Single-dispatcher backstop. dispatch_in_gateway defaults to true, so a
|
||||
# new profile gateway (or a same-profile restart race) can silently
|
||||
# start a second dispatcher; concurrent dispatchers corrupt the shared
|
||||
# kanban SQLite DBs. The lock lives at the machine-global kanban root
|
||||
# start a second dispatcher; concurrent dispatchers double reclaim
|
||||
# frequency, double claim-attempt events, and — with
|
||||
# wal_autocheckpoint=0 — concurrent manual WAL checkpoints can corrupt
|
||||
# index pages. The lock lives at the machine-global kanban root
|
||||
# (shared across profiles by design), so it serialises ALL gateways.
|
||||
self._kanban_dispatcher_lock_fd = None
|
||||
self._kanban_dispatcher_lock_handle = None
|
||||
_lock_path = _kb.kanban_home() / "kanban" / ".dispatcher.lock"
|
||||
_lock_fd, _lock_state = _acquire_singleton_lock(_lock_path)
|
||||
_lock_handle, _lock_state = _acquire_singleton_lock(_lock_path)
|
||||
if _lock_state == "contended":
|
||||
logger.info(
|
||||
"kanban dispatcher: another gateway already holds the dispatcher "
|
||||
|
|
@ -656,7 +675,7 @@ class GatewayKanbanWatchersMixin:
|
|||
)
|
||||
return
|
||||
if _lock_state == "held":
|
||||
self._kanban_dispatcher_lock_fd = _lock_fd # hold for process lifetime
|
||||
self._kanban_dispatcher_lock_handle = _lock_handle # held until the watcher loop exits or is cancelled
|
||||
logger.info("kanban dispatcher: holding singleton dispatcher lock (%s)", _lock_path)
|
||||
else:
|
||||
logger.warning(
|
||||
|
|
@ -1110,6 +1129,8 @@ class GatewayKanbanWatchersMixin:
|
|||
last_warn_at = now
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("kanban dispatcher: cancelled")
|
||||
_release_singleton_lock(self._kanban_dispatcher_lock_handle)
|
||||
self._kanban_dispatcher_lock_handle = None
|
||||
raise
|
||||
except Exception:
|
||||
logger.exception("kanban dispatcher: unexpected watcher error")
|
||||
|
|
@ -1120,3 +1141,6 @@ class GatewayKanbanWatchersMixin:
|
|||
while slept < interval and self._running:
|
||||
await asyncio.sleep(min(1.0, interval - slept))
|
||||
slept += 1.0
|
||||
|
||||
_release_singleton_lock(self._kanban_dispatcher_lock_handle)
|
||||
self._kanban_dispatcher_lock_handle = None
|
||||
|
|
|
|||
|
|
@ -47,22 +47,23 @@ def test_watcher_loops_are_coroutines():
|
|||
|
||||
def test_singleton_dispatcher_lock_is_exclusive(tmp_path):
|
||||
"""Only one holder of the dispatcher lock at a time — the backstop that
|
||||
stops concurrent dispatchers corrupting the shared kanban SQLite DBs."""
|
||||
stops concurrent dispatchers from double-reclaiming and from corrupting shared
|
||||
kanban SQLite index pages under wal_autocheckpoint=0."""
|
||||
import os
|
||||
|
||||
from gateway.kanban_watchers import _acquire_singleton_lock
|
||||
from gateway.kanban_watchers import _acquire_singleton_lock, _release_singleton_lock
|
||||
|
||||
lock = tmp_path / "kanban" / ".dispatcher.lock"
|
||||
|
||||
fd1, st1 = _acquire_singleton_lock(lock)
|
||||
assert st1 == "held" and fd1 is not None
|
||||
h1, st1 = _acquire_singleton_lock(lock)
|
||||
assert st1 == "held" and h1 is not None
|
||||
|
||||
# A second acquire while the first is held must be refused, not granted.
|
||||
fd2, st2 = _acquire_singleton_lock(lock)
|
||||
assert st2 == "contended" and fd2 is None
|
||||
h2, st2 = _acquire_singleton_lock(lock)
|
||||
assert st2 == "contended" and h2 is None
|
||||
|
||||
# Releasing the first lets a fresh acquire succeed (lock is reusable).
|
||||
os.close(fd1)
|
||||
fd3, st3 = _acquire_singleton_lock(lock)
|
||||
assert st3 == "held" and fd3 is not None
|
||||
os.close(fd3)
|
||||
_release_singleton_lock(h1)
|
||||
h3, st3 = _acquire_singleton_lock(lock)
|
||||
assert st3 == "held" and h3 is not None
|
||||
_release_singleton_lock(h3)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue