hermes-agent/tests/hermes_cli/test_kanban_notify.py
Clooooode dd49d50389 test(kanban): assert re-block notification is delivered after unblock cycle
Adds test_notifier_second_blocked_delivers to cover the case where a
task is blocked, unblocked, then blocked again — the second blocked
event must still deliver a gateway notification.

Currently fails because blocked is treated as a terminal event kind,
causing the subscription to be dropped after the first block.
2026-05-09 19:31:41 -07:00

199 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import pytest
from pathlib import Path
from hermes_cli import kanban_db as kb
from unittest.mock import AsyncMock, MagicMock, patch
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
kb.init_db()
return home
@pytest.mark.asyncio
async def test_notifier_unsubs_after_completed_event(kanban_home):
"""
completed 事件送出後subscription 應該被移除event_terminal 邏輯)。
"""
import hermes_cli.kanban_db as kb
from gateway.run import GatewayRunner
from gateway.config import Platform
conn = kb.connect()
try:
tid = kb.create_task(conn, title="test task", assignee="worker1")
kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1")
kb.complete_task(conn, tid, result="completed by agent")
finally:
conn.close()
runner = object.__new__(GatewayRunner)
runner._running = True
runner._kanban_sub_fail_counts = {}
fake_adapter = MagicMock()
async def _send_and_stop(chat_id, msg, metadata=None):
runner._running = False
fake_adapter.send = AsyncMock(side_effect=_send_and_stop)
runner.adapters = {Platform.TELEGRAM: fake_adapter}
_orig_sleep = asyncio.sleep
async def _fast_sleep(_):
await _orig_sleep(0)
with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep):
await asyncio.wait_for(
runner._kanban_notifier_watcher(interval=1),
timeout=10.0,
)
fake_adapter.send.assert_called_once()
call_msg = fake_adapter.send.call_args[0][1]
assert "completed" in call_msg
conn = kb.connect()
try:
subs = kb.list_notify_subs(conn, tid)
finally:
conn.close()
assert subs == [], "Subscription should be unsub after completed event"
@pytest.mark.asyncio
@pytest.mark.parametrize('kind', ["gave_up", "crashed", "timed_out"])
async def test_notifier_unsubs_after_abnormal_events(kind, kanban_home):
"""
Event kind of gave_up, crashed, time_out would be cover, and remove subscription
"""
import hermes_cli.kanban_db as kb
from gateway.run import GatewayRunner
from gateway.config import Platform
conn = kb.connect()
try:
tid = kb.create_task(conn, title=f"test {kind} task", assignee="worker1")
kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1")
kb._append_event(conn, tid, kind=kind)
finally:
conn.close()
runner = object.__new__(GatewayRunner)
runner._running = True
runner._kanban_sub_fail_counts = {}
fake_adapter = MagicMock()
async def _send_and_stop(chat_id, msg, metadata=None):
runner._running = False
fake_adapter.send = AsyncMock(side_effect=_send_and_stop)
runner.adapters = {Platform.TELEGRAM: fake_adapter}
_orig_sleep = asyncio.sleep
async def _fast_sleep(_):
await _orig_sleep(0)
with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep):
await asyncio.wait_for(
runner._kanban_notifier_watcher(interval=1),
timeout=10.0,
)
fake_adapter.send.assert_called_once()
assert kind.replace('_', ' ') in fake_adapter.send.call_args[0][1]
conn = kb.connect()
try:
subs = kb.list_notify_subs(conn, tid)
finally:
conn.close()
assert subs == [], "Subscription should be unsub after abnormal crash"
@pytest.mark.asyncio
async def test_notifier_second_blocked_delivers(kanban_home):
"""
After the first blocked, should receive second blocked notification.
"""
import hermes_cli.kanban_db as kb
from gateway.run import GatewayRunner
from gateway.config import Platform
runner = object.__new__(GatewayRunner)
runner._running = True
runner._kanban_sub_fail_counts = {}
delivered_msgs: list[str] = []
async def _capture_send(chat_id, msg, metadata=None):
delivered_msgs.append(msg)
fake_adapter = MagicMock()
fake_adapter.send = AsyncMock(side_effect=_capture_send)
runner.adapters = {Platform.TELEGRAM: fake_adapter}
_orig_sleep = asyncio.sleep
tick_count = 0
async def _fast_sleep(_):
nonlocal tick_count
await _orig_sleep(0)
tick_count += 1
if tick_count >= 6:
runner._running = False
conn = kb.connect()
try:
tid = kb.create_task(conn, title="test task", assignee="worker1")
kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1")
# Cycle 1: blocked
kb.block_task(conn, tid, reason="first block")
finally:
conn.close()
with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep):
await asyncio.wait_for(
runner._kanban_notifier_watcher(interval=1),
timeout=10.0,
)
# Cycle 2: unblock → block run again
runner._running = True
tick_count = 0
conn = kb.connect()
try:
kb.unblock_task(conn, tid)
kb.block_task(conn, tid, reason="second block")
finally:
conn.close()
with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep):
await asyncio.wait_for(
runner._kanban_notifier_watcher(interval=1),
timeout=10.0,
)
blocked_deliveries = [m for m in delivered_msgs if "blocked" in m]
assert "second block" not in blocked_deliveries[0]
assert "second block" in blocked_deliveries[1]
assert len(blocked_deliveries) == 2, (
f"Should receive 2 blocked notification, but only get {len(blocked_deliveries)} count\n"
f"Message {delivered_msgs}"
)