fix: dedupe kanban notifier delivery claims

This commit is contained in:
Mike Nguyen 2026-05-09 12:41:31 +00:00 committed by Teknium
parent 373c4d6647
commit 861ce7c0b6
5 changed files with 411 additions and 7 deletions

View file

@ -4119,18 +4119,42 @@ class GatewayRunner:
try:
def _collect():
deliveries: list[dict] = []
# Enumerate every board on disk. Cheap: a few
# directory stat calls per tick. Missing/empty
# boards are silently skipped.
active_platforms = {
getattr(platform, "value", str(platform)).lower()
for platform in self.adapters.keys()
}
if not active_platforms:
logger.debug("kanban notifier: no connected adapters; skipping tick")
return deliveries
# Enumerate every board on disk, but poll each resolved DB
# path once. Multiple slugs can point at the same DB when
# HERMES_KANBAN_DB pins the board path; without this guard
# one gateway could collect the same subscription/event
# more than once before advancing the cursor.
try:
boards = _kb.list_boards(include_archived=False)
except Exception:
boards = [_kb.read_board_metadata(_kb.DEFAULT_BOARD)]
seen_db_paths: set[str] = set()
for board_meta in boards:
slug = board_meta.get("slug") or _kb.DEFAULT_BOARD
db_path = board_meta.get("db_path")
try:
resolved_db_path = str(Path(db_path).expanduser().resolve()) if db_path else str(_kb.kanban_db_path(slug).resolve())
except Exception:
resolved_db_path = f"slug:{slug}"
if resolved_db_path in seen_db_paths:
logger.debug(
"kanban notifier: skipping duplicate board slug %s for DB %s",
slug, resolved_db_path,
)
continue
seen_db_paths.add(resolved_db_path)
try:
conn = _kb.connect(board=slug)
except Exception:
except Exception as exc:
logger.debug("kanban notifier: cannot open board %s: %s", slug, exc)
continue
try:
# `connect()` runs the schema + idempotent migration
@ -4146,8 +4170,17 @@ class GatewayRunner:
# tolerates that race, but we still skip the
# redundant call to avoid the wasted work.
subs = _kb.list_notify_subs(conn)
if not subs:
logger.debug("kanban notifier: board %s has no subscriptions", slug)
for sub in subs:
cursor, events = _kb.unseen_events_for_sub(
platform = (sub.get("platform") or "").lower()
if platform not in active_platforms:
logger.debug(
"kanban notifier: subscription for %s on %s skipped; adapter not connected",
sub.get("task_id"), platform or "<missing>",
)
continue
old_cursor, cursor, events = _kb.claim_unseen_events_for_sub(
conn,
task_id=sub["task_id"],
platform=sub["platform"],
@ -4158,8 +4191,13 @@ class GatewayRunner:
if not events:
continue
task = _kb.get_task(conn, sub["task_id"])
logger.debug(
"kanban notifier: claimed %d event(s) for %s on board %s cursor %s%s",
len(events), sub["task_id"], slug, old_cursor, cursor,
)
deliveries.append({
"sub": sub,
"old_cursor": old_cursor,
"cursor": cursor,
"events": events,
"task": task,
@ -4186,7 +4224,18 @@ class GatewayRunner:
continue
adapter = self.adapters.get(plat)
if adapter is None:
continue # platform not currently connected
logger.debug(
"kanban notifier: adapter %s disconnected before delivery for %s; rewinding claim",
platform_str, sub["task_id"],
)
await asyncio.to_thread(
self._kanban_rewind,
sub,
d["cursor"],
d.get("old_cursor", 0),
board_slug,
)
continue
title = (task.title if task else sub["task_id"])[:120]
for ev in d["events"]:
kind = ev.kind
@ -4254,6 +4303,10 @@ class GatewayRunner:
await adapter.send(
sub["chat_id"], msg, metadata=metadata,
)
logger.info(
"kanban notifier: delivered %s event for %s to %s/%s on board %s",
kind, sub["task_id"], platform_str, sub["chat_id"], board_slug,
)
# Reset the failure counter on success.
sub_fail_counts.pop(sub_key, None)
except Exception as exc:
@ -4273,7 +4326,17 @@ class GatewayRunner:
)
await asyncio.to_thread(self._kanban_unsub, sub, board_slug)
sub_fail_counts.pop(sub_key, None)
# Don't advance cursor on send failure — retry next tick.
else:
await asyncio.to_thread(
self._kanban_rewind,
sub,
d["cursor"],
d.get("old_cursor", 0),
board_slug,
)
# Rewind the pre-send claim on transient failure so
# a later tick can retry. After too many failures,
# dropping the subscription is the terminal action.
break
else:
# All events delivered; advance cursor + maybe unsub.
@ -4336,6 +4399,29 @@ class GatewayRunner:
finally:
conn.close()
def _kanban_rewind(
self,
sub: dict,
claimed_cursor: int,
old_cursor: int,
board: Optional[str] = None,
) -> None:
"""Sync helper: undo a claimed notification cursor after send failure."""
from hermes_cli import kanban_db as _kb
conn = _kb.connect(board=board)
try:
_kb.rewind_notify_cursor(
conn,
task_id=sub["task_id"],
platform=sub["platform"],
chat_id=sub["chat_id"],
thread_id=sub.get("thread_id") or "",
claimed_cursor=claimed_cursor,
old_cursor=old_cursor,
)
finally:
conn.close()
async def _kanban_dispatcher_watcher(self) -> None:
"""Embedded kanban dispatcher — one tick every `dispatch_interval_seconds`.

View file

@ -4368,6 +4368,57 @@ def unseen_events_for_sub(
return max_id, out
def claim_unseen_events_for_sub(
conn: sqlite3.Connection,
*,
task_id: str,
platform: str,
chat_id: str,
thread_id: Optional[str] = None,
kinds: Optional[Iterable[str]] = None,
) -> tuple[int, int, list[Event]]:
"""Atomically claim unseen notification events for one subscription.
Returns ``(old_cursor, new_cursor, events)``. When events are returned,
``kanban_notify_subs.last_event_id`` has already been advanced to
``new_cursor`` inside a ``BEGIN IMMEDIATE`` transaction. That makes the
notifier's read/claim step single-owner across multiple gateway watcher
processes pointed at the same board DB: concurrent watchers serialize on
SQLite's writer lock, and only the first process sees and claims a given
event range.
Callers should send the claimed events, then either leave the cursor at
``new_cursor`` on success or call :func:`rewind_notify_cursor` if delivery
failed before any terminal unsubscribe removed the row.
"""
with write_txn(conn):
row = conn.execute(
"SELECT last_event_id FROM kanban_notify_subs "
"WHERE task_id = ? AND platform = ? AND chat_id = ? AND thread_id = ?",
(task_id, platform, chat_id, thread_id or ""),
).fetchone()
if row is None:
return 0, 0, []
old_cursor = int(row["last_event_id"])
new_cursor, events = unseen_events_for_sub(
conn,
task_id=task_id,
platform=platform,
chat_id=chat_id,
thread_id=thread_id,
kinds=kinds,
)
if not events:
return old_cursor, old_cursor, []
conn.execute(
"UPDATE kanban_notify_subs SET last_event_id = ? "
"WHERE task_id = ? AND platform = ? AND chat_id = ? AND thread_id = ? "
"AND last_event_id = ?",
(int(new_cursor), task_id, platform, chat_id, thread_id or "", int(old_cursor)),
)
return old_cursor, new_cursor, events
def advance_notify_cursor(
conn: sqlite3.Connection,
*,
@ -4385,6 +4436,35 @@ def advance_notify_cursor(
)
def rewind_notify_cursor(
conn: sqlite3.Connection,
*,
task_id: str,
platform: str,
chat_id: str,
thread_id: Optional[str] = None,
claimed_cursor: int,
old_cursor: int,
) -> bool:
"""Undo a notification claim when delivery fails.
The CAS guard only rewinds if no later notifier advanced the row after our
claim. This keeps retry behavior for transient send failures without
clobbering newer progress.
"""
with write_txn(conn):
cur = conn.execute(
"UPDATE kanban_notify_subs SET last_event_id = ? "
"WHERE task_id = ? AND platform = ? AND chat_id = ? AND thread_id = ? "
"AND last_event_id = ?",
(
int(old_cursor), task_id, platform, chat_id, thread_id or "",
int(claimed_cursor),
),
)
return cur.rowcount > 0
# ---------------------------------------------------------------------------
# Retention + garbage collection
# ---------------------------------------------------------------------------

View file

@ -188,6 +188,16 @@ _HERMES_BEHAVIORAL_VARS = frozenset({
"HERMES_BACKGROUND_NOTIFICATIONS",
"HERMES_EXEC_ASK",
"HERMES_HOME_MODE",
# Kanban path/board pins must never leak from a developer shell or
# dispatched worker into tests; otherwise tests can write fake tasks to
# the real ~/.hermes/kanban.db instead of the per-test HERMES_HOME.
"HERMES_KANBAN_DB",
"HERMES_KANBAN_BOARD",
"HERMES_KANBAN_WORKSPACES_ROOT",
"HERMES_KANBAN_LOGS_ROOT",
"HERMES_KANBAN_TASK",
"HERMES_KANBAN_WORKSPACE",
"HERMES_TENANT",
"TERMINAL_CWD",
"TERMINAL_ENV",
"TERMINAL_VERCEL_RUNTIME",
@ -223,6 +233,45 @@ _HERMES_BEHAVIORAL_VARS = frozenset({
"SIGNAL_ALLOW_ALL_USERS",
"EMAIL_ALLOW_ALL_USERS",
"SMS_ALLOW_ALL_USERS",
# Gateway home channels are set by /sethome in real profiles. Tests that
# exercise dashboard notification toggles must opt in explicitly or they
# can accidentally subscribe against a developer's real home channel.
"TELEGRAM_HOME_CHANNEL",
"TELEGRAM_HOME_CHANNEL_THREAD_ID",
"TELEGRAM_HOME_CHANNEL_NAME",
"DISCORD_HOME_CHANNEL",
"DISCORD_HOME_CHANNEL_THREAD_ID",
"DISCORD_HOME_CHANNEL_NAME",
"SLACK_HOME_CHANNEL",
"SLACK_HOME_CHANNEL_THREAD_ID",
"SLACK_HOME_CHANNEL_NAME",
"WHATSAPP_HOME_CHANNEL",
"WHATSAPP_HOME_CHANNEL_THREAD_ID",
"WHATSAPP_HOME_CHANNEL_NAME",
"SIGNAL_HOME_CHANNEL",
"SIGNAL_HOME_CHANNEL_THREAD_ID",
"SIGNAL_HOME_CHANNEL_NAME",
"EMAIL_HOME_CHANNEL",
"EMAIL_HOME_CHANNEL_THREAD_ID",
"EMAIL_HOME_CHANNEL_NAME",
"SMS_HOME_CHANNEL",
"SMS_HOME_CHANNEL_THREAD_ID",
"SMS_HOME_CHANNEL_NAME",
"MATTERMOST_HOME_CHANNEL",
"MATTERMOST_HOME_CHANNEL_THREAD_ID",
"MATTERMOST_HOME_CHANNEL_NAME",
"MATRIX_HOME_CHANNEL",
"MATRIX_HOME_CHANNEL_THREAD_ID",
"MATRIX_HOME_CHANNEL_NAME",
"DINGTALK_HOME_CHANNEL",
"DINGTALK_HOME_CHANNEL_THREAD_ID",
"DINGTALK_HOME_CHANNEL_NAME",
"FEISHU_HOME_CHANNEL",
"FEISHU_HOME_CHANNEL_THREAD_ID",
"FEISHU_HOME_CHANNEL_NAME",
"WECOM_HOME_CHANNEL",
"WECOM_HOME_CHANNEL_THREAD_ID",
"WECOM_HOME_CHANNEL_NAME",
# Platform gating — set by load_gateway_config() as a side effect when
# a config.yaml is present, so individual test bodies that call the
# loader leak these values into later tests on the same xdist worker.

View file

@ -0,0 +1,138 @@
import asyncio
from pathlib import Path
import pytest
from gateway.config import Platform
from gateway.run import GatewayRunner
from hermes_cli import kanban_db as kb
class RecordingAdapter:
def __init__(self):
self.sent = []
async def send(self, chat_id, text, metadata=None):
self.sent.append({"chat_id": chat_id, "text": text, "metadata": metadata or {}})
class DisconnectedAdapters(dict):
"""Expose a platform during collection, then simulate disconnect on get()."""
def get(self, key, default=None):
return None
async def _run_one_notifier_tick(monkeypatch, runner):
real_sleep = asyncio.sleep
async def fake_sleep(delay):
if delay == 5:
return None
runner._running = False
await real_sleep(0)
monkeypatch.setattr(asyncio, "sleep", fake_sleep)
await runner._kanban_notifier_watcher(interval=1)
def _make_runner(adapter):
runner = GatewayRunner.__new__(GatewayRunner)
runner._running = True
runner.adapters = {Platform.TELEGRAM: adapter}
runner._kanban_sub_fail_counts = {}
return runner
def _create_completed_subscription(summary="done once"):
conn = kb.connect()
try:
tid = kb.create_task(conn, title="notify once", assignee="worker")
kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat-1")
kb.complete_task(conn, tid, summary=summary)
return tid
finally:
conn.close()
def _unseen_terminal_events(tid):
conn = kb.connect()
try:
_, events = kb.unseen_events_for_sub(
conn,
task_id=tid,
platform="telegram",
chat_id="chat-1",
kinds=["completed", "blocked", "gave_up", "crashed", "timed_out"],
)
return events
finally:
conn.close()
def test_kanban_notifier_dedupes_board_slugs_pointing_to_same_db(tmp_path, monkeypatch):
db_path = tmp_path / "shared-kanban.db"
monkeypatch.setenv("HERMES_KANBAN_DB", str(db_path))
kb.init_db()
kb.write_board_metadata("alias-a", name="Alias A")
kb.write_board_metadata("alias-b", name="Alias B")
tid = _create_completed_subscription()
adapter = RecordingAdapter()
runner = _make_runner(adapter)
asyncio.run(_run_one_notifier_tick(monkeypatch, runner))
assert len(adapter.sent) == 1
assert "Kanban" in adapter.sent[0]["text"]
assert tid in adapter.sent[0]["text"]
def test_kanban_notifier_claim_prevents_second_watcher_send(tmp_path, monkeypatch):
db_path = tmp_path / "single-owner.db"
monkeypatch.setenv("HERMES_KANBAN_DB", str(db_path))
kb.init_db()
tid = _create_completed_subscription()
adapter1 = RecordingAdapter()
adapter2 = RecordingAdapter()
asyncio.run(_run_one_notifier_tick(monkeypatch, _make_runner(adapter1)))
asyncio.run(_run_one_notifier_tick(monkeypatch, _make_runner(adapter2)))
assert len(adapter1.sent) == 1
assert adapter2.sent == []
def test_kanban_notifier_rewinds_claim_if_adapter_disconnects(tmp_path, monkeypatch):
db_path = tmp_path / "adapter-disconnect.db"
monkeypatch.setenv("HERMES_KANBAN_DB", str(db_path))
kb.init_db()
tid = _create_completed_subscription()
runner = GatewayRunner.__new__(GatewayRunner)
runner._running = True
runner.adapters = DisconnectedAdapters({Platform.TELEGRAM: RecordingAdapter()})
runner._kanban_sub_fail_counts = {}
asyncio.run(_run_one_notifier_tick(monkeypatch, runner))
assert [ev.kind for ev in _unseen_terminal_events(tid)] == ["completed"]
def test_kanban_db_path_is_test_isolated_from_real_home():
hermes_home = Path(kb.kanban_home())
production_db = Path.home() / ".hermes" / "kanban.db"
assert kb.kanban_db_path().resolve() != production_db.resolve()
conn = kb.connect()
try:
tid = kb.create_task(conn, title="x", assignee="worker")
kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat-1")
finally:
conn.close()
assert kb.kanban_db_path().resolve().is_relative_to(hermes_home.resolve())
assert kb.kanban_db_path().resolve() != production_db.resolve()

View file

@ -568,6 +568,57 @@ def test_notify_cursor_advances(kanban_home):
conn.close()
def test_notify_claim_is_single_owner_and_rewindable(kanban_home):
conn1 = kb.connect()
conn2 = kb.connect()
try:
tid = kb.create_task(conn1, title="x", assignee="w")
kb.add_notify_sub(conn1, task_id=tid, platform="telegram", chat_id="123")
kb.complete_task(conn1, tid, result="ok")
old_cursor, claimed_cursor, events = kb.claim_unseen_events_for_sub(
conn1,
task_id=tid,
platform="telegram",
chat_id="123",
kinds=["completed", "blocked"],
)
assert old_cursor == 0
assert claimed_cursor > old_cursor
assert [ev.kind for ev in events] == ["completed"]
# A concurrent notifier instance sees the advanced cursor and cannot
# claim/send the same event range.
_, _, duplicate_events = kb.claim_unseen_events_for_sub(
conn2,
task_id=tid,
platform="telegram",
chat_id="123",
kinds=["completed", "blocked"],
)
assert duplicate_events == []
assert kb.rewind_notify_cursor(
conn1,
task_id=tid,
platform="telegram",
chat_id="123",
claimed_cursor=claimed_cursor,
old_cursor=old_cursor,
) is True
_, retried_events = kb.unseen_events_for_sub(
conn2,
task_id=tid,
platform="telegram",
chat_id="123",
kinds=["completed", "blocked"],
)
assert [ev.kind for ev in retried_events] == ["completed"]
finally:
conn1.close()
conn2.close()
# ---------------------------------------------------------------------------
# GC + retention
# ---------------------------------------------------------------------------