import asyncio
import inspect
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from hermes_cli import kanban_db as kb


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
    """Create an isolated ``.hermes`` home under ``tmp_path`` and init the DB.

    Sets ``HERMES_HOME`` and redirects ``Path.home`` to ``tmp_path`` before
    calling ``kb.init_db()``. Returns the created ``.hermes`` directory.
    """
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    kb.init_db()
    return home


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _make_runner():
    """Return ``(runner, adapter)``: a bare GatewayRunner plus a fake adapter.

    The runner is allocated with ``object.__new__`` so ``__init__`` is not
    run; only the attributes these tests set up front are assigned. The
    adapter is a ``MagicMock`` whose ``send`` is an ``AsyncMock``, registered
    under ``Platform.TELEGRAM``. Tests customise delivery behaviour by
    assigning ``adapter.send.side_effect``.
    """
    # Imported lazily, as the original tests did, so the gateway modules are
    # only loaded once a test is actually executing.
    from gateway.config import Platform
    from gateway.run import GatewayRunner

    runner = object.__new__(GatewayRunner)
    runner._running = True
    runner._kanban_sub_fail_counts = {}

    adapter = MagicMock()
    adapter.send = AsyncMock()
    runner.adapters = {Platform.TELEGRAM: adapter}
    return runner, adapter


async def _run_watcher(runner, *, stop_after_ticks=None):
    """Run ``runner._kanban_notifier_watcher`` with sleeping stubbed out.

    Args:
        runner: Object exposing ``_kanban_notifier_watcher`` and ``_running``.
        stop_after_ticks: When given, ``runner._running`` is set to ``False``
            once the stubbed sleep has been awaited that many times. When
            ``None`` the caller must stop the runner some other way (e.g.
            from the adapter's ``send`` side effect).

    The whole run is bounded by a 10 second ``asyncio.wait_for`` timeout.
    """
    # Capture the real sleep first: the patch below replaces ``sleep`` on the
    # asyncio module object referenced by gateway.run, so the stub must not
    # look it up again through ``asyncio.sleep``.
    real_sleep = asyncio.sleep
    ticks = 0

    async def _fast_sleep(_):
        nonlocal ticks
        await real_sleep(0)  # still yield to the event loop
        ticks += 1
        if stop_after_ticks is not None and ticks >= stop_after_ticks:
            runner._running = False

    with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep):
        await asyncio.wait_for(
            runner._kanban_notifier_watcher(interval=1),
            timeout=10.0,
        )


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_notifier_unsubs_after_completed_event(kanban_home):
    """The subscription should be removed after a completed event."""
    import hermes_cli.kanban_db as kb

    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="test task", assignee="worker1")
        kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1")
        kb.complete_task(conn, tid, result="completed by agent")
    finally:
        conn.close()

    runner, fake_adapter = _make_runner()

    async def _send_and_stop(chat_id, msg, metadata=None):
        # First delivery ends the watcher loop.
        runner._running = False

    fake_adapter.send.side_effect = _send_and_stop

    await _run_watcher(runner)

    fake_adapter.send.assert_called_once()
    call_msg = fake_adapter.send.call_args[0][1]
    assert "completed" in call_msg

    conn = kb.connect()
    try:
        subs = kb.list_notify_subs(conn, tid)
    finally:
        conn.close()
    assert subs == [], "Subscription should be unsub after completed event"


@pytest.mark.asyncio
@pytest.mark.parametrize('kind', ["gave_up", "crashed", "timed_out"])
async def test_notifier_unsubs_after_abnormal_events(kind, kanban_home):
    """gave_up, crashed and timed_out events are delivered and unsubscribe."""
    import hermes_cli.kanban_db as kb

    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title=f"test {kind} task", assignee="worker1")
        kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1")
        kb._append_event(conn, tid, kind=kind)
    finally:
        conn.close()

    runner, fake_adapter = _make_runner()

    async def _send_and_stop(chat_id, msg, metadata=None):
        # First delivery ends the watcher loop.
        runner._running = False

    fake_adapter.send.side_effect = _send_and_stop

    await _run_watcher(runner)

    fake_adapter.send.assert_called_once()
    assert kind.replace('_', ' ') in fake_adapter.send.call_args[0][1]

    conn = kb.connect()
    try:
        subs = kb.list_notify_subs(conn, tid)
    finally:
        conn.close()
    assert subs == [], "Subscription should be unsub after abnormal crash"


