diff --git a/tests/hermes_cli/test_kanban_notify.py b/tests/hermes_cli/test_kanban_notify.py new file mode 100644 index 00000000000..5a7b00e9581 --- /dev/null +++ b/tests/hermes_cli/test_kanban_notify.py @@ -0,0 +1,199 @@ +import asyncio +import pytest + +from pathlib import Path +from hermes_cli import kanban_db as kb +from unittest.mock import AsyncMock, MagicMock, patch + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def kanban_home(tmp_path, monkeypatch): + home = tmp_path / ".hermes" + home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(home)) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + kb.init_db() + return home + + +@pytest.mark.asyncio +async def test_notifier_unsubs_after_completed_event(kanban_home): + """ + completed 事件送出後,subscription 應該被移除(event_terminal 邏輯)。 + """ + import hermes_cli.kanban_db as kb + from gateway.run import GatewayRunner + from gateway.config import Platform + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="test task", assignee="worker1") + kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1") + kb.complete_task(conn, tid, result="completed by agent") + finally: + conn.close() + + runner = object.__new__(GatewayRunner) + runner._running = True + runner._kanban_sub_fail_counts = {} + + fake_adapter = MagicMock() + + async def _send_and_stop(chat_id, msg, metadata=None): + runner._running = False + + fake_adapter.send = AsyncMock(side_effect=_send_and_stop) + runner.adapters = {Platform.TELEGRAM: fake_adapter} + + _orig_sleep = asyncio.sleep + + async def _fast_sleep(_): + await _orig_sleep(0) + + with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): + await asyncio.wait_for( + runner._kanban_notifier_watcher(interval=1), + timeout=10.0, + ) + + fake_adapter.send.assert_called_once() + call_msg = fake_adapter.send.call_args[0][1] + assert "completed" in call_msg + + conn = kb.connect() + try: + subs = kb.list_notify_subs(conn, tid) + finally: + conn.close() + assert subs == [], "Subscription should be unsub after completed event" + + +@pytest.mark.asyncio +@pytest.mark.parametrize('kind', ["gave_up", "crashed", "timed_out"]) +async def test_notifier_unsubs_after_abnormal_events(kind, kanban_home): + """ + Event kind of gave_up, crashed, time_out would be cover, and remove subscription + """ + import hermes_cli.kanban_db as kb + from gateway.run import GatewayRunner + from gateway.config import Platform + + conn = kb.connect() + + try: + tid = kb.create_task(conn, title=f"test {kind} task", assignee="worker1") + kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1") + kb._append_event(conn, tid, kind=kind) + finally: + conn.close() + + runner = object.__new__(GatewayRunner) + runner._running = True + runner._kanban_sub_fail_counts = {} + + fake_adapter = MagicMock() + + async def _send_and_stop(chat_id, msg, metadata=None): + runner._running = False + + fake_adapter.send = AsyncMock(side_effect=_send_and_stop) + runner.adapters = {Platform.TELEGRAM: fake_adapter} + + _orig_sleep = asyncio.sleep + + async def _fast_sleep(_): + await _orig_sleep(0) + + with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): + await asyncio.wait_for( + runner._kanban_notifier_watcher(interval=1), + timeout=10.0, + ) + + fake_adapter.send.assert_called_once() + assert kind.replace('_', ' ') in fake_adapter.send.call_args[0][1] + + conn = kb.connect() + try: + subs = kb.list_notify_subs(conn, tid) + finally: + conn.close() + assert subs == [], "Subscription should be unsub after abnormal crash" + + +@pytest.mark.asyncio +async def test_notifier_second_blocked_delivers(kanban_home): + """ + After the first blocked, should receive second blocked notification. + """ + import hermes_cli.kanban_db as kb + from gateway.run import GatewayRunner + from gateway.config import Platform + + runner = object.__new__(GatewayRunner) + runner._running = True + runner._kanban_sub_fail_counts = {} + + delivered_msgs: list[str] = [] + + async def _capture_send(chat_id, msg, metadata=None): + delivered_msgs.append(msg) + + fake_adapter = MagicMock() + fake_adapter.send = AsyncMock(side_effect=_capture_send) + runner.adapters = {Platform.TELEGRAM: fake_adapter} + + _orig_sleep = asyncio.sleep + tick_count = 0 + + async def _fast_sleep(_): + nonlocal tick_count + await _orig_sleep(0) + tick_count += 1 + if tick_count >= 6: + runner._running = False + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="test task", assignee="worker1") + kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1") + + # Cycle 1: blocked + kb.block_task(conn, tid, reason="first block") + finally: + conn.close() + + with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): + await asyncio.wait_for( + runner._kanban_notifier_watcher(interval=1), + timeout=10.0, + ) + + # Cycle 2: unblock → block run again + runner._running = True + tick_count = 0 + + conn = kb.connect() + try: + kb.unblock_task(conn, tid) + kb.block_task(conn, tid, reason="second block") + finally: + conn.close() + + with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): + await asyncio.wait_for( + runner._kanban_notifier_watcher(interval=1), + timeout=10.0, + ) + + blocked_deliveries = [m for m in delivered_msgs if "blocked" in m] + assert "second block" not in blocked_deliveries[0] + assert "second block" in blocked_deliveries[1] + assert len(blocked_deliveries) == 2, ( + f"Should receive 2 blocked notification, but only get {len(blocked_deliveries)} count\n" + f"Message {delivered_msgs}" + )