diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index 3a4a805401f..5821d922f8c 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -1017,6 +1017,16 @@ class WeixinAdapter(BasePlatformAdapter): self._cdn_base_url = str( extra.get("cdn_base_url") or os.getenv("WEIXIN_CDN_BASE_URL", WEIXIN_CDN_BASE_URL) ).strip().rstrip("/") + self._send_chunk_delay_seconds = float( + extra.get("send_chunk_delay_seconds") or os.getenv("WEIXIN_SEND_CHUNK_DELAY_SECONDS", "0.35") + ) + self._send_chunk_retries = int( + extra.get("send_chunk_retries") or os.getenv("WEIXIN_SEND_CHUNK_RETRIES", "2") + ) + self._send_chunk_retry_delay_seconds = float( + extra.get("send_chunk_retry_delay_seconds") + or os.getenv("WEIXIN_SEND_CHUNK_RETRY_DELAY_SECONDS", "1.0") + ) self._dm_policy = str(extra.get("dm_policy") or os.getenv("WEIXIN_DM_POLICY", "open")).strip().lower() self._group_policy = str(extra.get("group_policy") or os.getenv("WEIXIN_GROUP_POLICY", "disabled")).strip().lower() allow_from = extra.get("allow_from") @@ -1346,6 +1356,47 @@ class WeixinAdapter(BasePlatformAdapter): content, self.MAX_MESSAGE_LENGTH, self._split_multiline_messages, ) + async def _send_text_chunk( + self, + *, + chat_id: str, + chunk: str, + context_token: Optional[str], + client_id: str, + ) -> None: + """Send a single text chunk with per-chunk retry and backoff.""" + last_error: Optional[Exception] = None + for attempt in range(self._send_chunk_retries + 1): + try: + await _send_message( + self._session, + base_url=self._base_url, + token=self._token, + to=chat_id, + text=chunk, + context_token=context_token, + client_id=client_id, + ) + return + except Exception as exc: + last_error = exc + if attempt >= self._send_chunk_retries: + break + wait = self._send_chunk_retry_delay_seconds * (attempt + 1) + logger.warning( + "[%s] send chunk failed to=%s attempt=%d/%d, retrying in %.2fs: %s", + self.name, + _safe_id(chat_id), + attempt + 1, + self._send_chunk_retries + 1, + wait, + exc, + ) + if wait > 0: + await asyncio.sleep(wait) + assert last_error is not None + raise last_error + async def send( self, chat_id: str, @@ -1360,19 +1411,16 @@ class WeixinAdapter(BasePlatformAdapter): try: chunks = self._split_text(self.format_message(content)) for idx, chunk in enumerate(chunks): - if idx > 0: - await asyncio.sleep(0.3) client_id = f"hermes-weixin-{uuid.uuid4().hex}" - await _send_message( - self._session, - base_url=self._base_url, - token=self._token, - to=chat_id, - text=chunk, + await self._send_text_chunk( + chat_id=chat_id, + chunk=chunk, context_token=context_token, client_id=client_id, ) last_message_id = client_id + if idx < len(chunks) - 1 and self._send_chunk_delay_seconds > 0: + await asyncio.sleep(self._send_chunk_delay_seconds) return SendResult(success=True, message_id=last_message_id) except Exception as exc: logger.error("[%s] send failed to=%s: %s", self.name, _safe_id(chat_id), exc) diff --git a/tests/gateway/test_weixin.py b/tests/gateway/test_weixin.py index 815ea75ef0a..bb439fa9a6b 100644 --- a/tests/gateway/test_weixin.py +++ b/tests/gateway/test_weixin.py @@ -283,6 +283,55 @@ class TestWeixinSendMessageIntegration: ) +class TestWeixinChunkDelivery: + def _connected_adapter(self) -> WeixinAdapter: + adapter = _make_adapter() + adapter._session = object() + adapter._token = "test-token" + adapter._base_url = "https://weixin.example.com" + adapter._token_store.get = lambda account_id, chat_id: "ctx-token" + return adapter + + @patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock) + @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock) + def test_send_waits_between_multiple_chunks(self, send_message_mock, sleep_mock): + adapter = self._connected_adapter() + adapter.MAX_MESSAGE_LENGTH = 12 + + # Use double newlines so _pack_markdown_blocks splits into 3 blocks + result = asyncio.run(adapter.send("wxid_test123", "first\n\nsecond\n\nthird")) + + assert result.success is True + assert send_message_mock.await_count == 3 + assert sleep_mock.await_count == 2 + + @patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock) + @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock) + def test_send_retries_failed_chunk_before_continuing(self, send_message_mock, sleep_mock): + adapter = self._connected_adapter() + adapter.MAX_MESSAGE_LENGTH = 12 + calls = {"count": 0} + + async def flaky_send(*args, **kwargs): + calls["count"] += 1 + if calls["count"] == 2: + raise RuntimeError("temporary iLink failure") + + send_message_mock.side_effect = flaky_send + + # Use double newlines so _pack_markdown_blocks splits into 3 blocks + result = asyncio.run(adapter.send("wxid_test123", "first\n\nsecond\n\nthird")) + + assert result.success is True + # 3 chunks, but chunk 2 fails once and retries → 4 _send_message calls total + assert send_message_mock.await_count == 4 + # The retried chunk should reuse the same client_id for deduplication + first_try = send_message_mock.await_args_list[1].kwargs + retry = send_message_mock.await_args_list[2].kwargs + assert first_try["text"] == retry["text"] + assert first_try["client_id"] == retry["client_id"] + + class TestWeixinRemoteMediaSafety: def test_download_remote_media_blocks_unsafe_urls(self): adapter = _make_adapter()