mirror of https://github.com/NousResearch/hermes-agent.git
synced 2026-05-25 05:52:34 +00:00
fix: deduplicate kanban notifications for blocked/gave_up states
The kanban notifier was re-firing the same blocked/gave_up/crashed/timed_out notifications on every 5-second tick.

Root cause: after delivering a terminal event, the notifier unsubscribed the subscription, deleting its cursor. If the unsub failed (WAL contention, transient error), the subscription survived with a stale cursor, and the next tick would re-deliver the same event. Even when the unsub succeeded, the subscription was gone. If the task later transitioned to a different state (e.g., blocked -> unblocked -> blocked again), a new subscription would start at cursor=0, re-delivering all past events.

Fix: stop unsubscribing on terminal event kinds. Only remove the subscription when the task reaches a truly final status (done/archived). For blocked, gave_up, crashed, and timed_out, the subscription stays alive and the cursor mechanism deduplicates naturally -- events with id <= last_event_id are never re-fetched. This makes the dedup idempotent and eliminates the re-fire bug.

The old concern about subscriptions leaking forever on blocked tasks is moot: blocked tasks will eventually be unblocked (transitioning to ready/running) or archived, at which point the subscription is cleaned up.
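The cursor behaviour described above can be illustrated with a minimal sketch. This is not the project's implementation: `Event`, `Subscription`, and `claim_unseen` are hypothetical names, and the in-memory list stands in for whatever event store the notifier actually reads. It only shows why a long-lived cursor deduplicates across ticks, and why a subscription recreated at cursor 0 replays old events.

```python
from dataclasses import dataclass


@dataclass
class Event:
    id: int
    kind: str


@dataclass
class Subscription:
    # Hypothetical stand-in for a stored subscription row.
    last_event_id: int = 0


def claim_unseen(sub: Subscription, log: list[Event]) -> list[Event]:
    """Return events with id > cursor, then advance the cursor past them."""
    unseen = [e for e in log if e.id > sub.last_event_id]
    if unseen:
        sub.last_event_id = max(e.id for e in unseen)
    return unseen


log = [Event(1, "blocked")]
sub = Subscription()

first_tick = claim_unseen(sub, log)    # delivers event 1
second_tick = claim_unseen(sub, log)   # nothing new: the cursor is already at 1

log.append(Event(2, "blocked"))        # task was unblocked, then blocked again
third_tick = claim_unseen(sub, log)    # delivers only event 2

# A subscription recreated after an unsub starts at cursor 0 and replays everything.
replay = claim_unseen(Subscription(), log)
```

Keeping the same `Subscription` object across ticks is what makes the repeated call harmless; the replay only appears when the cursor is thrown away and recreated.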
parent 04e18160ab
commit a96dd54872
1 changed file with 23 additions and 13 deletions
@@ -4098,10 +4098,18 @@ class GatewayRunner:
     return

 TERMINAL_KINDS = ("completed", "blocked", "gave_up", "crashed", "timed_out")
-# Terminal event kinds trigger automatic unsubscription — the task
-# is done or in a retry-needed state that the human
-# shouldn't keep pinging a stale chat for.
-TERMINAL_EVENT_KINDS = ("completed", "gave_up", "crashed", "timed_out")
+# Subscriptions are removed only when the task reaches a truly final
+# status (done / archived). We used to also unsub on any terminal
+# event kind (gave_up / crashed / timed_out / blocked), but that
+# silently dropped the user out of the loop whenever the dispatcher
+# respawned the task: a worker that crashes, gets reclaimed, runs
+# again, and crashes a second time would only notify on the first
+# crash because the subscription was deleted after the first event.
+# Same shape as the reblock-after-unblock cycle that PR #22941
+# fixed for `blocked`. Keeping the subscription alive until the
+# task is genuinely done lets the cursor (advanced atomically by
+# claim_unseen_events_for_sub) handle dedup, and any retry-loop
+# event reaches the user.
 # Per-subscription send-failure counter. Adapter.send raising
 # means the chat is dead (deleted, bot kicked, etc.) — after N
 # consecutive send failures the sub is dropped so we don't spin
@@ -4339,19 +4347,21 @@ class GatewayRunner:
         # dropping the subscription is the terminal action.
         break
 else:
-    # All events delivered; advance cursor + maybe unsub.
+    # All events delivered; advance cursor. The cursor
+    # is the dedup mechanism — it prevents re-delivery
+    # of the same event on subsequent ticks.
     await asyncio.to_thread(
         self._kanban_advance, sub, d["cursor"], board_slug,
     )
-    # Unsubscribe when the LAST delivered event is a
-    # terminal kind (the task hit a "no further updates"
-    # state), not just on task.status in {done, archived}.
-    # Covers blocked / gave_up / crashed / timed_out which
-    # used to leak subs forever.
-    last_kind = d["events"][-1].kind if d["events"] else None
+    # Unsubscribe only when the task has reached a truly
+    # final status (done / archived). For blocked /
+    # gave_up / crashed / timed_out the subscription is
+    # kept alive so the user gets notified again if the
+    # dispatcher respawns the task and it cycles into the
+    # same state. See the longer comment on TERMINAL_KINDS
+    # above for the failure mode this prevents.
     task_terminal = task and task.status in ("done", "archived")
-    event_terminal = last_kind in TERMINAL_EVENT_KINDS
-    if task_terminal or event_terminal:
+    if task_terminal:
         await asyncio.to_thread(
             self._kanban_unsub, sub, board_slug,
         )
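The behavioural difference between the removed and the added unsubscribe condition can be sketched as below. This is a simplified model rather than the gateway code: `should_unsub_old`, `should_unsub_new`, `simulate`, and the tick data are hypothetical, and the `running` status between crashes is an assumption about how a reclaimed task looks. Only the two predicates mirror the conditions shown in the diff.

```python
FINAL_STATUSES = ("done", "archived")
OLD_TERMINAL_EVENT_KINDS = ("completed", "gave_up", "crashed", "timed_out")


def should_unsub_old(task_status, last_kind):
    # Pre-commit rule: a final status OR a terminal last event drops the sub.
    return task_status in FINAL_STATUSES or last_kind in OLD_TERMINAL_EVENT_KINDS


def should_unsub_new(task_status, last_kind):
    # Post-commit rule: only a final task status drops the sub.
    return task_status in FINAL_STATUSES


def simulate(should_unsub, ticks):
    """ticks: list of (task_status, [event kinds that appeared on that tick])."""
    subscribed = True
    delivered = []
    for status, kinds in ticks:
        if not subscribed or not kinds:
            continue
        delivered.extend(kinds)
        if should_unsub(status, kinds[-1]):
            subscribed = False
    return delivered


ticks = [
    ("running", ["crashed"]),   # first crash; task already reclaimed (assumed status)
    ("running", ["crashed"]),   # second crash after the respawn
    ("done", ["completed"]),    # task finally finishes
]
old = simulate(should_unsub_old, ticks)
new = simulate(should_unsub_new, ticks)
```

Under the old rule the subscription disappears after the first crash, so `old` holds a single notification; under the new rule all three events are delivered and the subscription is dropped only once the task is `done`.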