mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix: prevent duplicate messages — gateway dedup + partial stream guard (#4878)
* fix(gateway): add message deduplication to Discord and Slack adapters (#4777) Discord RESUME replays events after reconnects (~7/day observed), and Slack Socket Mode can redeliver events if the ack was lost. Neither adapter tracked which messages were already processed, causing duplicate bot responses. Add _seen_messages dedup cache (message ID → timestamp) with 5-min TTL and 2000-entry cap to both adapters, matching the pattern already used by Mattermost, Matrix, WeCom, Feishu, DingTalk, and Email. The check goes at the very top of the message handler, before any other logic, so replayed events are silently dropped. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: prevent duplicate messages on partial stream delivery When streaming fails after tokens are already delivered to the platform, _interruptible_streaming_api_call re-raised the error into the outer retry loop, which would make a new API call — creating a duplicate message. Now checks deltas_were_sent before re-raising: if partial content was already streamed, returns a stub response instead. The outer loop treats the turn as complete (no retry, no fallback, no duplicate). Inspired by PR #4871 (@trevorgordon981) which identified the bug. This implementation avoids monkey-patching exception objects and keeps the fix within the streaming call boundary. --------- Co-authored-by: Mibayy <mibayy@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
36aace34aa
commit
cee761ee4a
3 changed files with 61 additions and 0 deletions
|
|
@ -449,6 +449,11 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||
self._bot_task: Optional[asyncio.Task] = None
|
||||
# Cap to prevent unbounded growth (Discord threads get archived).
|
||||
self._MAX_TRACKED_THREADS = 500
|
||||
# Dedup cache: message_id → timestamp. Prevents duplicate bot
|
||||
# responses when Discord RESUME replays events after reconnects.
|
||||
self._seen_messages: Dict[str, float] = {}
|
||||
self._SEEN_TTL = 300 # 5 minutes
|
||||
self._SEEN_MAX = 2000 # prune threshold
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Discord and start receiving events."""
|
||||
|
|
@ -539,6 +544,19 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||
|
||||
@self._client.event
|
||||
async def on_message(message: DiscordMessage):
|
||||
# Dedup: Discord RESUME replays events after reconnects (#4777)
|
||||
msg_id = str(message.id)
|
||||
now = time.time()
|
||||
if msg_id in adapter_self._seen_messages:
|
||||
return
|
||||
adapter_self._seen_messages[msg_id] = now
|
||||
if len(adapter_self._seen_messages) > adapter_self._SEEN_MAX:
|
||||
cutoff = now - adapter_self._SEEN_TTL
|
||||
adapter_self._seen_messages = {
|
||||
k: v for k, v in adapter_self._seen_messages.items()
|
||||
if v > cutoff
|
||||
}
|
||||
|
||||
# Always ignore our own messages
|
||||
if message.author == self._client.user:
|
||||
return
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import json
|
|||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from typing import Dict, Optional, Any
|
||||
|
||||
try:
|
||||
|
|
@ -78,6 +79,11 @@ class SlackAdapter(BasePlatformAdapter):
|
|||
self._team_clients: Dict[str, AsyncWebClient] = {} # team_id → WebClient
|
||||
self._team_bot_user_ids: Dict[str, str] = {} # team_id → bot_user_id
|
||||
self._channel_team: Dict[str, str] = {} # channel_id → team_id
|
||||
# Dedup cache: event_ts → timestamp. Prevents duplicate bot
|
||||
# responses when Socket Mode reconnects redeliver events.
|
||||
self._seen_messages: Dict[str, float] = {}
|
||||
self._SEEN_TTL = 300 # 5 minutes
|
||||
self._SEEN_MAX = 2000 # prune threshold
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Slack via Socket Mode."""
|
||||
|
|
@ -710,6 +716,20 @@ class SlackAdapter(BasePlatformAdapter):
|
|||
|
||||
async def _handle_slack_message(self, event: dict) -> None:
|
||||
"""Handle an incoming Slack message event."""
|
||||
# Dedup: Slack Socket Mode can redeliver events after reconnects (#4777)
|
||||
event_ts = event.get("ts", "")
|
||||
if event_ts:
|
||||
now = time.time()
|
||||
if event_ts in self._seen_messages:
|
||||
return
|
||||
self._seen_messages[event_ts] = now
|
||||
if len(self._seen_messages) > self._SEEN_MAX:
|
||||
cutoff = now - self._SEEN_TTL
|
||||
self._seen_messages = {
|
||||
k: v for k, v in self._seen_messages.items()
|
||||
if v > cutoff
|
||||
}
|
||||
|
||||
# Ignore bot messages (including our own)
|
||||
if event.get("bot_id") or event.get("subtype") == "bot_message":
|
||||
return
|
||||
|
|
|
|||
23
run_agent.py
23
run_agent.py
|
|
@ -4488,6 +4488,29 @@ class AIAgent:
|
|||
pass
|
||||
raise InterruptedError("Agent interrupted during streaming API call")
|
||||
if result["error"] is not None:
|
||||
if deltas_were_sent["yes"]:
|
||||
# Streaming failed AFTER some tokens were already delivered to
|
||||
# the platform. Re-raising would let the outer retry loop make
|
||||
# a new API call, creating a duplicate message. Return a
|
||||
# partial "stop" response instead so the outer loop treats this
|
||||
# turn as complete (no retry, no fallback).
|
||||
logger.warning(
|
||||
"Partial stream delivered before error; returning stub "
|
||||
"response to prevent duplicate messages: %s",
|
||||
result["error"],
|
||||
)
|
||||
_stub_msg = SimpleNamespace(
|
||||
role="assistant", content=None, tool_calls=None,
|
||||
reasoning_content=None,
|
||||
)
|
||||
return SimpleNamespace(
|
||||
id="partial-stream-stub",
|
||||
model=getattr(self, "model", "unknown"),
|
||||
choices=[SimpleNamespace(
|
||||
index=0, message=_stub_msg, finish_reason="stop",
|
||||
)],
|
||||
usage=None,
|
||||
)
|
||||
raise result["error"]
|
||||
return result["response"]
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue