fix(weixin): add rate-limit circuit breaker

This commit is contained in:
Hariharan Ayappane 2026-05-16 17:11:00 +05:30 committed by Teknium
parent 2e62862784
commit b8469a81e3
3 changed files with 174 additions and 6 deletions

View file

@ -1174,6 +1174,24 @@ class WeixinAdapter(BasePlatformAdapter):
extra.get("send_chunk_retry_delay_seconds")
or os.getenv("WEIXIN_SEND_CHUNK_RETRY_DELAY_SECONDS", "1.0")
)
self._send_text_gate = asyncio.Lock()
self._rate_limit_circuit_threshold = max(
1,
int(
extra.get("rate_limit_circuit_threshold")
or os.getenv("WEIXIN_RATE_LIMIT_CIRCUIT_THRESHOLD", "1")
),
)
self._rate_limit_circuit_window_seconds = float(
extra.get("rate_limit_circuit_window_seconds")
or os.getenv("WEIXIN_RATE_LIMIT_CIRCUIT_WINDOW_SECONDS", "30.0")
)
self._rate_limit_circuit_open_seconds = float(
extra.get("rate_limit_circuit_open_seconds")
or os.getenv("WEIXIN_RATE_LIMIT_CIRCUIT_OPEN_SECONDS", "30.0")
)
self._rate_limit_circuit_until = 0.0
self._rate_limit_events: List[float] = []
self._dm_policy = str(extra.get("dm_policy") or os.getenv("WEIXIN_DM_POLICY", "open")).strip().lower()
self._group_policy = str(extra.get("group_policy") or os.getenv("WEIXIN_GROUP_POLICY", "disabled")).strip().lower()
allow_from = extra.get("allow_from")
@ -1647,6 +1665,37 @@ class WeixinAdapter(BasePlatformAdapter):
content, self.MAX_MESSAGE_LENGTH, self._split_multiline_messages,
)
def _rate_limit_cooldown_remaining(self) -> float:
return max(0.0, self._rate_limit_circuit_until - time.monotonic())
def _rate_limit_error(self) -> RuntimeError:
return RuntimeError(
f"iLink sendmessage rate limited; cooldown active for {self._rate_limit_cooldown_remaining():.1f}s"
)
def _open_rate_limit_circuit(self) -> None:
if self._rate_limit_circuit_open_seconds <= 0:
return
self._rate_limit_circuit_until = max(
self._rate_limit_circuit_until,
time.monotonic() + self._rate_limit_circuit_open_seconds,
)
def _record_rate_limit_event(self) -> bool:
"""Record a genuine iLink rate limit and return True if breaker opened."""
now = time.monotonic()
window_start = now - self._rate_limit_circuit_window_seconds
self._rate_limit_events = [ts for ts in self._rate_limit_events if ts >= window_start]
self._rate_limit_events.append(now)
if len(self._rate_limit_events) >= self._rate_limit_circuit_threshold:
self._open_rate_limit_circuit()
return self._rate_limit_cooldown_remaining() > 0
return False
def _reset_rate_limit_circuit(self) -> None:
self._rate_limit_events.clear()
self._rate_limit_circuit_until = 0.0
async def _send_text_chunk(
self,
*,
@ -1662,9 +1711,28 @@ class WeixinAdapter(BasePlatformAdapter):
degraded fallback, which keeps cron-initiated push messages working
even when no user message has refreshed the session recently.
"""
async with self._send_text_gate:
await self._send_text_chunk_locked(
chat_id=chat_id,
chunk=chunk,
context_token=context_token,
client_id=client_id,
)
async def _send_text_chunk_locked(
self,
*,
chat_id: str,
chunk: str,
context_token: Optional[str],
client_id: str,
) -> None:
"""Send a text chunk while holding the adapter-wide outbound text gate."""
last_error: Optional[Exception] = None
retried_without_token = False
for attempt in range(self._send_chunk_retries + 1):
if self._rate_limit_cooldown_remaining() > 0:
raise self._rate_limit_error()
try:
resp = await _send_message(
self._send_session,
@ -1710,6 +1778,9 @@ class WeixinAdapter(BasePlatformAdapter):
last_error = RuntimeError(
f"iLink sendmessage rate limited: ret={ret} errcode={errcode} errmsg={errmsg}"
)
if self._record_rate_limit_event():
last_error = self._rate_limit_error()
break
if attempt >= self._send_chunk_retries:
break
wait = self._send_chunk_retry_delay_seconds * 3 # 3x backoff for rate limit
@ -1723,6 +1794,7 @@ class WeixinAdapter(BasePlatformAdapter):
raise RuntimeError(
f"iLink sendmessage error: ret={ret} errcode={errcode} errmsg={errmsg}"
)
self._reset_rate_limit_circuit()
return
except Exception as exc:
last_error = exc

View file

@ -411,6 +411,98 @@ class TestWeixinChunkDelivery:
assert first_try["text"] == retry["text"]
assert first_try["client_id"] == retry["client_id"]
@patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock)
@patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
def test_repeated_rate_limits_open_circuit_for_followup_sends(self, send_message_mock, sleep_mock):
adapter = self._connected_adapter()
adapter._send_chunk_retries = 3
adapter._send_chunk_retry_delay_seconds = 0
adapter._rate_limit_circuit_threshold = 2
adapter._rate_limit_circuit_window_seconds = 60
adapter._rate_limit_circuit_open_seconds = 60
send_message_mock.return_value = {
"ret": weixin.RATE_LIMIT_ERRCODE,
"errcode": weixin.RATE_LIMIT_ERRCODE,
"errmsg": "frequency limit",
}
first = asyncio.run(adapter.send("wxid_test123", "first"))
second = asyncio.run(adapter.send("wxid_test123", "second"))
assert first.success is False
assert "cooldown" in (first.error or "")
assert second.success is False
assert "cooldown" in (second.error or "")
# The first rate-limit response is retried once. The second response
# crosses the sliding-window threshold, opens the breaker, and both the
# rest of the current chunk and follow-up sends fail fast.
assert send_message_mock.await_count == 2
assert sleep_mock.await_count == 1
@patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
def test_open_rate_limit_circuit_fails_fast_without_sendmessage(self, send_message_mock):
adapter = self._connected_adapter()
adapter._rate_limit_circuit_open_seconds = 60
adapter._open_rate_limit_circuit()
result = asyncio.run(adapter.send("wxid_test123", "blocked"))
assert result.success is False
assert "cooldown" in (result.error or "")
send_message_mock.assert_not_awaited()
@patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
def test_successful_send_after_cooldown_resets_rate_limit_state(self, send_message_mock):
adapter = self._connected_adapter()
adapter._rate_limit_circuit_until = weixin.time.monotonic() - 1
adapter._rate_limit_events = [weixin.time.monotonic()]
send_message_mock.return_value = {"errcode": 0}
result = asyncio.run(adapter.send("wxid_test123", "after cooldown"))
assert result.success is True
assert adapter._rate_limit_events == []
assert adapter._rate_limit_circuit_until == 0.0
send_message_mock.assert_awaited_once()
def test_concurrent_rate_limited_sends_are_serialized_by_gate(self):
adapter = self._connected_adapter()
adapter._send_chunk_retries = 3
adapter._send_chunk_retry_delay_seconds = 0
adapter._rate_limit_circuit_threshold = 1
adapter._rate_limit_circuit_open_seconds = 60
active = 0
peak_active = 0
async def rate_limited_send(*args, **kwargs):
nonlocal active, peak_active
active += 1
peak_active = max(peak_active, active)
await asyncio.sleep(0)
active -= 1
return {
"ret": weixin.RATE_LIMIT_ERRCODE,
"errcode": weixin.RATE_LIMIT_ERRCODE,
"errmsg": "frequency limit",
}
async def run_burst():
with patch("gateway.platforms.weixin._send_message", side_effect=rate_limited_send) as send_message_mock:
results = await asyncio.gather(
*(adapter.send("wxid_test123", f"message {idx}") for idx in range(20))
)
return results, send_message_mock
results, send_message_mock = asyncio.run(run_burst())
assert all(not result.success for result in results)
assert peak_active == 1
# Once the first send observes iLink's rate limit, the breaker opens;
# queued concurrent sends acquire the gate later and fail before making
# their own iLink calls.
assert send_message_mock.await_count == 1
class TestWeixinOutboundMedia:
def test_send_image_file_accepts_keyword_image_path(self):

View file

@ -588,6 +588,16 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
(preserves code-block boundaries, adds part indicators).
"""
from gateway.config import Platform
media_files = media_files or []
# Weixin handles text/media delivery inside its native helper and does not
# need the optional platform adapter imports below. Keep this branch early
# so a Weixin send is not blocked by unrelated optional dependencies (for
# example lark-oapi's heavy Feishu import path).
if platform == Platform.WEIXIN:
return await _send_weixin(pconfig, chat_id, message, media_files=media_files)
from gateway.platforms.base import BasePlatformAdapter, utf16_len
from gateway.platforms.slack import SlackAdapter
@ -605,8 +615,6 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
except ImportError:
_feishu_available = False
media_files = media_files or []
if platform == Platform.SLACK and message:
try:
slack_adapter = SlackAdapter.__new__(SlackAdapter)
@ -663,10 +671,6 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
last_result = result
return last_result
# --- Weixin: use the native one-shot adapter helper for text + media ---
if platform == Platform.WEIXIN:
return await _send_weixin(pconfig, chat_id, message, media_files=media_files)
# --- Discord: chunked delivery via the registry's standalone_sender_fn.
# The plugin's ``_standalone_send`` (registered in
# plugins/platforms/discord/adapter.py) handles forum channels, threads,