hermes-agent/tests/hermes_cli/test_kanban_notify.py

431 lines
14 KiB
Python

import asyncio
import pytest
from pathlib import Path
from hermes_cli import kanban_db as kb
from unittest.mock import AsyncMock, MagicMock, patch
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
    """Give a test its own initialised Hermes home under ``tmp_path``.

    Creates ``<tmp_path>/.hermes``, points the ``HERMES_HOME`` environment
    variable at it, makes ``Path.home()`` return ``tmp_path``, and then calls
    ``kb.init_db()`` so the kanban database is set up against that location.

    Returns:
        The ``.hermes`` directory path that was created.
    """
    hermes_dir = tmp_path / ".hermes"
    hermes_dir.mkdir()

    # Redirect both lookup routes (home directory and env var) to the sandbox
    # before the database is initialised.
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    monkeypatch.setenv("HERMES_HOME", str(hermes_dir))

    kb.init_db()
    return hermes_dir
@pytest.mark.asyncio
async def test_notifier_unsubs_after_completed_event(kanban_home):
    """A delivered completion notification removes the subscription.

    Seeds one task with a Telegram subscription, completes the task, and runs
    the gateway's kanban notifier watcher until the first message is sent.
    Expects exactly one message containing "completed" and no remaining
    subscription rows for the task afterwards.
    """
    import hermes_cli.kanban_db as kb
    from gateway.run import GatewayRunner
    from gateway.config import Platform

    # Seed the board: one task, one subscriber, task already completed.
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="test task", assignee="worker1")
        kb.add_notify_sub(db, task_id=task_id, platform="telegram", chat_id="chat1")
        kb.complete_task(db, task_id, result="completed by agent")
    finally:
        db.close()

    # Allocate the runner without running __init__; only the attributes
    # assigned below are provided to the watcher.
    gateway = object.__new__(GatewayRunner)
    gateway._running = True
    gateway._kanban_sub_fail_counts = {}

    async def _stop_after_send(chat_id, msg, metadata=None):
        # The first delivery clears the run flag, which is how this test
        # asks the watcher to finish.
        gateway._running = False

    telegram = MagicMock()
    telegram.send = AsyncMock(side_effect=_stop_after_send)
    gateway.adapters = {Platform.TELEGRAM: telegram}

    # Keep a handle on the real sleep: the patched one delegates to it with a
    # zero delay so the test never actually waits.
    real_sleep = asyncio.sleep

    async def _yield_only(_):
        await real_sleep(0)

    with patch("gateway.run.asyncio.sleep", side_effect=_yield_only):
        await asyncio.wait_for(
            gateway._kanban_notifier_watcher(interval=1),
            timeout=10.0,
        )

    telegram.send.assert_called_once()
    positional_args = telegram.send.call_args[0]
    sent_text = positional_args[1]
    assert "completed" in sent_text

    db = kb.connect()
    try:
        remaining = kb.list_notify_subs(db, task_id)
    finally:
        db.close()
    assert remaining == [], "Subscription should be unsub after completed event"
