fix(weixin): add per-chunk retry with backoff for text delivery

When sending multi-chunk responses, individual chunks can fail due to
transient iLink API errors. Previously a single failure would abort the
entire message. Now each chunk is retried with linear backoff before
giving up, and the same client_id is reused across retries for
server-side deduplication.

Configurable via config.yaml (platforms.weixin.extra) or env vars:
- send_chunk_delay_seconds (default 0.35s) — pacing between chunks
- send_chunk_retries (default 2) — max retry attempts per chunk
- send_chunk_retry_delay_seconds (default 1.0s) — base retry delay

Replaces the hardcoded 0.3s inter-chunk delay from #7903.

Salvaged from PR #7899 by @corazzione. Fixes #7836.
This commit is contained in:
Markus Corazzione 2026-04-11 14:02:12 -07:00 committed by Teknium
parent 04c1c5d53f
commit 885123d44b
2 changed files with 105 additions and 8 deletions

View file

@ -1017,6 +1017,16 @@ class WeixinAdapter(BasePlatformAdapter):
self._cdn_base_url = str(
extra.get("cdn_base_url") or os.getenv("WEIXIN_CDN_BASE_URL", WEIXIN_CDN_BASE_URL)
).strip().rstrip("/")
self._send_chunk_delay_seconds = float(
extra.get("send_chunk_delay_seconds") or os.getenv("WEIXIN_SEND_CHUNK_DELAY_SECONDS", "0.35")
)
self._send_chunk_retries = int(
extra.get("send_chunk_retries") or os.getenv("WEIXIN_SEND_CHUNK_RETRIES", "2")
)
self._send_chunk_retry_delay_seconds = float(
extra.get("send_chunk_retry_delay_seconds")
or os.getenv("WEIXIN_SEND_CHUNK_RETRY_DELAY_SECONDS", "1.0")
)
self._dm_policy = str(extra.get("dm_policy") or os.getenv("WEIXIN_DM_POLICY", "open")).strip().lower()
self._group_policy = str(extra.get("group_policy") or os.getenv("WEIXIN_GROUP_POLICY", "disabled")).strip().lower()
allow_from = extra.get("allow_from")
@ -1346,6 +1356,47 @@ class WeixinAdapter(BasePlatformAdapter):
content, self.MAX_MESSAGE_LENGTH, self._split_multiline_messages,
)
async def _send_text_chunk(
self,
*,
chat_id: str,
chunk: str,
context_token: Optional[str],
client_id: str,
) -> None:
"""Send a single text chunk with per-chunk retry and backoff."""
last_error: Optional[Exception] = None
for attempt in range(self._send_chunk_retries + 1):
try:
await _send_message(
self._session,
base_url=self._base_url,
token=self._token,
to=chat_id,
text=chunk,
context_token=context_token,
client_id=client_id,
)
return
except Exception as exc:
last_error = exc
if attempt >= self._send_chunk_retries:
break
wait = self._send_chunk_retry_delay_seconds * (attempt + 1)
logger.warning(
"[%s] send chunk failed to=%s attempt=%d/%d, retrying in %.2fs: %s",
self.name,
_safe_id(chat_id),
attempt + 1,
self._send_chunk_retries + 1,
wait,
exc,
)
if wait > 0:
await asyncio.sleep(wait)
assert last_error is not None
raise last_error
async def send(
self,
chat_id: str,
@ -1360,19 +1411,16 @@ class WeixinAdapter(BasePlatformAdapter):
try:
chunks = self._split_text(self.format_message(content))
for idx, chunk in enumerate(chunks):
if idx > 0:
await asyncio.sleep(0.3)
client_id = f"hermes-weixin-{uuid.uuid4().hex}"
await _send_message(
self._session,
base_url=self._base_url,
token=self._token,
to=chat_id,
text=chunk,
await self._send_text_chunk(
chat_id=chat_id,
chunk=chunk,
context_token=context_token,
client_id=client_id,
)
last_message_id = client_id
if idx < len(chunks) - 1 and self._send_chunk_delay_seconds > 0:
await asyncio.sleep(self._send_chunk_delay_seconds)
return SendResult(success=True, message_id=last_message_id)
except Exception as exc:
logger.error("[%s] send failed to=%s: %s", self.name, _safe_id(chat_id), exc)

View file

@ -283,6 +283,55 @@ class TestWeixinSendMessageIntegration:
)
class TestWeixinChunkDelivery:
def _connected_adapter(self) -> WeixinAdapter:
adapter = _make_adapter()
adapter._session = object()
adapter._token = "test-token"
adapter._base_url = "https://weixin.example.com"
adapter._token_store.get = lambda account_id, chat_id: "ctx-token"
return adapter
@patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock)
@patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
def test_send_waits_between_multiple_chunks(self, send_message_mock, sleep_mock):
adapter = self._connected_adapter()
adapter.MAX_MESSAGE_LENGTH = 12
# Use double newlines so _pack_markdown_blocks splits into 3 blocks
result = asyncio.run(adapter.send("wxid_test123", "first\n\nsecond\n\nthird"))
assert result.success is True
assert send_message_mock.await_count == 3
assert sleep_mock.await_count == 2
@patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock)
@patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
def test_send_retries_failed_chunk_before_continuing(self, send_message_mock, sleep_mock):
adapter = self._connected_adapter()
adapter.MAX_MESSAGE_LENGTH = 12
calls = {"count": 0}
async def flaky_send(*args, **kwargs):
calls["count"] += 1
if calls["count"] == 2:
raise RuntimeError("temporary iLink failure")
send_message_mock.side_effect = flaky_send
# Use double newlines so _pack_markdown_blocks splits into 3 blocks
result = asyncio.run(adapter.send("wxid_test123", "first\n\nsecond\n\nthird"))
assert result.success is True
# 3 chunks, but chunk 2 fails once and retries → 4 _send_message calls total
assert send_message_mock.await_count == 4
# The retried chunk should reuse the same client_id for deduplication
first_try = send_message_mock.await_args_list[1].kwargs
retry = send_message_mock.await_args_list[2].kwargs
assert first_try["text"] == retry["text"]
assert first_try["client_id"] == retry["client_id"]
class TestWeixinRemoteMediaSafety:
def test_download_remote_media_blocks_unsafe_urls(self):
adapter = _make_adapter()