fix(weixin): auto-compress large image retries

This commit is contained in:
OpenClaw Assistant 2026-04-23 07:39:37 +00:00
parent 88b6eb9ad1
commit f10ad7fffc
2 changed files with 272 additions and 9 deletions

View file

@ -21,7 +21,9 @@ import mimetypes
import os
import re
import secrets
import shutil
import struct
import subprocess
import tempfile
import time
import uuid
@ -90,6 +92,10 @@ RETRY_DELAY_SECONDS = 2
BACKOFF_DELAY_SECONDS = 30
SESSION_EXPIRED_ERRCODE = -14
MESSAGE_DEDUP_TTL_SECONDS = 300
WEIXIN_IMAGE_UPLOAD_TARGET_BYTES = 128_000
WEIXIN_IMAGE_UPLOAD_MIN_QUALITY = 45
WEIXIN_IMAGE_UPLOAD_QUALITY_STEPS = (85, 75, 65, 55, 45)
WEIXIN_IMAGE_MAX_DIMENSION = 1280
MEDIA_IMAGE = 1
MEDIA_VIDEO = 2
@ -135,6 +141,7 @@ _HEADER_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$")
_TABLE_RULE_RE = re.compile(r"^\s*\|?(?:\s*:?-{3,}:?\s*\|)+\s*:?-{3,}:?\s*\|?\s*$")
_FENCE_RE = re.compile(r"^```([^\n`]*)\s*$")
_MARKDOWN_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
_WEIXIN_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}
def check_weixin_requirements() -> bool:
@ -182,6 +189,74 @@ def _aes_padded_size(size: int) -> int:
return ((size + 1 + 15) // 16) * 16
def _has_ffmpeg() -> bool:
return shutil.which("ffmpeg") is not None
def _looks_like_large_image_upload_failure(exc: Exception) -> bool:
message = str(exc).lower()
return "cdn upload http 500" in message or "broken pipe" in message
def _ffmpeg_scale_filter(max_dimension: int) -> str:
return (
f"scale='if(gte(iw,ih),min(iw,{max_dimension}),-2)':"
f"'if(gte(ih,iw),min(ih,{max_dimension}),-2)'"
)
def _compress_image_with_ffmpeg(
src_path: str,
*,
max_dimension: int = WEIXIN_IMAGE_MAX_DIMENSION,
target_bytes: int = WEIXIN_IMAGE_UPLOAD_TARGET_BYTES,
) -> Optional[str]:
ffmpeg = shutil.which("ffmpeg")
if not ffmpeg:
return None
source = Path(src_path)
if not source.exists():
return None
temp_dir = tempfile.mkdtemp(prefix="weixin-img-")
output_path = Path(temp_dir) / f"{source.stem}.jpg"
scale_filter = _ffmpeg_scale_filter(max_dimension)
for quality in WEIXIN_IMAGE_UPLOAD_QUALITY_STEPS:
cmd = [
ffmpeg,
"-y",
"-i",
str(source),
"-vf",
scale_filter,
"-frames:v",
"1",
"-q:v",
str(quality),
str(output_path),
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
except (OSError, subprocess.TimeoutExpired) as exc:
logger.warning("weixin: ffmpeg image compression failed for %s at q=%s: %s", source, quality, str(exc)[:200])
continue
if result.returncode != 0:
logger.warning("weixin: ffmpeg image compression failed for %s at q=%s: %s", source, quality, (result.stderr or result.stdout).strip()[:200])
continue
try:
if output_path.stat().st_size <= target_bytes or quality <= WEIXIN_IMAGE_UPLOAD_MIN_QUALITY:
return str(output_path)
except OSError:
continue
if output_path.exists():
return str(output_path)
shutil.rmtree(temp_dir, ignore_errors=True)
return None
def _random_wechat_uin() -> str:
value = struct.unpack(">I", secrets.token_bytes(4))[0]
return base64.b64encode(str(value).encode("utf-8")).decode("ascii")
@ -1673,7 +1748,7 @@ class WeixinAdapter(BasePlatformAdapter):
file_path = os.path.abspath(file_path)
cleanup = False
try:
return await self.send_document(chat_id, file_path, caption=caption, metadata=metadata)
return await self.send_image_file(chat_id, file_path, caption=caption, metadata=metadata)
finally:
if cleanup and file_path and os.path.exists(file_path):
try:
@ -1691,12 +1766,54 @@ class WeixinAdapter(BasePlatformAdapter):
**kwargs,
) -> SendResult:
del reply_to, kwargs
return await self.send_document(
chat_id=chat_id,
file_path=image_path,
caption=caption,
metadata=metadata,
)
cleanup_paths: List[str] = []
try:
result = await self.send_document(
chat_id=chat_id,
file_path=image_path,
caption=caption,
metadata=metadata,
)
if result.success or not _has_ffmpeg() or not _looks_like_large_image_upload_failure(Exception(result.error or "")):
return result
compressed_path = _compress_image_with_ffmpeg(image_path)
if not compressed_path or compressed_path == image_path:
return result
cleanup_paths.append(compressed_path)
logger.info(
"[%s] Retrying image upload with compressed fallback for %s (%s -> %s bytes)",
self.name,
_safe_id(chat_id),
Path(image_path).stat().st_size if Path(image_path).exists() else "?",
Path(compressed_path).stat().st_size if Path(compressed_path).exists() else "?",
)
if self._send_session is None:
return await self.send_document(
chat_id=chat_id,
file_path=compressed_path,
caption=caption,
metadata=metadata,
)
original_session = self._send_session
async with aiohttp.ClientSession(trust_env=True, connector=_make_ssl_connector()) as retry_session:
self._send_session = retry_session
try:
return await self.send_document(
chat_id=chat_id,
file_path=compressed_path,
caption=caption,
metadata=metadata,
)
finally:
self._send_session = original_session
finally:
for cleanup_path in cleanup_paths:
try:
Path(cleanup_path).unlink(missing_ok=True)
Path(cleanup_path).parent.rmdir()
except OSError:
pass
async def send_document(
self,
@ -1992,7 +2109,7 @@ async def send_weixin_direct(
for media_path, _is_voice in media_files or []:
ext = Path(media_path).suffix.lower()
if ext in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}:
if ext in _WEIXIN_IMAGE_EXTS:
last_result = await live_adapter.send_image_file(chat_id, media_path)
else:
last_result = await live_adapter.send_document(chat_id, media_path)
@ -2037,7 +2154,7 @@ async def send_weixin_direct(
for media_path, _is_voice in media_files or []:
ext = Path(media_path).suffix.lower()
if ext in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}:
if ext in _WEIXIN_IMAGE_EXTS:
last_result = await adapter.send_image_file(chat_id, media_path)
else:
last_result = await adapter.send_document(chat_id, media_path)

View file

@ -4,6 +4,7 @@ import asyncio
import base64
import json
import os
import subprocess
from pathlib import Path
from unittest.mock import AsyncMock, patch
@ -468,6 +469,151 @@ class TestWeixinOutboundMedia:
assert media["aes_key"] == expected_aes_key
class TestWeixinImageCompressionFallback:
def test_send_image_file_retries_with_compressed_fallback_on_large_upload_failure(self):
adapter = _make_adapter()
compressed_path = "/tmp/compressed.jpg"
adapter.send_document = AsyncMock(
side_effect=[
SendResult(success=False, error="CDN upload HTTP 500"),
SendResult(success=True, message_id="msg-2"),
]
)
with patch("gateway.platforms.weixin._has_ffmpeg", return_value=True), \
patch("gateway.platforms.weixin._compress_image_with_ffmpeg", return_value=compressed_path), \
patch("pathlib.Path.exists", return_value=True), \
patch("pathlib.Path.stat") as stat_mock, \
patch("pathlib.Path.unlink"), \
patch("pathlib.Path.rmdir"):
stat_mock.return_value.st_size = 12345
result = asyncio.run(adapter.send_image_file("wxid_test123", "/tmp/original.png"))
assert result.success is True
assert adapter.send_document.await_args_list[0].kwargs["file_path"] == "/tmp/original.png"
assert adapter.send_document.await_args_list[1].kwargs["file_path"] == compressed_path
def test_send_image_file_skips_retry_without_compressed_fallback(self):
adapter = _make_adapter()
failure = SendResult(success=False, error="CDN upload HTTP 500")
adapter.send_document = AsyncMock(return_value=failure)
with patch("gateway.platforms.weixin._has_ffmpeg", return_value=True), \
patch("gateway.platforms.weixin._compress_image_with_ffmpeg", return_value=None):
result = asyncio.run(adapter.send_image_file("wxid_test123", "/tmp/original.png"))
assert result == failure
adapter.send_document.assert_awaited_once()
def test_send_image_routes_local_paths_through_send_image_file(self):
adapter = _make_adapter()
adapter.send_image_file = AsyncMock(return_value=SendResult(success=True, message_id="msg-3"))
result = asyncio.run(adapter.send_image("wxid_test123", "/tmp/original.png", caption="caption"))
assert result.success is True
adapter.send_image_file.assert_awaited_once_with("wxid_test123", "/tmp/original.png", caption="caption", metadata=None)
def test_send_weixin_direct_routes_images_via_send_image_file(self):
adapter = _make_adapter()
adapter._send_session = type("S", (), {"closed": False})()
adapter.format_message = lambda message: ""
adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="text-msg"))
adapter.send_image_file = AsyncMock(return_value=SendResult(success=True, message_id="img-msg"))
adapter.send_document = AsyncMock(return_value=SendResult(success=True, message_id="doc-msg"))
with patch.dict("gateway.platforms.weixin._LIVE_ADAPTERS", {"tok": adapter}, clear=True):
result = asyncio.run(
weixin.send_weixin_direct(
extra={"account_id": "acct"},
token="tok",
chat_id="wxid_test123",
message="",
media_files=[("/tmp/picture.png", False), ("/tmp/file.pdf", False)],
)
)
assert result["success"] is True
adapter.send_image_file.assert_awaited_once_with("wxid_test123", "/tmp/picture.png")
adapter.send_document.assert_awaited_once_with("wxid_test123", "/tmp/file.pdf")
def test_send_weixin_direct_treats_bmp_as_image(self):
adapter = _make_adapter()
adapter._send_session = type("S", (), {"closed": False})()
adapter.format_message = lambda message: ""
adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="text-msg"))
adapter.send_image_file = AsyncMock(return_value=SendResult(success=True, message_id="img-msg"))
adapter.send_document = AsyncMock(return_value=SendResult(success=True, message_id="doc-msg"))
with patch.dict("gateway.platforms.weixin._LIVE_ADAPTERS", {"tok": adapter}, clear=True):
result = asyncio.run(
weixin.send_weixin_direct(
extra={"account_id": "acct"},
token="tok",
chat_id="wxid_test123",
message="",
media_files=[("/tmp/picture.bmp", False)],
)
)
assert result["success"] is True
adapter.send_image_file.assert_awaited_once_with("wxid_test123", "/tmp/picture.bmp")
adapter.send_document.assert_not_awaited()
def test_send_image_file_uses_fresh_session_for_retry(self):
adapter = _make_adapter()
original_session = object()
retry_session = object()
adapter._send_session = original_session
adapter.send_document = AsyncMock(
side_effect=[
SendResult(success=False, error="Broken pipe"),
SendResult(success=True, message_id="msg-4"),
]
)
class _RetryClientSession:
def __init__(self, *args, **kwargs):
self.session = retry_session
async def __aenter__(self):
return self.session
async def __aexit__(self, exc_type, exc, tb):
return False
with patch("gateway.platforms.weixin._has_ffmpeg", return_value=True), \
patch("gateway.platforms.weixin._compress_image_with_ffmpeg", return_value="/tmp/compressed.jpg"), \
patch("gateway.platforms.weixin.aiohttp.ClientSession", _RetryClientSession), \
patch("gateway.platforms.weixin._make_ssl_connector", return_value=None), \
patch("pathlib.Path.exists", return_value=True), \
patch("pathlib.Path.stat") as stat_mock, \
patch("pathlib.Path.unlink"), \
patch("pathlib.Path.rmdir"):
stat_mock.return_value.st_size = 12345
result = asyncio.run(adapter.send_image_file("wxid_test123", "/tmp/original.png"))
assert result.success is True
assert adapter.send_document.await_args_list[0].kwargs["file_path"] == "/tmp/original.png"
assert adapter.send_document.await_args_list[1].kwargs["file_path"] == "/tmp/compressed.jpg"
assert adapter._send_session is original_session
class TestWeixinImageCompressionHelpers:
def test_ffmpeg_scale_filter_handles_square_images(self):
scale = weixin._ffmpeg_scale_filter(1280)
assert "gte(iw,ih)" in scale
assert "gte(ih,iw)" in scale
def test_compress_image_with_ffmpeg_returns_none_on_timeout(self, tmp_path):
image_path = tmp_path / "demo.png"
image_path.write_bytes(b"png")
with patch("gateway.platforms.weixin.shutil.which", return_value="/usr/bin/ffmpeg"), \
patch("gateway.platforms.weixin.subprocess.run", side_effect=subprocess.TimeoutExpired(cmd=["ffmpeg"], timeout=120)):
result = weixin._compress_image_with_ffmpeg(str(image_path))
assert result is None
class TestWeixinRemoteMediaSafety:
def test_download_remote_media_blocks_unsafe_urls(self):
adapter = _make_adapter()