From b8469a81e3e3f0793615d9e4f71589652ae9bc9e Mon Sep 17 00:00:00 2001 From: Hariharan Ayappane Date: Sat, 16 May 2026 17:11:00 +0530 Subject: [PATCH] fix(weixin): add rate-limit circuit breaker --- gateway/platforms/weixin.py | 72 ++++++++++++++++++++++++++++ tests/gateway/test_weixin.py | 92 ++++++++++++++++++++++++++++++++++++ tools/send_message_tool.py | 16 ++++--- 3 files changed, 174 insertions(+), 6 deletions(-) diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index 86358392c20..b1247d8eae0 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -1174,6 +1174,24 @@ class WeixinAdapter(BasePlatformAdapter): extra.get("send_chunk_retry_delay_seconds") or os.getenv("WEIXIN_SEND_CHUNK_RETRY_DELAY_SECONDS", "1.0") ) + self._send_text_gate = asyncio.Lock() + self._rate_limit_circuit_threshold = max( + 1, + int( + extra.get("rate_limit_circuit_threshold") + or os.getenv("WEIXIN_RATE_LIMIT_CIRCUIT_THRESHOLD", "1") + ), + ) + self._rate_limit_circuit_window_seconds = float( + extra.get("rate_limit_circuit_window_seconds") + or os.getenv("WEIXIN_RATE_LIMIT_CIRCUIT_WINDOW_SECONDS", "30.0") + ) + self._rate_limit_circuit_open_seconds = float( + extra.get("rate_limit_circuit_open_seconds") + or os.getenv("WEIXIN_RATE_LIMIT_CIRCUIT_OPEN_SECONDS", "30.0") + ) + self._rate_limit_circuit_until = 0.0 + self._rate_limit_events: List[float] = [] self._dm_policy = str(extra.get("dm_policy") or os.getenv("WEIXIN_DM_POLICY", "open")).strip().lower() self._group_policy = str(extra.get("group_policy") or os.getenv("WEIXIN_GROUP_POLICY", "disabled")).strip().lower() allow_from = extra.get("allow_from") @@ -1647,6 +1665,37 @@ class WeixinAdapter(BasePlatformAdapter): content, self.MAX_MESSAGE_LENGTH, self._split_multiline_messages, ) + def _rate_limit_cooldown_remaining(self) -> float: + return max(0.0, self._rate_limit_circuit_until - time.monotonic()) + + def _rate_limit_error(self) -> RuntimeError: + return RuntimeError( + f"iLink sendmessage rate limited; cooldown active for {self._rate_limit_cooldown_remaining():.1f}s" + ) + + def _open_rate_limit_circuit(self) -> None: + if self._rate_limit_circuit_open_seconds <= 0: + return + self._rate_limit_circuit_until = max( + self._rate_limit_circuit_until, + time.monotonic() + self._rate_limit_circuit_open_seconds, + ) + + def _record_rate_limit_event(self) -> bool: + """Record a genuine iLink rate limit and return True if breaker opened.""" + now = time.monotonic() + window_start = now - self._rate_limit_circuit_window_seconds + self._rate_limit_events = [ts for ts in self._rate_limit_events if ts >= window_start] + self._rate_limit_events.append(now) + if len(self._rate_limit_events) >= self._rate_limit_circuit_threshold: + self._open_rate_limit_circuit() + return self._rate_limit_cooldown_remaining() > 0 + return False + + def _reset_rate_limit_circuit(self) -> None: + self._rate_limit_events.clear() + self._rate_limit_circuit_until = 0.0 + async def _send_text_chunk( self, *, @@ -1662,9 +1711,28 @@ class WeixinAdapter(BasePlatformAdapter): degraded fallback, which keeps cron-initiated push messages working even when no user message has refreshed the session recently. """ + async with self._send_text_gate: + await self._send_text_chunk_locked( + chat_id=chat_id, + chunk=chunk, + context_token=context_token, + client_id=client_id, + ) + + async def _send_text_chunk_locked( + self, + *, + chat_id: str, + chunk: str, + context_token: Optional[str], + client_id: str, + ) -> None: + """Send a text chunk while holding the adapter-wide outbound text gate.""" last_error: Optional[Exception] = None retried_without_token = False for attempt in range(self._send_chunk_retries + 1): + if self._rate_limit_cooldown_remaining() > 0: + raise self._rate_limit_error() try: resp = await _send_message( self._send_session, @@ -1710,6 +1778,9 @@ class WeixinAdapter(BasePlatformAdapter): last_error = RuntimeError( f"iLink sendmessage rate limited: ret={ret} errcode={errcode} errmsg={errmsg}" ) + if self._record_rate_limit_event(): + last_error = self._rate_limit_error() + break if attempt >= self._send_chunk_retries: break wait = self._send_chunk_retry_delay_seconds * 3 # 3x backoff for rate limit @@ -1723,6 +1794,7 @@ class WeixinAdapter(BasePlatformAdapter): raise RuntimeError( f"iLink sendmessage error: ret={ret} errcode={errcode} errmsg={errmsg}" ) + self._reset_rate_limit_circuit() return except Exception as exc: last_error = exc diff --git a/tests/gateway/test_weixin.py b/tests/gateway/test_weixin.py index bbfba37d51c..5169666e8ba 100644 --- a/tests/gateway/test_weixin.py +++ b/tests/gateway/test_weixin.py @@ -411,6 +411,98 @@ class TestWeixinChunkDelivery: assert first_try["text"] == retry["text"] assert first_try["client_id"] == retry["client_id"] + @patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock) + @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock) + def test_repeated_rate_limits_open_circuit_for_followup_sends(self, send_message_mock, sleep_mock): + adapter = self._connected_adapter() + adapter._send_chunk_retries = 3 + adapter._send_chunk_retry_delay_seconds = 0 + adapter._rate_limit_circuit_threshold = 2 + adapter._rate_limit_circuit_window_seconds = 60 + adapter._rate_limit_circuit_open_seconds = 60 + + send_message_mock.return_value = { + "ret": weixin.RATE_LIMIT_ERRCODE, + "errcode": weixin.RATE_LIMIT_ERRCODE, + "errmsg": "frequency limit", + } + + first = asyncio.run(adapter.send("wxid_test123", "first")) + second = asyncio.run(adapter.send("wxid_test123", "second")) + + assert first.success is False + assert "cooldown" in (first.error or "") + assert second.success is False + assert "cooldown" in (second.error or "") + # The first rate-limit response is retried once. The second response + # crosses the sliding-window threshold, opens the breaker, and both the + # rest of the current chunk and follow-up sends fail fast. + assert send_message_mock.await_count == 2 + assert sleep_mock.await_count == 1 + + @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock) + def test_open_rate_limit_circuit_fails_fast_without_sendmessage(self, send_message_mock): + adapter = self._connected_adapter() + adapter._rate_limit_circuit_open_seconds = 60 + adapter._open_rate_limit_circuit() + + result = asyncio.run(adapter.send("wxid_test123", "blocked")) + + assert result.success is False + assert "cooldown" in (result.error or "") + send_message_mock.assert_not_awaited() + + @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock) + def test_successful_send_after_cooldown_resets_rate_limit_state(self, send_message_mock): + adapter = self._connected_adapter() + adapter._rate_limit_circuit_until = weixin.time.monotonic() - 1 + adapter._rate_limit_events = [weixin.time.monotonic()] + send_message_mock.return_value = {"errcode": 0} + + result = asyncio.run(adapter.send("wxid_test123", "after cooldown")) + + assert result.success is True + assert adapter._rate_limit_events == [] + assert adapter._rate_limit_circuit_until == 0.0 + send_message_mock.assert_awaited_once() + + def test_concurrent_rate_limited_sends_are_serialized_by_gate(self): + adapter = self._connected_adapter() + adapter._send_chunk_retries = 3 + adapter._send_chunk_retry_delay_seconds = 0 + adapter._rate_limit_circuit_threshold = 1 + adapter._rate_limit_circuit_open_seconds = 60 + active = 0 + peak_active = 0 + + async def rate_limited_send(*args, **kwargs): + nonlocal active, peak_active + active += 1 + peak_active = max(peak_active, active) + await asyncio.sleep(0) + active -= 1 + return { + "ret": weixin.RATE_LIMIT_ERRCODE, + "errcode": weixin.RATE_LIMIT_ERRCODE, + "errmsg": "frequency limit", + } + + async def run_burst(): + with patch("gateway.platforms.weixin._send_message", side_effect=rate_limited_send) as send_message_mock: + results = await asyncio.gather( + *(adapter.send("wxid_test123", f"message {idx}") for idx in range(20)) + ) + return results, send_message_mock + + results, send_message_mock = asyncio.run(run_burst()) + + assert all(not result.success for result in results) + assert peak_active == 1 + # Once the first send observes iLink's rate limit, the breaker opens; + # queued concurrent sends acquire the gate later and fail before making + # their own iLink calls. + assert send_message_mock.await_count == 1 + class TestWeixinOutboundMedia: def test_send_image_file_accepts_keyword_image_path(self): diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 53a9fc60037..83608044330 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -588,6 +588,16 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, (preserves code-block boundaries, adds part indicators). """ from gateway.config import Platform + + media_files = media_files or [] + + # Weixin handles text/media delivery inside its native helper and does not + # need the optional platform adapter imports below. Keep this branch early + # so a Weixin send is not blocked by unrelated optional dependencies (for + # example lark-oapi's heavy Feishu import path). + if platform == Platform.WEIXIN: + return await _send_weixin(pconfig, chat_id, message, media_files=media_files) + from gateway.platforms.base import BasePlatformAdapter, utf16_len from gateway.platforms.slack import SlackAdapter @@ -605,8 +615,6 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, except ImportError: _feishu_available = False - media_files = media_files or [] - if platform == Platform.SLACK and message: try: slack_adapter = SlackAdapter.__new__(SlackAdapter) @@ -663,10 +671,6 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, last_result = result return last_result - # --- Weixin: use the native one-shot adapter helper for text + media --- - if platform == Platform.WEIXIN: - return await _send_weixin(pconfig, chat_id, message, media_files=media_files) - # --- Discord: chunked delivery via the registry's standalone_sender_fn. # The plugin's ``_standalone_send`` (registered in # plugins/platforms/discord/adapter.py) handles forum channels, threads,