fix: force Weixin voice fallback to file attachments

This commit is contained in:
Patrick Wang 2026-04-14 05:10:04 +00:00 committed by Teknium
parent 678b69ec1b
commit 649f38390c
2 changed files with 106 additions and 101 deletions

View file

@ -25,7 +25,6 @@ import struct
import tempfile
import time
import uuid
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
@ -1602,71 +1601,21 @@ class WeixinAdapter(BasePlatformAdapter):
if not self._session or not self._token:
return SendResult(success=False, error="Not connected")
temp_paths: List[str] = []
# Native outbound Weixin voice bubbles are not proven-working in the
# upstream reference implementation. Prefer a reliable file attachment
# fallback so users at least receive playable audio, even for .silk.
fallback_caption = caption or "[voice message as attachment]"
try:
voice_path = self._prepare_voice_payload(audio_path)
if voice_path != audio_path:
temp_paths.append(voice_path)
message_id = await self._send_file(chat_id, voice_path, caption or "")
message_id = await self._send_file(
chat_id,
audio_path,
fallback_caption,
force_file_attachment=True,
)
return SendResult(success=True, message_id=message_id)
except Exception as exc:
logger.error("[%s] send_voice failed to=%s: %s", self.name, _safe_id(chat_id), exc)
return SendResult(success=False, error=str(exc))
finally:
for path in temp_paths:
try:
os.unlink(path)
except OSError:
pass
def _prepare_voice_payload(self, audio_path: str) -> str:
path = str(audio_path)
if path.endswith(".silk"):
return path
if not PILK_AVAILABLE:
raise RuntimeError(
"Weixin native voice requires SILK encoding, but pilk is not installed"
)
wav_path = self._transcode_audio_to_wav(path)
try:
fd, silk_path = tempfile.mkstemp(suffix='.silk')
os.close(fd)
pilk.encode(wav_path, silk_path, tencent=True)
if not os.path.exists(silk_path) or os.path.getsize(silk_path) <= 0:
raise RuntimeError("Generated SILK voice file is empty")
return silk_path
finally:
try:
os.unlink(wav_path)
except OSError:
pass
def _transcode_audio_to_wav(self, input_path: str) -> str:
    """Convert *input_path* to a temporary 24 kHz mono WAV using ffmpeg.

    Returns the path of the newly created ``.wav`` file; the caller is
    responsible for deleting it. On any failure the temporary file is
    removed before the exception propagates.

    Raises:
        RuntimeError: if ffmpeg exits non-zero or produces a missing or
            empty output file.
        Exception: errors from launching ffmpeg (for example a missing
            binary or the 60 second timeout) are propagated unchanged.
    """
    fd, wav_path = tempfile.mkstemp(suffix='.wav')
    os.close(fd)
    try:
        result = subprocess.run(
            [
                # -nostdin: never wait for interactive input from the
                # parent's stdin. -hide_banner / -loglevel error: keep
                # stderr limited to the actual failure reason.
                'ffmpeg', '-hide_banner', '-loglevel', 'error', '-nostdin',
                '-y', '-i', input_path,
                '-ar', '24000', '-ac', '1', '-f', 'wav', wav_path,
            ],
            stdin=subprocess.DEVNULL,
            capture_output=True,
            timeout=60,
            check=False,
        )
        if result.returncode != 0:
            # Keep the tail of stderr: the last lines carry the error even
            # if an ffmpeg build still prints leading noise.
            stderr = result.stderr.decode('utf-8', errors='ignore').strip()[-400:]
            raise RuntimeError(f"ffmpeg voice conversion failed: {stderr}")
        if not os.path.exists(wav_path) or os.path.getsize(wav_path) <= 0:
            raise RuntimeError("ffmpeg produced empty wav for Weixin voice")
        return wav_path
    except Exception:
        # Do not leave the mkstemp() placeholder behind on failure.
        try:
            os.unlink(wav_path)
        except OSError:
            pass
        raise