@pytest.mark.asyncio
@pytest.mark.parametrize('kind', ["gave_up", "crashed", "timed_out"])
async def test_notifier_unsubs_after_abnormal_events(kind, kanban_home):
    """Abnormal events notify the user but keep the subscription alive.

    NOTE: despite "unsubs" in the test name, the assertions below require the
    subscription to SURVIVE a gave_up / crashed / timed_out event.

    Rationale: the dispatcher may respawn the task and emit the same event
    kind again (for example a worker that crashes, is reclaimed, and crashes
    a second time), and the user must hear about that second event too.
    Subscriptions are removed only once the task reaches a truly final status
    (done / archived) -- see the comment on TERMINAL_KINDS in gateway/run.py
    and PR #21398.
    """
    import hermes_cli.kanban_db as kb
    from gateway.run import GatewayRunner
    from gateway.config import Platform

    # Seed a subscribed task and append the abnormal event directly.
    db = kb.connect()
    try:
        tid = kb.create_task(db, title=f"test {kind} task", assignee="worker1")
        kb.add_notify_sub(db, task_id=tid, platform="telegram", chat_id="chat1")
        kb._append_event(db, tid, kind=kind)
    finally:
        db.close()

    # Allocate the runner without running __init__; only the attributes
    # assigned below are provided to the watcher.
    gateway = object.__new__(GatewayRunner)
    gateway._running = True
    gateway._kanban_sub_fail_counts = {}

    async def _stop_after_send(chat_id, msg, metadata=None):
        # One delivery is enough; clear the run flag to end the watcher.
        gateway._running = False

    telegram = MagicMock()
    telegram.send = AsyncMock(side_effect=_stop_after_send)
    gateway.adapters = {Platform.TELEGRAM: telegram}

    # The patched sleep delegates to the real one with a zero delay.
    real_sleep = asyncio.sleep

    async def _yield_only(_):
        await real_sleep(0)

    with patch("gateway.run.asyncio.sleep", side_effect=_yield_only):
        await asyncio.wait_for(
            gateway._kanban_notifier_watcher(interval=1),
            timeout=10.0,
        )

    # The user hears about the abnormal event exactly once...
    telegram.send.assert_called_once()
    sent_text = telegram.send.call_args[0][1]
    assert kind.replace('_', ' ') in sent_text

    # ...while the subscription stays in place, so a respawn followed by the
    # same event kind still reaches the user. The cursor (last_event_id) is
    # expected to have moved forward so this event is not delivered again.
    db = kb.connect()
    try:
        subs = kb.list_notify_subs(db, tid)
    finally:
        db.close()

    assert len(subs) == 1, (
        f"Subscription should survive {kind!r} so the next cycle of the "
        f"same event reaches the user; got {subs!r}"
    )
    cursor = int(subs[0]["last_event_id"])
    assert cursor >= 1, (
        "Cursor should have advanced past the delivered event "
        "(claim_unseen_events_for_sub advances atomically inside the "
        "same write txn as the read)."
    )
@pytest.mark.asyncio
async def test_notifier_second_blocked_delivers(kanban_home):
    """A task blocked twice produces two separate blocked notifications.

    Cycle 1 blocks a subscribed task and runs the notifier watcher. Cycle 2
    unblocks the task, blocks it again with a different reason, and runs the
    watcher a second time. Exactly two delivered messages must mention
    "blocked", and only the second one may carry the "second block" reason.
    """
    import hermes_cli.kanban_db as kb
    from gateway.run import GatewayRunner
    from gateway.config import Platform

    # Allocate the runner without running __init__; only the attributes
    # assigned below are provided to the watcher.
    runner = object.__new__(GatewayRunner)
    runner._running = True
    runner._kanban_sub_fail_counts = {}

    delivered_msgs: list[str] = []

    async def _capture_send(chat_id, msg, metadata=None):
        delivered_msgs.append(msg)

    fake_adapter = MagicMock()
    fake_adapter.send = AsyncMock(side_effect=_capture_send)
    runner.adapters = {Platform.TELEGRAM: fake_adapter}

    # Keep the real sleep so the patched version can still yield to the loop.
    _orig_sleep = asyncio.sleep
    tick_count = 0

    async def _fast_sleep(_):
        # Never really wait; stop the watcher after six patched sleeps.
        nonlocal tick_count
        await _orig_sleep(0)
        tick_count += 1
        if tick_count >= 6:
            runner._running = False

    async def _run_watcher_pass():
        # Re-arm the run flag and tick budget, then run the watcher until
        # _fast_sleep stops it.
        nonlocal tick_count
        runner._running = True
        tick_count = 0
        with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep):
            await asyncio.wait_for(
                runner._kanban_notifier_watcher(interval=1),
                timeout=10.0,
            )

    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="test task", assignee="worker1")
        kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat1")
        # Cycle 1: blocked
        kb.block_task(conn, tid, reason="first block")
    finally:
        conn.close()
    await _run_watcher_pass()

    # Cycle 2: unblock, then block again with a new reason.
    conn = kb.connect()
    try:
        kb.unblock_task(conn, tid)
        kb.block_task(conn, tid, reason="second block")
    finally:
        conn.close()
    await _run_watcher_pass()

    blocked_deliveries = [m for m in delivered_msgs if "blocked" in m]

    # Check the count before indexing: a missing delivery should fail with
    # this diagnostic message instead of a bare IndexError.
    assert len(blocked_deliveries) == 2, (
        f"Should receive 2 blocked notification, but only get {len(blocked_deliveries)} count\n"
        f"Message {delivered_msgs}"
    )
    assert "second block" not in blocked_deliveries[0]
    assert "second block" in blocked_deliveries[1]
