fix(gateway): deduplicate Weixin messages by content fingerprint

This commit is contained in:
Pratik Rai 2026-04-27 00:39:11 +05:30 committed by Teknium
parent 0b5fd40a01
commit 7a8ee8b29d
2 changed files with 50 additions and 3 deletions

View file

@ -1333,6 +1333,15 @@ class WeixinAdapter(BasePlatformAdapter):
if message_id and self._dedup.is_duplicate(message_id): if message_id and self._dedup.is_duplicate(message_id):
return return
# Secondary content-fingerprint dedup for text messages
item_list = message.get("item_list") or []
text = _extract_text(item_list)
if text:
content_key = f"content:{sender_id}:{hashlib.md5(text.encode()).hexdigest()}"
if self._dedup.is_duplicate(content_key):
logger.debug("[%s] Content-dedup: skipping duplicate message from %s", self.name, sender_id)
return
chat_type, effective_chat_id = _guess_chat_type(message, self._account_id) chat_type, effective_chat_id = _guess_chat_type(message, self._account_id)
if chat_type == "group": if chat_type == "group":
if self._group_policy == "disabled": if self._group_policy == "disabled":
@ -1347,8 +1356,6 @@ class WeixinAdapter(BasePlatformAdapter):
self._token_store.set(self._account_id, sender_id, context_token) self._token_store.set(self._account_id, sender_id, context_token)
asyncio.create_task(self._maybe_fetch_typing_ticket(sender_id, context_token or None)) asyncio.create_task(self._maybe_fetch_typing_ticket(sender_id, context_token or None))
item_list = message.get("item_list") or []
text = _extract_text(item_list)
media_paths: List[str] = [] media_paths: List[str] = []
media_types: List[str] = [] media_types: List[str] = []

View file

@ -5,7 +5,7 @@ import base64
import json import json
import os import os
from pathlib import Path from pathlib import Path
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, Mock, patch
from gateway.config import PlatformConfig from gateway.config import PlatformConfig
from gateway.config import GatewayConfig, HomeChannel, Platform, _apply_env_overrides from gateway.config import GatewayConfig, HomeChannel, Platform, _apply_env_overrides
@ -788,3 +788,43 @@ class TestIsStaleSessionRet:
def test_success_codes_are_not_stale(self): def test_success_codes_are_not_stale(self):
assert weixin._is_stale_session_ret(0, 0, "") is False assert weixin._is_stale_session_ret(0, 0, "") is False
assert weixin._is_stale_session_ret(None, None, "unknown error") is False assert weixin._is_stale_session_ret(None, None, "unknown error") is False
class TestWeixinContentDedup:
    """Regression coverage for Issue #16182.

    The upstream API can deliver the same content under different
    message_ids, which slips past deduplication keyed on message_id alone.
    """

    def test_duplicate_content_with_different_message_ids_is_dropped(self):
        """Two messages with identical text but distinct ids are handled once."""
        adapter = _make_adapter()
        adapter._poll_session = object()
        adapter.handle_message = AsyncMock()

        payload = {
            "from_user_id": "wxid_user1",
            "item_list": [{"type": 1, "text_item": {"text": "hello world"}}],
        }

        # Each delivery gets its own event loop run, mirroring two separate
        # arrivals that differ only in message_id.
        for message_id in ("msg-1", "msg-2"):
            asyncio.run(
                adapter._process_message(dict(payload, message_id=message_id))
            )

        handler = adapter.handle_message
        assert handler.await_count == 1
        delivered_event = handler.await_args.args[0]
        assert delivered_event.text == "hello world"

    def test_content_dedup_not_called_for_messages_without_text(self):
        """A message with an empty item_list never triggers a content-key lookup."""
        adapter = _make_adapter()
        adapter._poll_session = object()
        adapter.handle_message = AsyncMock()
        # Stub the dedup check so every lookup reports "not a duplicate" and
        # the keys it was asked about can be inspected afterwards.
        adapter._dedup.is_duplicate = Mock(return_value=False)

        textless_message = {
            "from_user_id": "wxid_user1",
            "message_id": "msg-1",
            "item_list": [],
        }
        asyncio.run(adapter._process_message(textless_message))

        assert adapter.handle_message.await_count == 0
        # is_duplicate should only be called for message_id, never for content
        recorded_calls = adapter._dedup.is_duplicate.call_args_list
        assert not any("content:" in str(recorded) for recorded in recorded_calls)