async def _download_remote_media(self, url: str) -> str:
from tools.url_safety import is_safe_url
@ -1683,10 +1632,16 @@ class WeixinAdapter(BasePlatformAdapter):
handle.write(data)
return handle.name
async def _send_file(self, chat_id: str, path: str, caption: str) -> str:
async def _send_file(
self,
chat_id: str,
path: str,
caption: str,
force_file_attachment: bool = False,
) -> str:
assert self._session is not None and self._token is not None
plaintext = Path(path).read_bytes()
media_type, item_builder = self._outbound_media_builder(path)
media_type, item_builder = self._outbound_media_builder(path, force_file_attachment=force_file_attachment)
filekey = secrets.token_hex(16)
aes_key = secrets.token_bytes(16)
rawsize = len(plaintext)
@ -1728,14 +1683,24 @@ class WeixinAdapter(BasePlatformAdapter):
# Sending base64(raw_bytes) causes images to show as grey boxes on the
# receiver side because the decryption key doesn't match.
aes_key_for_api = base64.b64encode(aes_key.hex().encode("ascii")).decode("ascii")
media_item = item_builder(
encrypt_query_param=encrypted_query_param,
aes_key_for_api=aes_key_for_api,
ciphertext_size=len(ciphertext),
plaintext_size=rawsize,
filename=Path(path).name,
rawfilemd5=rawfilemd5,
)
item_kwargs = {
"encrypt_query_param": encrypted_query_param,
"aes_key_for_api": aes_key_for_api,
"ciphertext_size": len(ciphertext),
"plaintext_size": rawsize,
"filename": Path(path).name,
"rawfilemd5": rawfilemd5,
}
if media_type == MEDIA_VOICE and path.endswith(".silk"):
item_kwargs["encode_type"] = 6
item_kwargs["sample_rate"] = 24000
item_kwargs["bits_per_sample"] = 16
if PILK_AVAILABLE:
try:
item_kwargs["playtime"] = pilk.get_duration(path)
except Exception as exc:
logger.warning("[%s] failed to read SILK duration for %s: %s", self.name, path, exc)
media_item = item_builder(**item_kwargs)
last_message_id = None
if caption:
@ -1771,7 +1736,7 @@ class WeixinAdapter(BasePlatformAdapter):
)
return last_message_id
def _outbound_media_builder(self, path: str):
def _outbound_media_builder(self, path: str, force_file_attachment: bool = False):
mime = mimetypes.guess_type(path)[0] or "application/octet-stream"
if mime.startswith("image/"):
return MEDIA_IMAGE, lambda **kw: {
@ -1799,7 +1764,7 @@ class WeixinAdapter(BasePlatformAdapter):
"video_md5": kw.get("rawfilemd5", ""),
},
}
if mime.startswith("audio/") or path.endswith(".silk"):
if path.endswith(".silk") and not force_file_attachment:
return MEDIA_VOICE, lambda **kw: {
"type": ITEM_VOICE,
"voice_item": {
@ -1808,9 +1773,25 @@ class WeixinAdapter(BasePlatformAdapter):
"aes_key": kw["aes_key_for_api"],
"encrypt_type": 1,
},
"encode_type": kw.get("encode_type"),
"bits_per_sample": kw.get("bits_per_sample"),
"sample_rate": kw.get("sample_rate"),
"playtime": kw.get("playtime", 0),
},
}
if mime.startswith("audio/"):
return MEDIA_FILE, lambda **kw: {
"type": ITEM_FILE,
"file_item": {
"media": {
"encrypt_query_param": kw["encrypt_query_param"],
"aes_key": kw["aes_key_for_api"],
"encrypt_type": 1,
},
"file_name": kw["filename"],
"len": str(kw["plaintext_size"]),
},
}
return MEDIA_FILE, lambda **kw: {
"type": ITEM_FILE,
"file_item": {

View file

@ -501,10 +501,10 @@ class TestWeixinMediaBuilder:
)
assert item["video_item"]["video_md5"] == "deadbeef"
def test_voice_builder_for_audio_files(self):
def test_voice_builder_for_audio_files_uses_file_attachment_type(self):
adapter = _make_adapter()
media_type, builder = adapter._outbound_media_builder("note.mp3")
assert media_type == weixin.MEDIA_VOICE
assert media_type == weixin.MEDIA_FILE
item = builder(
encrypt_query_param="eq",
@ -514,8 +514,8 @@ class TestWeixinMediaBuilder:
filename="note.mp3",
rawfilemd5="abc",
)
assert item["type"] == weixin.ITEM_VOICE
assert "voice_item" in item
assert item["type"] == weixin.ITEM_FILE
assert item["file_item"]["file_name"] == "note.mp3"
def test_voice_builder_for_silk_files(self):
adapter = _make_adapter()
@ -593,41 +593,65 @@ class TestWeixinVoiceSending:
return adapter
@patch.object(WeixinAdapter, "_send_file", new_callable=AsyncMock)
@patch.object(WeixinAdapter, "_prepare_voice_payload")
def test_send_voice_uses_silk_payload(self, prepare_mock, send_file_mock, tmp_path):
def test_send_voice_downgrades_to_document_attachment(self, send_file_mock, tmp_path):
adapter = self._connected_adapter()
source = tmp_path / "voice.ogg"
silk = tmp_path / "voice.silk"
source.write_bytes(b"ogg")
silk.write_bytes(b"silk")
prepare_mock.return_value = str(silk)
send_file_mock.return_value = "msg-1"
result = asyncio.run(adapter.send_voice("wxid_test123", str(source)))
assert result.success is True
prepare_mock.assert_called_once_with(str(source))
send_file_mock.assert_awaited_once_with("wxid_test123", str(silk), "")
send_file_mock.assert_awaited_once_with(
"wxid_test123",
str(source),
"[voice message as attachment]",
force_file_attachment=True,
)
@patch("gateway.platforms.weixin.pilk.encode")
@patch.object(WeixinAdapter, "_transcode_audio_to_wav")
def test_prepare_voice_payload_transcodes_to_silk(self, transcode_mock, pilk_encode_mock, tmp_path):
def test_voice_builder_for_silk_files_can_be_forced_to_file_attachment(self):
adapter = _make_adapter()
src = tmp_path / "voice.ogg"
src.write_bytes(b"ogg")
wav = tmp_path / "voice.wav"
wav.write_bytes(b"wav")
transcode_mock.return_value = str(wav)
media_type, builder = adapter._outbound_media_builder(
"recording.silk",
force_file_attachment=True,
)
assert media_type == weixin.MEDIA_FILE
def _fake_encode(infile, outfile, **kwargs):
Path(outfile).write_bytes(b"silk-bytes")
item = builder(
encrypt_query_param="eq",
aes_key_for_api="fakekey",
ciphertext_size=512,
plaintext_size=500,
filename="recording.silk",
rawfilemd5="abc",
)
assert item["type"] == weixin.ITEM_FILE
assert item["file_item"]["file_name"] == "recording.silk"
pilk_encode_mock.side_effect = _fake_encode
@patch.object(weixin, "_api_post", new_callable=AsyncMock)
@patch.object(weixin, "_upload_ciphertext", new_callable=AsyncMock)
@patch.object(weixin, "_get_upload_url", new_callable=AsyncMock)
def test_send_file_sets_voice_playtime_from_silk_duration(
self,
get_upload_url_mock,
upload_ciphertext_mock,
api_post_mock,
tmp_path,
):
adapter = self._connected_adapter()
silk = tmp_path / "voice.silk"
silk.write_bytes(b"\x02#!SILK_V3\x01\x00")
get_upload_url_mock.return_value = {"upload_full_url": "https://cdn.example.com/upload"}
upload_ciphertext_mock.return_value = "enc-q"
api_post_mock.return_value = {"success": True}
silk_path = adapter._prepare_voice_payload(str(src))
with patch("gateway.platforms.weixin.pilk.get_duration", return_value=1260) as duration_mock:
asyncio.run(adapter._send_file("wxid_test123", str(silk), ""))
assert silk_path.endswith('.silk')
assert Path(silk_path).read_bytes() == b"silk-bytes"
pilk_encode_mock.assert_called_once_with(str(wav), silk_path, tencent=True)
assert not wav.exists()
os.unlink(silk_path)
duration_mock.assert_called_once_with(str(silk))
payload = api_post_mock.await_args.kwargs["payload"]
voice_item = payload["msg"]["item_list"][0]["voice_item"]
assert voice_item["playtime"] == 1260
assert voice_item["encode_type"] == 6
assert voice_item["sample_rate"] == 24000
assert voice_item["bits_per_sample"] == 16