# ---------------------------------------------------------------------------
# Regression: gateway watchers must not double-init the kanban DB.
#
# Both the notifier watcher (`_kanban_notifier_watcher`) and the dispatcher
# tick (`_tick_once_for_board`) used to call `_kb.connect(board=slug)`
# immediately followed by `_kb.init_db(board=slug)`. Since `connect()`
# already runs the schema + idempotent migration on first open per process,
# the explicit `init_db()` was redundant — and worse, `init_db()`
# deliberately busts the per-process cache and re-runs the migration on a
# *second* connection, which races the first. On legacy DBs this surfaced
# as `duplicate column name: <col>` (now tolerated by
# `_add_column_if_missing`) and intermittent `database is locked` errors
# (issue #21378).
#
# The fix removes the `init_db()` calls in both watchers; this regression
# test pins that behaviour so we don't reintroduce them.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_notifier_does_not_call_init_db(kanban_home):
    """Notifier watcher path must not invoke `_kb.init_db` (issue #21378).

    Runs the watcher for three patched sleep ticks while a recording wrapper
    replaces ``hermes_cli.kanban_db.init_db``, then requires that the wrapper
    was never called. The ``kanban_home`` fixture's own ``init_db()`` call
    happens before the patch is installed, so it is not counted.
    """
    import hermes_cli.kanban_db as kb
    from gateway.run import GatewayRunner
    from gateway.config import Platform

    # Allocate the runner without running __init__; only the attributes
    # assigned below are provided to the watcher.
    gateway = object.__new__(GatewayRunner)
    gateway._running = True
    gateway._kanban_sub_fail_counts = {}

    telegram = MagicMock()
    telegram.send = AsyncMock()
    gateway.adapters = {Platform.TELEGRAM: telegram}

    real_sleep = asyncio.sleep
    ticks_seen = 0

    async def _counting_sleep(_):
        # Yield to the loop without waiting; stop the watcher on the third tick.
        nonlocal ticks_seen
        await real_sleep(0)
        ticks_seen += 1
        if ticks_seen >= 3:
            gateway._running = False

    init_db_calls: list[object] = []
    genuine_init_db = kb.init_db

    def _recording_init_db(*args, **kwargs):
        # Record the call, then defer to the real implementation.
        init_db_calls.append((args, kwargs))
        return genuine_init_db(*args, **kwargs)

    with patch("gateway.run.asyncio.sleep", side_effect=_counting_sleep):
        with patch("hermes_cli.kanban_db.init_db", side_effect=_recording_init_db):
            await asyncio.wait_for(
                gateway._kanban_notifier_watcher(interval=1),
                timeout=10.0,
            )

    assert init_db_calls == [], (
        "_kanban_notifier_watcher must not call init_db on every tick — "
        "connect() handles first-run schema init. "
        "Reintroducing init_db revives issue #21378. "
        f"Got {len(init_db_calls)} call(s): {init_db_calls}"
    )
def test_dispatcher_tick_does_not_call_init_db(kanban_home, monkeypatch):
    """`_tick_once_for_board` must not invoke `_kb.init_db` (issue #21378).

    `connect()` already runs the schema + idempotent migration on first open
    per process. The explicit `init_db()` call was redundant and triggered a
    second migration on a second connection that raced the first.

    This is a static check: nothing is executed or spied on. The test reads
    the source of both watcher methods and fails if the literal text
    ``_kb.init_db(board=slug)`` appears in either. ``monkeypatch`` is accepted
    but not used by the body.
    """
    import inspect

    from gateway.run import GatewayRunner

    # The exact call pattern whose reintroduction would revive the bug.
    forbidden_call = "_kb.init_db(board=slug)"

    # The dispatcher watcher's tick lives as a local closure inside
    # `_kanban_dispatcher_watcher`, so inspecting that method's source
    # covers the tick as well.
    src = inspect.getsource(GatewayRunner._kanban_dispatcher_watcher)
    assert forbidden_call not in src, (
        "_kanban_dispatcher_watcher must not call _kb.init_db(board=slug) — "
        "see issue #21378. Use connect() alone; it runs migrations on first "
        "open per process."
    )

    notifier_src = inspect.getsource(GatewayRunner._kanban_notifier_watcher)
    assert forbidden_call not in notifier_src, (
        "_kanban_notifier_watcher must not call _kb.init_db(board=slug) — "
        "see issue #21378."
    )
