From 649f38390c2fa5f1f690f4ed2676fb754ddc65a8 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Tue, 14 Apr 2026 05:10:04 +0000 Subject: [PATCH] fix: force Weixin voice fallback to file attachments --- gateway/platforms/weixin.py | 127 +++++++++++++++-------------------- tests/gateway/test_weixin.py | 80 ++++++++++++++-------- 2 files changed, 106 insertions(+), 101 deletions(-) diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index 2af64850594..3eda50d30c2 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -25,7 +25,6 @@ import struct import tempfile import time import uuid -import subprocess from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple @@ -1602,71 +1601,21 @@ class WeixinAdapter(BasePlatformAdapter): if not self._session or not self._token: return SendResult(success=False, error="Not connected") - temp_paths: List[str] = [] + # Native outbound Weixin voice bubbles are not proven-working in the + # upstream reference implementation. Prefer a reliable file attachment + # fallback so users at least receive playable audio, even for .silk. + fallback_caption = caption or "[voice message as attachment]" try: - voice_path = self._prepare_voice_payload(audio_path) - if voice_path != audio_path: - temp_paths.append(voice_path) - message_id = await self._send_file(chat_id, voice_path, caption or "") + message_id = await self._send_file( + chat_id, + audio_path, + fallback_caption, + force_file_attachment=True, + ) return SendResult(success=True, message_id=message_id) except Exception as exc: logger.error("[%s] send_voice failed to=%s: %s", self.name, _safe_id(chat_id), exc) return SendResult(success=False, error=str(exc)) - finally: - for path in temp_paths: - try: - os.unlink(path) - except OSError: - pass - - def _prepare_voice_payload(self, audio_path: str) -> str: - path = str(audio_path) - if path.endswith(".silk"): - return path - if not PILK_AVAILABLE: - raise RuntimeError( - "Weixin native voice requires SILK encoding, but pilk is not installed" - ) - - wav_path = self._transcode_audio_to_wav(path) - try: - fd, silk_path = tempfile.mkstemp(suffix='.silk') - os.close(fd) - pilk.encode(wav_path, silk_path, tencent=True) - if not os.path.exists(silk_path) or os.path.getsize(silk_path) <= 0: - raise RuntimeError("Generated SILK voice file is empty") - return silk_path - finally: - try: - os.unlink(wav_path) - except OSError: - pass - - def _transcode_audio_to_wav(self, input_path: str) -> str: - fd, wav_path = tempfile.mkstemp(suffix='.wav') - os.close(fd) - try: - result = subprocess.run( - [ - 'ffmpeg', '-y', '-i', input_path, - '-ar', '24000', '-ac', '1', '-f', 'wav', wav_path, - ], - capture_output=True, - timeout=60, - check=False, - ) - if result.returncode != 0: - stderr = result.stderr.decode('utf-8', errors='ignore')[:400] - raise RuntimeError(f"ffmpeg voice conversion failed: {stderr}") - if not os.path.exists(wav_path) or os.path.getsize(wav_path) <= 0: - raise RuntimeError("ffmpeg produced empty wav for Weixin voice") - return wav_path - except Exception: - try: - os.unlink(wav_path) - except OSError: - pass - raise async def _download_remote_media(self, url: str) -> str: from tools.url_safety import is_safe_url @@ -1683,10 +1632,16 @@ class WeixinAdapter(BasePlatformAdapter): handle.write(data) return handle.name - async def _send_file(self, chat_id: str, path: str, caption: str) -> str: + async def _send_file( + self, + chat_id: str, + path: str, + caption: str, + force_file_attachment: bool = False, + ) -> str: assert self._session is not None and self._token is not None plaintext = Path(path).read_bytes() - media_type, item_builder = self._outbound_media_builder(path) + media_type, item_builder = self._outbound_media_builder(path, force_file_attachment=force_file_attachment) filekey = secrets.token_hex(16) aes_key = secrets.token_bytes(16) rawsize = len(plaintext) @@ -1728,14 +1683,24 @@ class WeixinAdapter(BasePlatformAdapter): # Sending base64(raw_bytes) causes images to show as grey boxes on the # receiver side because the decryption key doesn't match. aes_key_for_api = base64.b64encode(aes_key.hex().encode("ascii")).decode("ascii") - media_item = item_builder( - encrypt_query_param=encrypted_query_param, - aes_key_for_api=aes_key_for_api, - ciphertext_size=len(ciphertext), - plaintext_size=rawsize, - filename=Path(path).name, - rawfilemd5=rawfilemd5, - ) + item_kwargs = { + "encrypt_query_param": encrypted_query_param, + "aes_key_for_api": aes_key_for_api, + "ciphertext_size": len(ciphertext), + "plaintext_size": rawsize, + "filename": Path(path).name, + "rawfilemd5": rawfilemd5, + } + if media_type == MEDIA_VOICE and path.endswith(".silk"): + item_kwargs["encode_type"] = 6 + item_kwargs["sample_rate"] = 24000 + item_kwargs["bits_per_sample"] = 16 + if PILK_AVAILABLE: + try: + item_kwargs["playtime"] = pilk.get_duration(path) + except Exception as exc: + logger.warning("[%s] failed to read SILK duration for %s: %s", self.name, path, exc) + media_item = item_builder(**item_kwargs) last_message_id = None if caption: @@ -1771,7 +1736,7 @@ class WeixinAdapter(BasePlatformAdapter): ) return last_message_id - def _outbound_media_builder(self, path: str): + def _outbound_media_builder(self, path: str, force_file_attachment: bool = False): mime = mimetypes.guess_type(path)[0] or "application/octet-stream" if mime.startswith("image/"): return MEDIA_IMAGE, lambda **kw: { @@ -1799,7 +1764,7 @@ class WeixinAdapter(BasePlatformAdapter): "video_md5": kw.get("rawfilemd5", ""), }, } - if mime.startswith("audio/") or path.endswith(".silk"): + if path.endswith(".silk") and not force_file_attachment: return MEDIA_VOICE, lambda **kw: { "type": ITEM_VOICE, "voice_item": { @@ -1808,9 +1773,25 @@ class WeixinAdapter(BasePlatformAdapter): "aes_key": kw["aes_key_for_api"], "encrypt_type": 1, }, + "encode_type": kw.get("encode_type"), + "bits_per_sample": kw.get("bits_per_sample"), + "sample_rate": kw.get("sample_rate"), "playtime": kw.get("playtime", 0), }, } + if mime.startswith("audio/"): + return MEDIA_FILE, lambda **kw: { + "type": ITEM_FILE, + "file_item": { + "media": { + "encrypt_query_param": kw["encrypt_query_param"], + "aes_key": kw["aes_key_for_api"], + "encrypt_type": 1, + }, + "file_name": kw["filename"], + "len": str(kw["plaintext_size"]), + }, + } return MEDIA_FILE, lambda **kw: { "type": ITEM_FILE, "file_item": { diff --git a/tests/gateway/test_weixin.py b/tests/gateway/test_weixin.py index 03aeda60bbd..211b9b8b586 100644 --- a/tests/gateway/test_weixin.py +++ b/tests/gateway/test_weixin.py @@ -501,10 +501,10 @@ class TestWeixinMediaBuilder: ) assert item["video_item"]["video_md5"] == "deadbeef" - def test_voice_builder_for_audio_files(self): + def test_voice_builder_for_audio_files_uses_file_attachment_type(self): adapter = _make_adapter() media_type, builder = adapter._outbound_media_builder("note.mp3") - assert media_type == weixin.MEDIA_VOICE + assert media_type == weixin.MEDIA_FILE item = builder( encrypt_query_param="eq", @@ -514,8 +514,8 @@ class TestWeixinMediaBuilder: filename="note.mp3", rawfilemd5="abc", ) - assert item["type"] == weixin.ITEM_VOICE - assert "voice_item" in item + assert item["type"] == weixin.ITEM_FILE + assert item["file_item"]["file_name"] == "note.mp3" def test_voice_builder_for_silk_files(self): adapter = _make_adapter() @@ -593,41 +593,65 @@ class TestWeixinVoiceSending: return adapter @patch.object(WeixinAdapter, "_send_file", new_callable=AsyncMock) - @patch.object(WeixinAdapter, "_prepare_voice_payload") - def test_send_voice_uses_silk_payload(self, prepare_mock, send_file_mock, tmp_path): + def test_send_voice_downgrades_to_document_attachment(self, send_file_mock, tmp_path): adapter = self._connected_adapter() source = tmp_path / "voice.ogg" - silk = tmp_path / "voice.silk" source.write_bytes(b"ogg") - silk.write_bytes(b"silk") - prepare_mock.return_value = str(silk) send_file_mock.return_value = "msg-1" result = asyncio.run(adapter.send_voice("wxid_test123", str(source))) assert result.success is True - prepare_mock.assert_called_once_with(str(source)) - send_file_mock.assert_awaited_once_with("wxid_test123", str(silk), "") + send_file_mock.assert_awaited_once_with( + "wxid_test123", + str(source), + "[voice message as attachment]", + force_file_attachment=True, + ) - @patch("gateway.platforms.weixin.pilk.encode") - @patch.object(WeixinAdapter, "_transcode_audio_to_wav") - def test_prepare_voice_payload_transcodes_to_silk(self, transcode_mock, pilk_encode_mock, tmp_path): + def test_voice_builder_for_silk_files_can_be_forced_to_file_attachment(self): adapter = _make_adapter() - src = tmp_path / "voice.ogg" - src.write_bytes(b"ogg") - wav = tmp_path / "voice.wav" - wav.write_bytes(b"wav") - transcode_mock.return_value = str(wav) + media_type, builder = adapter._outbound_media_builder( + "recording.silk", + force_file_attachment=True, + ) + assert media_type == weixin.MEDIA_FILE - def _fake_encode(infile, outfile, **kwargs): - Path(outfile).write_bytes(b"silk-bytes") + item = builder( + encrypt_query_param="eq", + aes_key_for_api="fakekey", + ciphertext_size=512, + plaintext_size=500, + filename="recording.silk", + rawfilemd5="abc", + ) + assert item["type"] == weixin.ITEM_FILE + assert item["file_item"]["file_name"] == "recording.silk" - pilk_encode_mock.side_effect = _fake_encode + @patch.object(weixin, "_api_post", new_callable=AsyncMock) + @patch.object(weixin, "_upload_ciphertext", new_callable=AsyncMock) + @patch.object(weixin, "_get_upload_url", new_callable=AsyncMock) + def test_send_file_sets_voice_playtime_from_silk_duration( + self, + get_upload_url_mock, + upload_ciphertext_mock, + api_post_mock, + tmp_path, + ): + adapter = self._connected_adapter() + silk = tmp_path / "voice.silk" + silk.write_bytes(b"\x02#!SILK_V3\x01\x00") + get_upload_url_mock.return_value = {"upload_full_url": "https://cdn.example.com/upload"} + upload_ciphertext_mock.return_value = "enc-q" + api_post_mock.return_value = {"success": True} - silk_path = adapter._prepare_voice_payload(str(src)) + with patch("gateway.platforms.weixin.pilk.get_duration", return_value=1260) as duration_mock: + asyncio.run(adapter._send_file("wxid_test123", str(silk), "")) - assert silk_path.endswith('.silk') - assert Path(silk_path).read_bytes() == b"silk-bytes" - pilk_encode_mock.assert_called_once_with(str(wav), silk_path, tencent=True) - assert not wav.exists() - os.unlink(silk_path) + duration_mock.assert_called_once_with(str(silk)) + payload = api_post_mock.await_args.kwargs["payload"] + voice_item = payload["msg"]["item_list"][0]["voice_item"] + assert voice_item["playtime"] == 1260 + assert voice_item["encode_type"] == 6 + assert voice_item["sample_rate"] == 24000 + assert voice_item["bits_per_sample"] == 16