From 861ce7c0b6743064a65864b0ea9e9bb725e98a8e Mon Sep 17 00:00:00 2001 From: Mike Nguyen Date: Sat, 9 May 2026 12:41:31 +0000 Subject: [PATCH] fix: dedupe kanban notifier delivery claims --- gateway/run.py | 100 ++++++++++++- hermes_cli/kanban_db.py | 80 ++++++++++ tests/conftest.py | 49 +++++++ tests/gateway/test_kanban_notifier.py | 138 ++++++++++++++++++ .../test_kanban_core_functionality.py | 51 +++++++ 5 files changed, 411 insertions(+), 7 deletions(-) create mode 100644 tests/gateway/test_kanban_notifier.py diff --git a/gateway/run.py b/gateway/run.py index ec2fdcb249e..c9400fdfdd9 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4119,18 +4119,42 @@ class GatewayRunner: try: def _collect(): deliveries: list[dict] = [] - # Enumerate every board on disk. Cheap: a few - # directory stat calls per tick. Missing/empty - # boards are silently skipped. + active_platforms = { + getattr(platform, "value", str(platform)).lower() + for platform in self.adapters.keys() + } + if not active_platforms: + logger.debug("kanban notifier: no connected adapters; skipping tick") + return deliveries + + # Enumerate every board on disk, but poll each resolved DB + # path once. Multiple slugs can point at the same DB when + # HERMES_KANBAN_DB pins the board path; without this guard + # one gateway could collect the same subscription/event + # more than once before advancing the cursor. 
try: boards = _kb.list_boards(include_archived=False) except Exception: boards = [_kb.read_board_metadata(_kb.DEFAULT_BOARD)] + seen_db_paths: set[str] = set() for board_meta in boards: slug = board_meta.get("slug") or _kb.DEFAULT_BOARD + db_path = board_meta.get("db_path") + try: + resolved_db_path = str(Path(db_path).expanduser().resolve()) if db_path else str(_kb.kanban_db_path(slug).resolve()) + except Exception: + resolved_db_path = f"slug:{slug}" + if resolved_db_path in seen_db_paths: + logger.debug( + "kanban notifier: skipping duplicate board slug %s for DB %s", + slug, resolved_db_path, + ) + continue + seen_db_paths.add(resolved_db_path) try: conn = _kb.connect(board=slug) - except Exception: + except Exception as exc: + logger.debug("kanban notifier: cannot open board %s: %s", slug, exc) continue try: # `connect()` runs the schema + idempotent migration @@ -4146,8 +4170,17 @@ class GatewayRunner: # tolerates that race, but we still skip the # redundant call to avoid the wasted work. 
subs = _kb.list_notify_subs(conn) + if not subs: + logger.debug("kanban notifier: board %s has no subscriptions", slug) for sub in subs: - cursor, events = _kb.unseen_events_for_sub( + platform = (sub.get("platform") or "").lower() + if platform not in active_platforms: + logger.debug( + "kanban notifier: subscription for %s on %s skipped; adapter not connected", + sub.get("task_id"), platform or "", + ) + continue + old_cursor, cursor, events = _kb.claim_unseen_events_for_sub( conn, task_id=sub["task_id"], platform=sub["platform"], @@ -4158,8 +4191,13 @@ class GatewayRunner: if not events: continue task = _kb.get_task(conn, sub["task_id"]) + logger.debug( + "kanban notifier: claimed %d event(s) for %s on board %s cursor %s→%s", + len(events), sub["task_id"], slug, old_cursor, cursor, + ) deliveries.append({ "sub": sub, + "old_cursor": old_cursor, "cursor": cursor, "events": events, "task": task, @@ -4186,7 +4224,18 @@ class GatewayRunner: continue adapter = self.adapters.get(plat) if adapter is None: - continue # platform not currently connected + logger.debug( + "kanban notifier: adapter %s disconnected before delivery for %s; rewinding claim", + platform_str, sub["task_id"], + ) + await asyncio.to_thread( + self._kanban_rewind, + sub, + d["cursor"], + d.get("old_cursor", 0), + board_slug, + ) + continue title = (task.title if task else sub["task_id"])[:120] for ev in d["events"]: kind = ev.kind @@ -4254,6 +4303,10 @@ class GatewayRunner: await adapter.send( sub["chat_id"], msg, metadata=metadata, ) + logger.info( + "kanban notifier: delivered %s event for %s to %s/%s on board %s", + kind, sub["task_id"], platform_str, sub["chat_id"], board_slug, + ) # Reset the failure counter on success. sub_fail_counts.pop(sub_key, None) except Exception as exc: @@ -4273,7 +4326,17 @@ class GatewayRunner: ) await asyncio.to_thread(self._kanban_unsub, sub, board_slug) sub_fail_counts.pop(sub_key, None) - # Don't advance cursor on send failure — retry next tick. 
def claim_unseen_events_for_sub(
    conn: sqlite3.Connection,
    *,
    task_id: str,
    platform: str,
    chat_id: str,
    thread_id: Optional[str] = None,
    kinds: Optional[Iterable[str]] = None,
) -> tuple[int, int, list[Event]]:
    """Read and claim the unseen events of one notification subscription.

    The subscription row is looked up, its unseen events are fetched via
    :func:`unseen_events_for_sub`, and -- when any exist -- the row's
    ``last_event_id`` is moved forward, all inside one ``write_txn`` block.
    NOTE(review): the original author describes ``write_txn`` as a
    ``BEGIN IMMEDIATE`` transaction that serializes concurrent notifier
    processes; ``write_txn`` is defined elsewhere, so confirm there.

    Args:
        conn: Open connection to the board database.
        task_id, platform, chat_id: Identify the subscription row.
        thread_id: Optional thread; ``None`` is stored as ``""``.
        kinds: Optional event-kind filter, forwarded unchanged.

    Returns:
        ``(old_cursor, new_cursor, events)``.  ``(0, 0, [])`` when the
        subscription row does not exist; ``(old, old, [])`` when nothing is
        unseen.  Otherwise the cursor has already been advanced to
        ``new_cursor`` and the caller owns delivery of ``events``; on a
        failed delivery call :func:`rewind_notify_cursor`.
    """
    sub_key = (task_id, platform, chat_id, thread_id or "")
    with write_txn(conn):
        current = conn.execute(
            "SELECT last_event_id FROM kanban_notify_subs "
            "WHERE task_id = ? AND platform = ? AND chat_id = ? AND thread_id = ?",
            sub_key,
        ).fetchone()
        if current is None:
            return 0, 0, []
        previous = int(current["last_event_id"])
        latest, pending = unseen_events_for_sub(
            conn,
            task_id=task_id,
            platform=platform,
            chat_id=chat_id,
            thread_id=thread_id,
            kinds=kinds,
        )
        if pending:
            # Compare-and-set on the cursor value we just read.
            conn.execute(
                "UPDATE kanban_notify_subs SET last_event_id = ? "
                "WHERE task_id = ? AND platform = ? AND chat_id = ? AND thread_id = ? "
                "AND last_event_id = ?",
                (int(latest), *sub_key, previous),
            )
            return previous, latest, pending
        return previous, previous, []


def rewind_notify_cursor(
    conn: sqlite3.Connection,
    *,
    task_id: str,
    platform: str,
    chat_id: str,
    thread_id: Optional[str] = None,
    claimed_cursor: int,
    old_cursor: int,
) -> bool:
    """Put a subscription cursor back to ``old_cursor`` after a failed send.

    The update is a compare-and-set: it only applies while the row's
    ``last_event_id`` still equals ``claimed_cursor``, so progress recorded
    by a later claim is never overwritten.

    Returns:
        ``True`` when a row was updated, ``False`` when the row is missing
        or its cursor no longer equals ``claimed_cursor``.
    """
    params = (
        int(old_cursor),
        task_id,
        platform,
        chat_id,
        thread_id or "",
        int(claimed_cursor),
    )
    with write_txn(conn):
        result = conn.execute(
            "UPDATE kanban_notify_subs SET last_event_id = ? "
            "WHERE task_id = ? AND platform = ? AND chat_id = ? AND thread_id = ? "
            "AND last_event_id = ?",
            params,
        )
        rewound = result.rowcount > 0
    return rewound
+ "TELEGRAM_HOME_CHANNEL", + "TELEGRAM_HOME_CHANNEL_THREAD_ID", + "TELEGRAM_HOME_CHANNEL_NAME", + "DISCORD_HOME_CHANNEL", + "DISCORD_HOME_CHANNEL_THREAD_ID", + "DISCORD_HOME_CHANNEL_NAME", + "SLACK_HOME_CHANNEL", + "SLACK_HOME_CHANNEL_THREAD_ID", + "SLACK_HOME_CHANNEL_NAME", + "WHATSAPP_HOME_CHANNEL", + "WHATSAPP_HOME_CHANNEL_THREAD_ID", + "WHATSAPP_HOME_CHANNEL_NAME", + "SIGNAL_HOME_CHANNEL", + "SIGNAL_HOME_CHANNEL_THREAD_ID", + "SIGNAL_HOME_CHANNEL_NAME", + "EMAIL_HOME_CHANNEL", + "EMAIL_HOME_CHANNEL_THREAD_ID", + "EMAIL_HOME_CHANNEL_NAME", + "SMS_HOME_CHANNEL", + "SMS_HOME_CHANNEL_THREAD_ID", + "SMS_HOME_CHANNEL_NAME", + "MATTERMOST_HOME_CHANNEL", + "MATTERMOST_HOME_CHANNEL_THREAD_ID", + "MATTERMOST_HOME_CHANNEL_NAME", + "MATRIX_HOME_CHANNEL", + "MATRIX_HOME_CHANNEL_THREAD_ID", + "MATRIX_HOME_CHANNEL_NAME", + "DINGTALK_HOME_CHANNEL", + "DINGTALK_HOME_CHANNEL_THREAD_ID", + "DINGTALK_HOME_CHANNEL_NAME", + "FEISHU_HOME_CHANNEL", + "FEISHU_HOME_CHANNEL_THREAD_ID", + "FEISHU_HOME_CHANNEL_NAME", + "WECOM_HOME_CHANNEL", + "WECOM_HOME_CHANNEL_THREAD_ID", + "WECOM_HOME_CHANNEL_NAME", # Platform gating — set by load_gateway_config() as a side effect when # a config.yaml is present, so individual test bodies that call the # loader leak these values into later tests on the same xdist worker. 
"""Gateway kanban notifier tests: board de-duplication, claims, rewinds."""
import asyncio
from pathlib import Path

import pytest

from gateway.config import Platform
from gateway.run import GatewayRunner
from hermes_cli import kanban_db as kb

# Captured once at import time.  Reading ``asyncio.sleep`` inside the tick
# helper would pick up the previous tick's fake when a test runs two ticks,
# chaining the fakes and flipping the earlier runner's ``_running`` flag.
_REAL_SLEEP = asyncio.sleep

# Event kinds queried when checking what is still unseen for a subscription.
_TERMINAL_KINDS = ["completed", "blocked", "gave_up", "crashed", "timed_out"]


class RecordingAdapter:
    """Adapter stand-in that records every ``send`` call in ``self.sent``."""

    def __init__(self):
        self.sent = []

    async def send(self, chat_id, text, metadata=None):
        self.sent.append({"chat_id": chat_id, "text": text, "metadata": metadata or {}})


class DisconnectedAdapters(dict):
    """Expose a platform during collection, then simulate disconnect on get()."""

    def get(self, key, default=None):
        # Iteration via keys() still shows the platform; lookups never do.
        return None


async def _run_one_notifier_tick(monkeypatch, runner):
    """Run ``runner._kanban_notifier_watcher`` until its second kind of sleep.

    ``asyncio.sleep`` is replaced so that a 5-second sleep returns at once
    and any other sleep clears ``runner._running`` before yielding.
    NOTE(review): the 5-second sleep is presumably the watcher's start-up
    delay -- confirm against ``GatewayRunner._kanban_notifier_watcher``.
    """

    async def fake_sleep(delay):
        if delay == 5:
            return None
        runner._running = False
        await _REAL_SLEEP(0)

    monkeypatch.setattr(asyncio, "sleep", fake_sleep)
    await runner._kanban_notifier_watcher(interval=1)


def _make_runner(adapter, adapters_cls=dict):
    """Build a bare ``GatewayRunner`` with only the notifier's attributes.

    ``__new__`` bypasses ``__init__``.  ``adapters_cls`` selects the mapping
    type holding the single Telegram adapter (plain ``dict`` by default).
    """
    runner = GatewayRunner.__new__(GatewayRunner)
    runner._running = True
    runner.adapters = adapters_cls({Platform.TELEGRAM: adapter})
    runner._kanban_sub_fail_counts = {}
    return runner


def _create_completed_subscription(summary="done once"):
    """Create a task, subscribe telegram/chat-1 to it, complete it; return its id."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="notify once", assignee="worker")
        kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat-1")
        kb.complete_task(conn, tid, summary=summary)
        return tid
    finally:
        conn.close()


def _unseen_terminal_events(tid):
    """Return the events still unseen by the telegram/chat-1 subscription."""
    conn = kb.connect()
    try:
        _, events = kb.unseen_events_for_sub(
            conn,
            task_id=tid,
            platform="telegram",
            chat_id="chat-1",
            kinds=_TERMINAL_KINDS,
        )
        return events
    finally:
        conn.close()


def test_kanban_notifier_dedupes_board_slugs_pointing_to_same_db(tmp_path, monkeypatch):
    """Two board slugs pinned to one DB file produce exactly one message."""
    db_path = tmp_path / "shared-kanban.db"
    monkeypatch.setenv("HERMES_KANBAN_DB", str(db_path))
    kb.init_db()
    kb.write_board_metadata("alias-a", name="Alias A")
    kb.write_board_metadata("alias-b", name="Alias B")

    tid = _create_completed_subscription()

    adapter = RecordingAdapter()
    runner = _make_runner(adapter)

    asyncio.run(_run_one_notifier_tick(monkeypatch, runner))

    assert len(adapter.sent) == 1
    assert "Kanban" in adapter.sent[0]["text"]
    assert tid in adapter.sent[0]["text"]


def test_kanban_notifier_claim_prevents_second_watcher_send(tmp_path, monkeypatch):
    """A second runner ticking after the first finds nothing left to send."""
    db_path = tmp_path / "single-owner.db"
    monkeypatch.setenv("HERMES_KANBAN_DB", str(db_path))
    kb.init_db()

    _create_completed_subscription()

    adapter1 = RecordingAdapter()
    adapter2 = RecordingAdapter()

    asyncio.run(_run_one_notifier_tick(monkeypatch, _make_runner(adapter1)))
    asyncio.run(_run_one_notifier_tick(monkeypatch, _make_runner(adapter2)))

    assert len(adapter1.sent) == 1
    assert adapter2.sent == []


def test_kanban_notifier_rewinds_claim_if_adapter_disconnects(tmp_path, monkeypatch):
    """When the adapter lookup fails at delivery, the event stays unseen."""
    db_path = tmp_path / "adapter-disconnect.db"
    monkeypatch.setenv("HERMES_KANBAN_DB", str(db_path))
    kb.init_db()
    tid = _create_completed_subscription()

    runner = _make_runner(RecordingAdapter(), adapters_cls=DisconnectedAdapters)

    asyncio.run(_run_one_notifier_tick(monkeypatch, runner))

    assert [ev.kind for ev in _unseen_terminal_events(tid)] == ["completed"]


def test_kanban_db_path_is_test_isolated_from_real_home():
    """The kanban DB used under test is never ``~/.hermes/kanban.db``."""
    hermes_home = Path(kb.kanban_home())
    production_db = Path.home() / ".hermes" / "kanban.db"
    assert kb.kanban_db_path().resolve() != production_db.resolve()

    # Write through the normal API, then re-check where the DB resolved to.
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat-1")
    finally:
        conn.close()

    assert kb.kanban_db_path().resolve().is_relative_to(hermes_home.resolve())
    assert kb.kanban_db_path().resolve() != production_db.resolve()
+ _, _, duplicate_events = kb.claim_unseen_events_for_sub( + conn2, + task_id=tid, + platform="telegram", + chat_id="123", + kinds=["completed", "blocked"], + ) + assert duplicate_events == [] + + assert kb.rewind_notify_cursor( + conn1, + task_id=tid, + platform="telegram", + chat_id="123", + claimed_cursor=claimed_cursor, + old_cursor=old_cursor, + ) is True + _, retried_events = kb.unseen_events_for_sub( + conn2, + task_id=tid, + platform="telegram", + chat_id="123", + kinds=["completed", "blocked"], + ) + assert [ev.kind for ev in retried_events] == ["completed"] + finally: + conn1.close() + conn2.close() + + # --------------------------------------------------------------------------- # GC + retention # ---------------------------------------------------------------------------