@pytest.mark.asyncio
async def test_notifier_skips_subscription_owned_by_other_profile(kanban_home):
    """A gateway running as another profile leaves the subscription untouched.

    The subscription is registered with ``notifier_profile="default"`` while
    the runner identifies as ``"business-partner"``. After three watcher
    ticks nothing may have been sent, the subscription must still exist, and
    its ``last_event_id`` cursor must still be 0.
    """
    import hermes_cli.kanban_db as kb
    from gateway.run import GatewayRunner
    from gateway.config import Platform

    # Seed a completed task whose subscription belongs to the "default" profile.
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="owned task", assignee="backend-engineer")
        kb.add_notify_sub(
            db,
            task_id=task_id,
            platform="telegram",
            chat_id="chat1",
            notifier_profile="default",
        )
        kb.complete_task(db, task_id, result="done")
    finally:
        db.close()

    # Allocate the runner without running __init__ and give it a profile that
    # differs from the subscription's.
    gateway = object.__new__(GatewayRunner)
    gateway._running = True
    gateway._kanban_sub_fail_counts = {}
    gateway._kanban_notifier_profile = "business-partner"

    telegram = MagicMock()
    telegram.send = AsyncMock()
    gateway.adapters = {Platform.TELEGRAM: telegram}

    real_sleep = asyncio.sleep
    ticks_seen = 0

    async def _counting_sleep(_):
        # No send is expected, so the watcher is stopped by tick count instead.
        nonlocal ticks_seen
        await real_sleep(0)
        ticks_seen += 1
        if ticks_seen >= 3:
            gateway._running = False

    with patch("gateway.run.asyncio.sleep", side_effect=_counting_sleep):
        await asyncio.wait_for(
            gateway._kanban_notifier_watcher(interval=1),
            timeout=10.0,
        )

    telegram.send.assert_not_called()

    db = kb.connect()
    try:
        subs = kb.list_notify_subs(db, task_id)
    finally:
        db.close()

    assert len(subs) == 1
    cursor = int(subs[0]["last_event_id"])
    assert cursor == 0, "wrong profile must not claim the event"
@pytest.mark.asyncio
async def test_notifier_delivers_subscription_owned_by_current_profile(kanban_home):
    """A gateway running as the subscribing profile delivers the notification.

    Both the subscription's ``notifier_profile`` and the runner's
    ``_kanban_notifier_profile`` are ``"default"``. The watcher must send
    exactly one message for the completed task, and afterwards no
    subscription rows may remain for it.
    """
    import hermes_cli.kanban_db as kb
    from gateway.run import GatewayRunner
    from gateway.config import Platform

    # Seed a completed task whose subscription belongs to the "default" profile.
    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="owned task", assignee="backend-engineer")
        kb.add_notify_sub(
            db,
            task_id=task_id,
            platform="telegram",
            chat_id="chat1",
            notifier_profile="default",
        )
        kb.complete_task(db, task_id, result="done")
    finally:
        db.close()

    # Allocate the runner without running __init__ and give it the same
    # profile as the subscription.
    gateway = object.__new__(GatewayRunner)
    gateway._running = True
    gateway._kanban_sub_fail_counts = {}
    gateway._kanban_notifier_profile = "default"

    async def _stop_after_send(chat_id, msg, metadata=None):
        # One delivery is enough; clear the run flag to end the watcher.
        gateway._running = False

    telegram = MagicMock()
    telegram.send = AsyncMock(side_effect=_stop_after_send)
    gateway.adapters = {Platform.TELEGRAM: telegram}

    # The patched sleep delegates to the real one with a zero delay.
    real_sleep = asyncio.sleep

    async def _yield_only(_):
        await real_sleep(0)

    with patch("gateway.run.asyncio.sleep", side_effect=_yield_only):
        await asyncio.wait_for(
            gateway._kanban_notifier_watcher(interval=1),
            timeout=10.0,
        )

    telegram.send.assert_called_once()

    db = kb.connect()
    try:
        remaining = kb.list_notify_subs(db, task_id)
    finally:
        db.close()
    assert remaining == []