@pytest.mark.asyncio
async def test_notifier_second_blocked_delivers(kanban_home):
    """After a first blocked notification, a second block is also delivered."""
    import hermes_cli.kanban_db as kb

    runner, fake_adapter = _make_runner()
    delivered_msgs: list[str] = []

    async def _capture_send(chat_id, msg, metadata=None):
        delivered_msgs.append(msg)

    fake_adapter.send.side_effect = _capture_send

    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="test task", assignee="worker1")
        kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1")
        # Cycle 1: blocked
        kb.block_task(conn, tid, reason="first block")
    finally:
        conn.close()

    await _run_watcher(runner, stop_after_ticks=6)

    # Cycle 2: unblock → block run again
    runner._running = True
    conn = kb.connect()
    try:
        kb.unblock_task(conn, tid)
        kb.block_task(conn, tid, reason="second block")
    finally:
        conn.close()

    await _run_watcher(runner, stop_after_ticks=6)

    blocked_deliveries = [m for m in delivered_msgs if "blocked" in m]
    # Check the count before indexing so a missing delivery fails with the
    # diagnostic message below instead of a bare IndexError.
    assert len(blocked_deliveries) == 2, (
        f"Should receive 2 blocked notification, but only get {len(blocked_deliveries)} count\n"
        f"Message {delivered_msgs}"
    )
    assert "second block" not in blocked_deliveries[0]
    assert "second block" in blocked_deliveries[1]


# ---------------------------------------------------------------------------
# Regression: gateway watchers must not double-init the kanban DB.
#
# Both the notifier watcher (`_kanban_notifier_watcher`) and the dispatcher
# tick (`_tick_once_for_board`) used to call `_kb.connect(board=slug)`
# immediately followed by `_kb.init_db(board=slug)`. Since `connect()`
# already runs the schema + idempotent migration on first open per process,
# the explicit `init_db()` was redundant — and worse, `init_db()`
# deliberately busts the per-process cache and re-runs the migration on a
# *second* connection, which races the first. On legacy DBs this surfaced
# as `duplicate column name: ` (now tolerated by
# `_add_column_if_missing`) and intermittent `database is locked` errors
# (issue #21378).
#
# The fix removes the `init_db()` calls in both watchers; this regression
# test pins that behaviour so we don't reintroduce them.
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_notifier_does_not_call_init_db(kanban_home):
    """Notifier watcher path must not invoke `_kb.init_db` (issue #21378)."""
    import hermes_cli.kanban_db as kb

    runner, _ = _make_runner()

    init_db_calls: list[object] = []
    real_init_db = kb.init_db

    def _spy_init_db(*args, **kwargs):
        # Record the call, then delegate so behaviour is otherwise unchanged.
        init_db_calls.append((args, kwargs))
        return real_init_db(*args, **kwargs)

    with patch("hermes_cli.kanban_db.init_db", side_effect=_spy_init_db):
        await _run_watcher(runner, stop_after_ticks=3)

    assert init_db_calls == [], (
        "_kanban_notifier_watcher must not call init_db on every tick — "
        "connect() handles first-run schema init. "
        "Reintroducing init_db revives issue #21378. "
        f"Got {len(init_db_calls)} call(s): {init_db_calls}"
    )


def test_dispatcher_tick_does_not_call_init_db(kanban_home, monkeypatch):
    """`_tick_once_for_board` must not invoke `_kb.init_db` (issue #21378).

    `connect()` already runs the schema + idempotent migration on first
    open per process. The explicit `init_db()` call was redundant and
    triggered a second migration on a second connection that raced the first.
    """
    from gateway.run import GatewayRunner

    # The dispatcher watcher's tick lives as a local closure inside
    # `_kanban_dispatcher_watcher`. Read the source and assert the
    # specific patterns that would reintroduce the bug are absent.
    src = inspect.getsource(GatewayRunner._kanban_dispatcher_watcher)
    assert "_kb.init_db(board=slug)" not in src, (
        "_kanban_dispatcher_watcher must not call _kb.init_db(board=slug) — "
        "see issue #21378. Use connect() alone; it runs migrations on first "
        "open per process."
    )

    notifier_src = inspect.getsource(GatewayRunner._kanban_notifier_watcher)
    assert "_kb.init_db(board=slug)" not in notifier_src, (
        "_kanban_notifier_watcher must not call _kb.init_db(board=slug) — "
        "see issue #21378."
    )