diff --git a/tests/gateway/test_kanban_notifier.py b/tests/gateway/test_kanban_notifier.py index a7a5fcbb1ee..8e85f045037 100644 --- a/tests/gateway/test_kanban_notifier.py +++ b/tests/gateway/test_kanban_notifier.py @@ -172,3 +172,65 @@ def test_kanban_notifier_rewinds_claim_on_send_exception(tmp_path, monkeypatch): # still returns the event for retry on the next tick. assert adapter.attempts >= 1, "send should have been attempted at least once" assert [ev.kind for ev in _unseen_terminal_events(tid)] == ["completed"] + + +def test_notifier_redelivers_same_kind_on_dispatch_cycle(tmp_path, monkeypatch): + """A retry cycle (crashed → reclaimed → crashed) notifies the user twice. + + Before #21398 the notifier auto-unsubscribed on any terminal event kind + (gave_up / crashed / timed_out), so the second crash in a respawn cycle + was silently dropped — the subscription was already gone. This test pins + the new contract: subscription survives non-final terminal events; the + cursor handles dedup. + + Two crashes on the same task, one notifier tick apart — both should + land on the adapter. + """ + db_path = tmp_path / "redeliver-cycle.db" + monkeypatch.setenv("HERMES_KANBAN_DB", str(db_path)) + kb.init_db() + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="cycle test", assignee="worker") + kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="chat-1") + # First crash — fired by the dispatcher when the worker PID dies. + kb._append_event(conn, tid, kind="crashed") + finally: + conn.close() + + adapter = RecordingAdapter() + runner = _make_runner(adapter) + asyncio.run(_run_one_notifier_tick(monkeypatch, runner)) + + # First crash delivered. + assert len(adapter.sent) == 1 + assert "crashed" in adapter.sent[0]["text"].lower() + + # Subscription survives — the cursor advanced past event #1, but the + # row is still there. 
+ conn = kb.connect() + try: + subs = kb.list_notify_subs(conn, tid) + assert len(subs) == 1, ( + "Subscription must survive a crashed event so a respawn-cycle " + "second crash also notifies the user (issue #21398)." + ) + + # Second crash — same task, same dispatcher (or a respawn). Append + # another event to simulate the dispatcher firing `crashed` a second + # time during retry. + kb._append_event(conn, tid, kind="crashed") + finally: + conn.close() + + # New tick: the second event has a fresh id beyond the advanced cursor, + # so it gets claimed and delivered. + runner = _make_runner(adapter) + asyncio.run(_run_one_notifier_tick(monkeypatch, runner)) + + assert len(adapter.sent) == 2, ( + f"Second crashed event should also notify; got {len(adapter.sent)} " + f"deliveries (texts: {[d['text'] for d in adapter.sent]})" + ) + assert "crashed" in adapter.sent[1]["text"].lower() diff --git a/tests/hermes_cli/test_kanban_notify.py b/tests/hermes_cli/test_kanban_notify.py index 3b8cf4865d9..6e59fb45aa7 100644 --- a/tests/hermes_cli/test_kanban_notify.py +++ b/tests/hermes_cli/test_kanban_notify.py @@ -76,7 +76,13 @@ async def test_notifier_unsubs_after_completed_event(kanban_home): @pytest.mark.parametrize('kind', ["gave_up", "crashed", "timed_out"]) async def test_notifier_unsubs_after_abnormal_events(kind, kanban_home): """ - Event kind of gave_up, crashed, time_out would be cover, and remove subscription + Event kinds gave_up / crashed / timed_out send a notification but DO + NOT delete the subscription. The dispatcher may respawn the task and + fire the same event kind again (e.g. a worker that crashes, gets + reclaimed, and crashes a second time); the user must hear about the + second event too. Subscriptions are removed only when the task hits + a truly final status (done / archived) — see the comment on + TERMINAL_KINDS in gateway/run.py and PR #21398. 
""" import hermes_cli.kanban_db as kb from gateway.run import GatewayRunner @@ -114,15 +120,27 @@ async def test_notifier_unsubs_after_abnormal_events(kind, kanban_home): timeout=10.0, ) + # The user is notified about the abnormal event... fake_adapter.send.assert_called_once() assert kind.replace('_', ' ') in fake_adapter.send.call_args[0][1] + # ...but the subscription survives so a respawn-then-same-event cycle + # reaches the user too. The cursor (last_event_id) advanced inside + # the same write txn as the claim, so the same event won't re-fire. conn = kb.connect() try: subs = kb.list_notify_subs(conn, tid) finally: conn.close() - assert subs == [], "Subscription should be unsub after abnormal crash" + assert len(subs) == 1, ( + f"Subscription should survive {kind!r} so the next cycle of the " + f"same event reaches the user; got {subs!r}" + ) + assert int(subs[0]["last_event_id"]) >= 1, ( + "Cursor should have advanced past the delivered event " + "(claim_unseen_events_for_sub advances atomically inside the " + "same write txn as the read)." + ) @pytest.mark.asyncio