From cee761ee4a2ff2791be13eb01716d2344fbb3a15 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Fri, 3 Apr 2026 18:53:52 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20prevent=20duplicate=20messages=20?= =?UTF-8?q?=E2=80=94=20gateway=20dedup=20+=20partial=20stream=20guard=20(#?= =?UTF-8?q?4878)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(gateway): add message deduplication to Discord and Slack adapters (#4777) Discord RESUME replays events after reconnects (~7/day observed), and Slack Socket Mode can redeliver events if the ack was lost. Neither adapter tracked which messages were already processed, causing duplicate bot responses. Add _seen_messages dedup cache (message ID → timestamp) with 5-min TTL and 2000-entry cap to both adapters, matching the pattern already used by Mattermost, Matrix, WeCom, Feishu, DingTalk, and Email. The check goes at the very top of the message handler, before any other logic, so replayed events are silently dropped. Co-Authored-By: Claude Opus 4.6 (1M context) * fix: prevent duplicate messages on partial stream delivery When streaming fails after tokens are already delivered to the platform, _interruptible_streaming_api_call re-raised the error into the outer retry loop, which would make a new API call — creating a duplicate message. Now checks deltas_were_sent before re-raising: if partial content was already streamed, returns a stub response instead. The outer loop treats the turn as complete (no retry, no fallback, no duplicate). Inspired by PR #4871 (@trevorgordon981) which identified the bug. This implementation avoids monkey-patching exception objects and keeps the fix within the streaming call boundary. --------- Co-authored-by: Mibayy Co-authored-by: Claude Opus 4.6 (1M context) --- gateway/platforms/discord.py | 18 ++++++++++++++++++ gateway/platforms/slack.py | 20 ++++++++++++++++++++ run_agent.py | 23 +++++++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/gateway/platforms/discord.py b/gateway/platforms/discord.py index 91e6710d26..21fa69b6eb 100644 --- a/gateway/platforms/discord.py +++ b/gateway/platforms/discord.py @@ -449,6 +449,11 @@ class DiscordAdapter(BasePlatformAdapter): self._bot_task: Optional[asyncio.Task] = None # Cap to prevent unbounded growth (Discord threads get archived). self._MAX_TRACKED_THREADS = 500 + # Dedup cache: message_id → timestamp. Prevents duplicate bot + # responses when Discord RESUME replays events after reconnects. + self._seen_messages: Dict[str, float] = {} + self._SEEN_TTL = 300 # 5 minutes + self._SEEN_MAX = 2000 # prune threshold async def connect(self) -> bool: """Connect to Discord and start receiving events.""" @@ -539,6 +544,19 @@ class DiscordAdapter(BasePlatformAdapter): @self._client.event async def on_message(message: DiscordMessage): + # Dedup: Discord RESUME replays events after reconnects (#4777) + msg_id = str(message.id) + now = time.time() + if msg_id in adapter_self._seen_messages: + return + adapter_self._seen_messages[msg_id] = now + if len(adapter_self._seen_messages) > adapter_self._SEEN_MAX: + cutoff = now - adapter_self._SEEN_TTL + adapter_self._seen_messages = { + k: v for k, v in adapter_self._seen_messages.items() + if v > cutoff + } + # Always ignore our own messages if message.author == self._client.user: return diff --git a/gateway/platforms/slack.py b/gateway/platforms/slack.py index be11803504..2e7bbee739 100644 --- a/gateway/platforms/slack.py +++ b/gateway/platforms/slack.py @@ -13,6 +13,7 @@ import json import logging import os import re +import time from typing import Dict, Optional, Any try: @@ -78,6 +79,11 @@ class SlackAdapter(BasePlatformAdapter): self._team_clients: Dict[str, AsyncWebClient] = {} # team_id → WebClient self._team_bot_user_ids: Dict[str, str] = {} # team_id → bot_user_id self._channel_team: Dict[str, str] = {} # channel_id → team_id + # Dedup cache: event_ts → timestamp. Prevents duplicate bot + # responses when Socket Mode reconnects redeliver events. + self._seen_messages: Dict[str, float] = {} + self._SEEN_TTL = 300 # 5 minutes + self._SEEN_MAX = 2000 # prune threshold async def connect(self) -> bool: """Connect to Slack via Socket Mode.""" @@ -710,6 +716,20 @@ class SlackAdapter(BasePlatformAdapter): async def _handle_slack_message(self, event: dict) -> None: """Handle an incoming Slack message event.""" + # Dedup: Slack Socket Mode can redeliver events after reconnects (#4777) + event_ts = event.get("ts", "") + if event_ts: + now = time.time() + if event_ts in self._seen_messages: + return + self._seen_messages[event_ts] = now + if len(self._seen_messages) > self._SEEN_MAX: + cutoff = now - self._SEEN_TTL + self._seen_messages = { + k: v for k, v in self._seen_messages.items() + if v > cutoff + } + # Ignore bot messages (including our own) if event.get("bot_id") or event.get("subtype") == "bot_message": return diff --git a/run_agent.py b/run_agent.py index ab10232332..a2330f525b 100644 --- a/run_agent.py +++ b/run_agent.py @@ -4488,6 +4488,29 @@ class AIAgent: pass raise InterruptedError("Agent interrupted during streaming API call") if result["error"] is not None: + if deltas_were_sent["yes"]: + # Streaming failed AFTER some tokens were already delivered to + # the platform. Re-raising would let the outer retry loop make + # a new API call, creating a duplicate message. Return a + # partial "stop" response instead so the outer loop treats this + # turn as complete (no retry, no fallback). + logger.warning( + "Partial stream delivered before error; returning stub " + "response to prevent duplicate messages: %s", + result["error"], + ) + _stub_msg = SimpleNamespace( + role="assistant", content=None, tool_calls=None, + reasoning_content=None, + ) + return SimpleNamespace( + id="partial-stream-stub", + model=getattr(self, "model", "unknown"), + choices=[SimpleNamespace( + index=0, message=_stub_msg, finish_reason="stop", + )], + usage=None, + ) raise result["error"] return result["response"]