import asyncio import pytest from pathlib import Path from hermes_cli import kanban_db as kb from unittest.mock import AsyncMock, MagicMock, patch # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def kanban_home(tmp_path, monkeypatch): home = tmp_path / ".hermes" home.mkdir() monkeypatch.setenv("HERMES_HOME", str(home)) monkeypatch.setattr(Path, "home", lambda: tmp_path) kb.init_db() return home @pytest.mark.asyncio async def test_notifier_unsubs_after_completed_event(kanban_home): """ Subscription should be remove after completed event """ import hermes_cli.kanban_db as kb from gateway.run import GatewayRunner from gateway.config import Platform conn = kb.connect() try: tid = kb.create_task(conn, title="test task", assignee="worker1") kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1") kb.complete_task(conn, tid, result="completed by agent") finally: conn.close() runner = object.__new__(GatewayRunner) runner._running = True runner._kanban_sub_fail_counts = {} fake_adapter = MagicMock() async def _send_and_stop(chat_id, msg, metadata=None): runner._running = False fake_adapter.send = AsyncMock(side_effect=_send_and_stop) runner.adapters = {Platform.TELEGRAM: fake_adapter} _orig_sleep = asyncio.sleep async def _fast_sleep(_): await _orig_sleep(0) with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): await asyncio.wait_for( runner._kanban_notifier_watcher(interval=1), timeout=10.0, ) fake_adapter.send.assert_called_once() call_msg = fake_adapter.send.call_args[0][1] assert "completed" in call_msg conn = kb.connect() try: subs = kb.list_notify_subs(conn, tid) finally: conn.close() assert subs == [], "Subscription should be unsub after completed event" @pytest.mark.asyncio @pytest.mark.parametrize('kind', ["gave_up", "crashed", "timed_out"]) async def test_notifier_unsubs_after_abnormal_events(kind, kanban_home): """ Event kinds gave_up / crashed / timed_out send a notification but DO NOT delete the subscription. The dispatcher may respawn the task and fire the same event kind again (e.g. a worker that crashes, gets reclaimed, and crashes a second time); the user must hear about the second event too. Subscriptions are removed only when the task hits a truly final status (done / archived) — see the comment on TERMINAL_KINDS in gateway/run.py and PR #21398. """ import hermes_cli.kanban_db as kb from gateway.run import GatewayRunner from gateway.config import Platform conn = kb.connect() try: tid = kb.create_task(conn, title=f"test {kind} task", assignee="worker1") kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1") kb._append_event(conn, tid, kind=kind) finally: conn.close() runner = object.__new__(GatewayRunner) runner._running = True runner._kanban_sub_fail_counts = {} fake_adapter = MagicMock() async def _send_and_stop(chat_id, msg, metadata=None): runner._running = False fake_adapter.send = AsyncMock(side_effect=_send_and_stop) runner.adapters = {Platform.TELEGRAM: fake_adapter} _orig_sleep = asyncio.sleep async def _fast_sleep(_): await _orig_sleep(0) with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): await asyncio.wait_for( runner._kanban_notifier_watcher(interval=1), timeout=10.0, ) # The user is notified about the abnormal event... fake_adapter.send.assert_called_once() assert kind.replace('_', ' ') in fake_adapter.send.call_args[0][1] # ...but the subscription survives so a respawn-then-same-event cycle # reaches the user too. The cursor (last_event_id) advanced inside # the same write txn as the claim, so the same event won't re-fire. conn = kb.connect() try: subs = kb.list_notify_subs(conn, tid) finally: conn.close() assert len(subs) == 1, ( f"Subscription should survive {kind!r} so the next cycle of the " f"same event reaches the user; got {subs!r}" ) assert int(subs[0]["last_event_id"]) >= 1, ( "Cursor should have advanced past the delivered event " "(claim_unseen_events_for_sub advances atomically inside the " "same write txn as the read)." ) @pytest.mark.asyncio async def test_notifier_second_blocked_delivers(kanban_home): """ After the first blocked, should receive second blocked notification. """ import hermes_cli.kanban_db as kb from gateway.run import GatewayRunner from gateway.config import Platform runner = object.__new__(GatewayRunner) runner._running = True runner._kanban_sub_fail_counts = {} delivered_msgs: list[str] = [] async def _capture_send(chat_id, msg, metadata=None): delivered_msgs.append(msg) fake_adapter = MagicMock() fake_adapter.send = AsyncMock(side_effect=_capture_send) runner.adapters = {Platform.TELEGRAM: fake_adapter} _orig_sleep = asyncio.sleep tick_count = 0 async def _fast_sleep(_): nonlocal tick_count await _orig_sleep(0) tick_count += 1 if tick_count >= 6: runner._running = False conn = kb.connect() try: tid = kb.create_task(conn, title="test task", assignee="worker1") kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1") # Cycle 1: blocked kb.block_task(conn, tid, reason="first block") finally: conn.close() with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): await asyncio.wait_for( runner._kanban_notifier_watcher(interval=1), timeout=10.0, ) # Cycle 2: unblock → block run again runner._running = True tick_count = 0 conn = kb.connect() try: kb.unblock_task(conn, tid) kb.block_task(conn, tid, reason="second block") finally: conn.close() with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): await asyncio.wait_for( runner._kanban_notifier_watcher(interval=1), timeout=10.0, ) blocked_deliveries = [m for m in delivered_msgs if "blocked" in m] assert "second block" not in blocked_deliveries[0] assert "second block" in blocked_deliveries[1] assert len(blocked_deliveries) == 2, ( f"Should receive 2 blocked notification, but only get {len(blocked_deliveries)} count\n" f"Message {delivered_msgs}" ) # --------------------------------------------------------------------------- # Regression: gateway watchers must not double-init the kanban DB. # # Both the notifier watcher (`_kanban_notifier_watcher`) and the dispatcher # tick (`_tick_once_for_board`) used to call `_kb.connect(board=slug)` # immediately followed by `_kb.init_db(board=slug)`. Since `connect()` # already runs the schema + idempotent migration on first open per process, # the explicit `init_db()` was redundant — and worse, `init_db()` # deliberately busts the per-process cache and re-runs the migration on a # *second* connection, which races the first. On legacy DBs this surfaced # as `duplicate column name: