From a96dd5487274cbb678847b333d463d2075a6cc6a Mon Sep 17 00:00:00 2001 From: jelrod27 <31932854+jelrod27@users.noreply.github.com> Date: Thu, 7 May 2026 09:03:13 -0700 Subject: [PATCH] fix: deduplicate kanban notifications for blocked/gave_up states The kanban notifier was re-firing the same blocked/gave_up/crashed/timed_out notifications on every 5-second tick. Root cause: after delivering a terminal event, the notifier unsubscribed the subscription, deleting its cursor. If the unsub failed (WAL contention, transient error), the subscription survived with a stale cursor, and the next tick would re-deliver the same event. Even when the unsub succeeded, the subscription was gone. If the task later transitioned to a different state (e.g., blocked -> unblocked -> blocked again), a new subscription would start at cursor=0, re-delivering all past events. Fix: stop unsubscribing on terminal event kinds. Only remove the subscription when the task reaches a truly final status (done/archived). For blocked, gave_up, crashed, and timed_out, the subscription stays alive and the cursor mechanism deduplicates naturally -- events with id <= last_event_id are never re-fetched. This makes the dedup idempotent and eliminates the re-fire bug. The old concern about subscriptions leaking forever on blocked tasks is moot: blocked tasks will eventually be unblocked (transitioning to ready/running) or archived, at which point the subscription is cleaned up. --- gateway/run.py | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index cbb167dcac0..a3eeedcd5a8 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4098,10 +4098,18 @@ class GatewayRunner: return TERMINAL_KINDS = ("completed", "blocked", "gave_up", "crashed", "timed_out") - # Terminal event kinds trigger automatic unsubscription — the task - # is done or in a retry-needed state that the human - # shouldn't keep pinging a stale chat for. - TERMINAL_EVENT_KINDS = ("completed", "gave_up", "crashed", "timed_out") + # Subscriptions are removed only when the task reaches a truly final + # status (done / archived). We used to also unsub on any terminal + # event kind (gave_up / crashed / timed_out / blocked), but that + # silently dropped the user out of the loop whenever the dispatcher + # respawned the task: a worker that crashes, gets reclaimed, runs + # again, and crashes a second time would only notify on the first + # crash because the subscription was deleted after the first event. + # Same shape as the reblock-after-unblock cycle that PR #22941 + # fixed for `blocked`. Keeping the subscription alive until the + # task is genuinely done lets the cursor (advanced atomically by + # claim_unseen_events_for_sub) handle dedup, and any retry-loop + # event reaches the user. # Per-subscription send-failure counter. Adapter.send raising # means the chat is dead (deleted, bot kicked, etc.) — after N # consecutive send failures the sub is dropped so we don't spin @@ -4339,19 +4347,21 @@ class GatewayRunner: # dropping the subscription is the terminal action. break else: - # All events delivered; advance cursor + maybe unsub. + # All events delivered; advance cursor. The cursor + # is the dedup mechanism — it prevents re-delivery + # of the same event on subsequent ticks. await asyncio.to_thread( self._kanban_advance, sub, d["cursor"], board_slug, ) - # Unsubscribe when the LAST delivered event is a - # terminal kind (the task hit a "no further updates" - # state), not just on task.status in {done, archived}. - # Covers blocked / gave_up / crashed / timed_out which - # used to leak subs forever. - last_kind = d["events"][-1].kind if d["events"] else None + # Unsubscribe only when the task has reached a truly + # final status (done / archived). For blocked / + # gave_up / crashed / timed_out the subscription is + # kept alive so the user gets notified again if the + # dispatcher respawns the task and it cycles into the + # same state. See the longer comment on TERMINAL_KINDS + # above for the failure mode this prevents. task_terminal = task and task.status in ("done", "archived") - event_terminal = last_kind in TERMINAL_EVENT_KINDS - if task_terminal or event_terminal: + if task_terminal: await asyncio.to_thread( self._kanban_unsub, sub, board_slug, )