diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index 958e71da17..c33e5c1fed 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -21,7 +21,9 @@ import mimetypes import os import re import secrets +import shutil import struct +import subprocess import tempfile import time import uuid @@ -90,6 +92,10 @@ RETRY_DELAY_SECONDS = 2 BACKOFF_DELAY_SECONDS = 30 SESSION_EXPIRED_ERRCODE = -14 MESSAGE_DEDUP_TTL_SECONDS = 300 +WEIXIN_IMAGE_UPLOAD_TARGET_BYTES = 128_000 +WEIXIN_IMAGE_UPLOAD_MIN_QUALITY = 45 +WEIXIN_IMAGE_UPLOAD_QUALITY_STEPS = (85, 75, 65, 55, 45) +WEIXIN_IMAGE_MAX_DIMENSION = 1280 MEDIA_IMAGE = 1 MEDIA_VIDEO = 2 @@ -135,6 +141,7 @@ _HEADER_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$") _TABLE_RULE_RE = re.compile(r"^\s*\|?(?:\s*:?-{3,}:?\s*\|)+\s*:?-{3,}:?\s*\|?\s*$") _FENCE_RE = re.compile(r"^```([^\n`]*)\s*$") _MARKDOWN_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)") +_WEIXIN_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"} def check_weixin_requirements() -> bool: @@ -182,6 +189,74 @@ def _aes_padded_size(size: int) -> int: return ((size + 1 + 15) // 16) * 16 +def _has_ffmpeg() -> bool: + return shutil.which("ffmpeg") is not None + + +def _looks_like_large_image_upload_failure(exc: Exception) -> bool: + message = str(exc).lower() + return "cdn upload http 500" in message or "broken pipe" in message + + +def _ffmpeg_scale_filter(max_dimension: int) -> str: + return ( + f"scale='if(gte(iw,ih),min(iw,{max_dimension}),-2)':" + f"'if(gte(ih,iw),min(ih,{max_dimension}),-2)'" + ) + + +def _compress_image_with_ffmpeg( + src_path: str, + *, + max_dimension: int = WEIXIN_IMAGE_MAX_DIMENSION, + target_bytes: int = WEIXIN_IMAGE_UPLOAD_TARGET_BYTES, +) -> Optional[str]: + ffmpeg = shutil.which("ffmpeg") + if not ffmpeg: + return None + + source = Path(src_path) + if not source.exists(): + return None + + temp_dir = tempfile.mkdtemp(prefix="weixin-img-") + output_path = Path(temp_dir) / f"{source.stem}.jpg" + scale_filter = _ffmpeg_scale_filter(max_dimension) + + for quality in WEIXIN_IMAGE_UPLOAD_QUALITY_STEPS: + cmd = [ + ffmpeg, + "-y", + "-i", + str(source), + "-vf", + scale_filter, + "-frames:v", + "1", + "-q:v", + str(quality), + str(output_path), + ] + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) + except (OSError, subprocess.TimeoutExpired) as exc: + logger.warning("weixin: ffmpeg image compression failed for %s at q=%s: %s", source, quality, str(exc)[:200]) + continue + if result.returncode != 0: + logger.warning("weixin: ffmpeg image compression failed for %s at q=%s: %s", source, quality, (result.stderr or result.stdout).strip()[:200]) + continue + try: + if output_path.stat().st_size <= target_bytes or quality <= WEIXIN_IMAGE_UPLOAD_MIN_QUALITY: + return str(output_path) + except OSError: + continue + + if output_path.exists(): + return str(output_path) + shutil.rmtree(temp_dir, ignore_errors=True) + return None + + def _random_wechat_uin() -> str: value = struct.unpack(">I", secrets.token_bytes(4))[0] return base64.b64encode(str(value).encode("utf-8")).decode("ascii") @@ -1673,7 +1748,7 @@ class WeixinAdapter(BasePlatformAdapter): file_path = os.path.abspath(file_path) cleanup = False try: - return await self.send_document(chat_id, file_path, caption=caption, metadata=metadata) + return await self.send_image_file(chat_id, file_path, caption=caption, metadata=metadata) finally: if cleanup and file_path and os.path.exists(file_path): try: @@ -1691,12 +1766,54 @@ class WeixinAdapter(BasePlatformAdapter): **kwargs, ) -> SendResult: del reply_to, kwargs - return await self.send_document( - chat_id=chat_id, - file_path=image_path, - caption=caption, - metadata=metadata, - ) + cleanup_paths: List[str] = [] + try: + result = await self.send_document( + chat_id=chat_id, + file_path=image_path, + caption=caption, + metadata=metadata, + ) + if result.success or not _has_ffmpeg() or not _looks_like_large_image_upload_failure(Exception(result.error or "")): + return result + + compressed_path = _compress_image_with_ffmpeg(image_path) + if not compressed_path or compressed_path == image_path: + return result + cleanup_paths.append(compressed_path) + logger.info( + "[%s] Retrying image upload with compressed fallback for %s (%s -> %s bytes)", + self.name, + _safe_id(chat_id), + Path(image_path).stat().st_size if Path(image_path).exists() else "?", + Path(compressed_path).stat().st_size if Path(compressed_path).exists() else "?", + ) + if self._send_session is None: + return await self.send_document( + chat_id=chat_id, + file_path=compressed_path, + caption=caption, + metadata=metadata, + ) + original_session = self._send_session + async with aiohttp.ClientSession(trust_env=True, connector=_make_ssl_connector()) as retry_session: + self._send_session = retry_session + try: + return await self.send_document( + chat_id=chat_id, + file_path=compressed_path, + caption=caption, + metadata=metadata, + ) + finally: + self._send_session = original_session + finally: + for cleanup_path in cleanup_paths: + try: + Path(cleanup_path).unlink(missing_ok=True) + Path(cleanup_path).parent.rmdir() + except OSError: + pass async def send_document( self, @@ -1992,7 +2109,7 @@ async def send_weixin_direct( for media_path, _is_voice in media_files or []: ext = Path(media_path).suffix.lower() - if ext in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}: + if ext in _WEIXIN_IMAGE_EXTS: last_result = await live_adapter.send_image_file(chat_id, media_path) else: last_result = await live_adapter.send_document(chat_id, media_path) @@ -2037,7 +2154,7 @@ async def send_weixin_direct( for media_path, _is_voice in media_files or []: ext = Path(media_path).suffix.lower() - if ext in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}: + if ext in _WEIXIN_IMAGE_EXTS: last_result = await adapter.send_image_file(chat_id, media_path) else: last_result = await adapter.send_document(chat_id, media_path) diff --git a/tests/gateway/test_weixin.py b/tests/gateway/test_weixin.py index 3a377effbd..dd8e23f674 100644 --- a/tests/gateway/test_weixin.py +++ b/tests/gateway/test_weixin.py @@ -4,6 +4,7 @@ import asyncio import base64 import json import os +import subprocess from pathlib import Path from unittest.mock import AsyncMock, patch @@ -468,6 +469,151 @@ class TestWeixinOutboundMedia: assert media["aes_key"] == expected_aes_key +class TestWeixinImageCompressionFallback: + def test_send_image_file_retries_with_compressed_fallback_on_large_upload_failure(self): + adapter = _make_adapter() + compressed_path = "/tmp/compressed.jpg" + adapter.send_document = AsyncMock( + side_effect=[ + SendResult(success=False, error="CDN upload HTTP 500"), + SendResult(success=True, message_id="msg-2"), + ] + ) + + with patch("gateway.platforms.weixin._has_ffmpeg", return_value=True), \ + patch("gateway.platforms.weixin._compress_image_with_ffmpeg", return_value=compressed_path), \ + patch("pathlib.Path.exists", return_value=True), \ + patch("pathlib.Path.stat") as stat_mock, \ + patch("pathlib.Path.unlink"), \ + patch("pathlib.Path.rmdir"): + stat_mock.return_value.st_size = 12345 + result = asyncio.run(adapter.send_image_file("wxid_test123", "/tmp/original.png")) + + assert result.success is True + assert adapter.send_document.await_args_list[0].kwargs["file_path"] == "/tmp/original.png" + assert adapter.send_document.await_args_list[1].kwargs["file_path"] == compressed_path + + def test_send_image_file_skips_retry_without_compressed_fallback(self): + adapter = _make_adapter() + failure = SendResult(success=False, error="CDN upload HTTP 500") + adapter.send_document = AsyncMock(return_value=failure) + + with patch("gateway.platforms.weixin._has_ffmpeg", return_value=True), \ + patch("gateway.platforms.weixin._compress_image_with_ffmpeg", return_value=None): + result = asyncio.run(adapter.send_image_file("wxid_test123", "/tmp/original.png")) + + assert result == failure + adapter.send_document.assert_awaited_once() + + def test_send_image_routes_local_paths_through_send_image_file(self): + adapter = _make_adapter() + adapter.send_image_file = AsyncMock(return_value=SendResult(success=True, message_id="msg-3")) + + result = asyncio.run(adapter.send_image("wxid_test123", "/tmp/original.png", caption="caption")) + + assert result.success is True + adapter.send_image_file.assert_awaited_once_with("wxid_test123", "/tmp/original.png", caption="caption", metadata=None) + + def test_send_weixin_direct_routes_images_via_send_image_file(self): + adapter = _make_adapter() + adapter._send_session = type("S", (), {"closed": False})() + adapter.format_message = lambda message: "" + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="text-msg")) + adapter.send_image_file = AsyncMock(return_value=SendResult(success=True, message_id="img-msg")) + adapter.send_document = AsyncMock(return_value=SendResult(success=True, message_id="doc-msg")) + + with patch.dict("gateway.platforms.weixin._LIVE_ADAPTERS", {"tok": adapter}, clear=True): + result = asyncio.run( + weixin.send_weixin_direct( + extra={"account_id": "acct"}, + token="tok", + chat_id="wxid_test123", + message="", + media_files=[("/tmp/picture.png", False), ("/tmp/file.pdf", False)], + ) + ) + + assert result["success"] is True + adapter.send_image_file.assert_awaited_once_with("wxid_test123", "/tmp/picture.png") + adapter.send_document.assert_awaited_once_with("wxid_test123", "/tmp/file.pdf") + + def test_send_weixin_direct_treats_bmp_as_image(self): + adapter = _make_adapter() + adapter._send_session = type("S", (), {"closed": False})() + adapter.format_message = lambda message: "" + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="text-msg")) + adapter.send_image_file = AsyncMock(return_value=SendResult(success=True, message_id="img-msg")) + adapter.send_document = AsyncMock(return_value=SendResult(success=True, message_id="doc-msg")) + + with patch.dict("gateway.platforms.weixin._LIVE_ADAPTERS", {"tok": adapter}, clear=True): + result = asyncio.run( + weixin.send_weixin_direct( + extra={"account_id": "acct"}, + token="tok", + chat_id="wxid_test123", + message="", + media_files=[("/tmp/picture.bmp", False)], + ) + ) + + assert result["success"] is True + adapter.send_image_file.assert_awaited_once_with("wxid_test123", "/tmp/picture.bmp") + adapter.send_document.assert_not_awaited() + + def test_send_image_file_uses_fresh_session_for_retry(self): + adapter = _make_adapter() + original_session = object() + retry_session = object() + adapter._send_session = original_session + adapter.send_document = AsyncMock( + side_effect=[ + SendResult(success=False, error="Broken pipe"), + SendResult(success=True, message_id="msg-4"), + ] + ) + + class _RetryClientSession: + def __init__(self, *args, **kwargs): + self.session = retry_session + async def __aenter__(self): + return self.session + async def __aexit__(self, exc_type, exc, tb): + return False + + with patch("gateway.platforms.weixin._has_ffmpeg", return_value=True), \ + patch("gateway.platforms.weixin._compress_image_with_ffmpeg", return_value="/tmp/compressed.jpg"), \ + patch("gateway.platforms.weixin.aiohttp.ClientSession", _RetryClientSession), \ + patch("gateway.platforms.weixin._make_ssl_connector", return_value=None), \ + patch("pathlib.Path.exists", return_value=True), \ + patch("pathlib.Path.stat") as stat_mock, \ + patch("pathlib.Path.unlink"), \ + patch("pathlib.Path.rmdir"): + stat_mock.return_value.st_size = 12345 + result = asyncio.run(adapter.send_image_file("wxid_test123", "/tmp/original.png")) + + assert result.success is True + assert adapter.send_document.await_args_list[0].kwargs["file_path"] == "/tmp/original.png" + assert adapter.send_document.await_args_list[1].kwargs["file_path"] == "/tmp/compressed.jpg" + assert adapter._send_session is original_session + + +class TestWeixinImageCompressionHelpers: + def test_ffmpeg_scale_filter_handles_square_images(self): + scale = weixin._ffmpeg_scale_filter(1280) + assert "gte(iw,ih)" in scale + assert "gte(ih,iw)" in scale + + def test_compress_image_with_ffmpeg_returns_none_on_timeout(self, tmp_path): + image_path = tmp_path / "demo.png" + image_path.write_bytes(b"png") + + with patch("gateway.platforms.weixin.shutil.which", return_value="/usr/bin/ffmpeg"), \ + patch("gateway.platforms.weixin.subprocess.run", side_effect=subprocess.TimeoutExpired(cmd=["ffmpeg"], timeout=120)): + result = weixin._compress_image_with_ffmpeg(str(image_path)) + + assert result is None + + class TestWeixinRemoteMediaSafety: def test_download_remote_media_blocks_unsafe_urls(self): adapter = _make_adapter()