diff --git a/gateway/platforms/discord.py b/gateway/platforms/discord.py index 91e6710d2..21fa69b6e 100644 --- a/gateway/platforms/discord.py +++ b/gateway/platforms/discord.py @@ -449,6 +449,11 @@ class DiscordAdapter(BasePlatformAdapter): self._bot_task: Optional[asyncio.Task] = None # Cap to prevent unbounded growth (Discord threads get archived). self._MAX_TRACKED_THREADS = 500 + # Dedup cache: message_id → timestamp. Prevents duplicate bot + # responses when Discord RESUME replays events after reconnects. + self._seen_messages: Dict[str, float] = {} + self._SEEN_TTL = 300 # 5 minutes + self._SEEN_MAX = 2000 # prune threshold async def connect(self) -> bool: """Connect to Discord and start receiving events.""" @@ -539,6 +544,19 @@ class DiscordAdapter(BasePlatformAdapter): @self._client.event async def on_message(message: DiscordMessage): + # Dedup: Discord RESUME replays events after reconnects (#4777) + msg_id = str(message.id) + now = time.time() + if msg_id in adapter_self._seen_messages: + return + adapter_self._seen_messages[msg_id] = now + if len(adapter_self._seen_messages) > adapter_self._SEEN_MAX: + cutoff = now - adapter_self._SEEN_TTL + adapter_self._seen_messages = { + k: v for k, v in adapter_self._seen_messages.items() + if v > cutoff + } + # Always ignore our own messages if message.author == self._client.user: return diff --git a/gateway/platforms/slack.py b/gateway/platforms/slack.py index be1180350..2e7bbee73 100644 --- a/gateway/platforms/slack.py +++ b/gateway/platforms/slack.py @@ -13,6 +13,7 @@ import json import logging import os import re +import time from typing import Dict, Optional, Any try: @@ -78,6 +79,11 @@ class SlackAdapter(BasePlatformAdapter): self._team_clients: Dict[str, AsyncWebClient] = {} # team_id → WebClient self._team_bot_user_ids: Dict[str, str] = {} # team_id → bot_user_id self._channel_team: Dict[str, str] = {} # channel_id → team_id + # Dedup cache: event_ts → timestamp. Prevents duplicate bot + # responses when Socket Mode reconnects redeliver events. + self._seen_messages: Dict[str, float] = {} + self._SEEN_TTL = 300 # 5 minutes + self._SEEN_MAX = 2000 # prune threshold async def connect(self) -> bool: """Connect to Slack via Socket Mode.""" @@ -710,6 +716,20 @@ class SlackAdapter(BasePlatformAdapter): async def _handle_slack_message(self, event: dict) -> None: """Handle an incoming Slack message event.""" + # Dedup: Slack Socket Mode can redeliver events after reconnects (#4777) + event_ts = event.get("ts", "") + if event_ts: + now = time.time() + if event_ts in self._seen_messages: + return + self._seen_messages[event_ts] = now + if len(self._seen_messages) > self._SEEN_MAX: + cutoff = now - self._SEEN_TTL + self._seen_messages = { + k: v for k, v in self._seen_messages.items() + if v > cutoff + } + # Ignore bot messages (including our own) if event.get("bot_id") or event.get("subtype") == "bot_message": return diff --git a/run_agent.py b/run_agent.py index ab1023233..a2330f525 100644 --- a/run_agent.py +++ b/run_agent.py @@ -4488,6 +4488,29 @@ class AIAgent: pass raise InterruptedError("Agent interrupted during streaming API call") if result["error"] is not None: + if deltas_were_sent["yes"]: + # Streaming failed AFTER some tokens were already delivered to + # the platform. Re-raising would let the outer retry loop make + # a new API call, creating a duplicate message. Return a + # partial "stop" response instead so the outer loop treats this + # turn as complete (no retry, no fallback). + logger.warning( + "Partial stream delivered before error; returning stub " + "response to prevent duplicate messages: %s", + result["error"], + ) + _stub_msg = SimpleNamespace( + role="assistant", content=None, tool_calls=None, + reasoning_content=None, + ) + return SimpleNamespace( + id="partial-stream-stub", + model=getattr(self, "model", "unknown"), + choices=[SimpleNamespace( + index=0, message=_stub_msg, finish_reason="stop", + )], + usage=None, + ) raise result["error"] return result["response"]