hermes-agent/tests/gateway/test_weixin.py
2026-04-17 06:50:36 -07:00

760 lines
28 KiB
Python

"""Tests for the Weixin platform adapter."""
import asyncio
import base64
import json
import os
from pathlib import Path
from unittest.mock import AsyncMock, patch
from gateway.config import PlatformConfig
from gateway.config import GatewayConfig, HomeChannel, Platform, _apply_env_overrides
from gateway.platforms.base import SendResult
from gateway.platforms import weixin
from gateway.platforms.weixin import ContextTokenStore, WeixinAdapter
from tools.send_message_tool import _parse_target_ref, _send_to_platform
def _make_adapter() -> WeixinAdapter:
return WeixinAdapter(
PlatformConfig(
enabled=True,
token="test-token",
extra={"account_id": "test-account"},
)
)
class TestWeixinFormatting:
def test_format_message_preserves_markdown(self):
adapter = _make_adapter()
content = "# Title\n\n## Plan\n\nUse **bold** and [docs](https://example.com)."
assert adapter.format_message(content) == content
def test_format_message_preserves_markdown_tables(self):
adapter = _make_adapter()
content = (
"| Setting | Value |\n"
"| --- | --- |\n"
"| Timeout | 30s |\n"
"| Retries | 3 |\n"
)
assert adapter.format_message(content) == content.strip()
def test_format_message_preserves_fenced_code_blocks(self):
adapter = _make_adapter()
content = "## Snippet\n\n```python\nprint('hi')\n```"
assert adapter.format_message(content) == content
def test_format_message_returns_empty_string_for_none(self):
adapter = _make_adapter()
assert adapter.format_message(None) == ""
class TestWeixinChunking:
def test_split_text_splits_short_chatty_replies_into_separate_bubbles(self):
adapter = _make_adapter()
content = adapter.format_message("第一行\n第二行\n第三行")
chunks = adapter._split_text(content)
assert chunks == ["第一行", "第二行", "第三行"]
def test_split_text_keeps_structured_table_block_together(self):
adapter = _make_adapter()
content = adapter.format_message(
"- Setting: Timeout\n Value: 30s\n- Setting: Retries\n Value: 3"
)
chunks = adapter._split_text(content)
assert chunks == ["- Setting: Timeout\n Value: 30s\n- Setting: Retries\n Value: 3"]
def test_split_text_keeps_four_line_structured_blocks_together(self):
adapter = _make_adapter()
content = adapter.format_message(
"今天结论:\n"
"- 留存下降 3%\n"
"- 转化上涨 8%\n"
"- 主要问题在首日激活"
)
chunks = adapter._split_text(content)
assert chunks == ["今天结论:\n- 留存下降 3%\n- 转化上涨 8%\n- 主要问题在首日激活"]
def test_split_text_keeps_heading_with_body_together(self):
adapter = _make_adapter()
content = adapter.format_message("## 结论\n这是正文")
chunks = adapter._split_text(content)
assert chunks == ["## 结论\n这是正文"]
def test_split_text_keeps_short_reformatted_table_in_single_chunk(self):
adapter = _make_adapter()
content = adapter.format_message(
"| Setting | Value |\n"
"| --- | --- |\n"
"| Timeout | 30s |\n"
"| Retries | 3 |\n"
)
chunks = adapter._split_text(content)
assert chunks == [content]
def test_split_text_keeps_complete_code_block_together_when_possible(self):
adapter = _make_adapter()
adapter.MAX_MESSAGE_LENGTH = 80
content = adapter.format_message(
"## Intro\n\nShort paragraph.\n\n```python\nprint('hello world')\nprint('again')\n```\n\nTail paragraph."
)
chunks = adapter._split_text(content)
assert len(chunks) >= 2
assert any(
"```python\nprint('hello world')\nprint('again')\n```" in chunk
for chunk in chunks
)
assert all(chunk.count("```") % 2 == 0 for chunk in chunks)
def test_split_text_safely_splits_long_code_blocks(self):
adapter = _make_adapter()
adapter.MAX_MESSAGE_LENGTH = 70
lines = "\n".join(f"line_{idx:02d} = {idx}" for idx in range(10))
content = adapter.format_message(f"```python\n{lines}\n```")
chunks = adapter._split_text(content)
assert len(chunks) > 1
assert all(len(chunk) <= adapter.MAX_MESSAGE_LENGTH for chunk in chunks)
assert all(chunk.count("```") >= 2 for chunk in chunks)
def test_split_text_can_restore_legacy_multiline_splitting_via_config(self):
adapter = WeixinAdapter(
PlatformConfig(
enabled=True,
extra={
"account_id": "acct",
"token": "***",
"split_multiline_messages": True,
},
)
)
content = adapter.format_message("第一行\n第二行\n第三行")
chunks = adapter._split_text(content)
assert chunks == ["第一行", "第二行", "第三行"]
class TestWeixinConfig:
def test_apply_env_overrides_configures_weixin(self):
config = GatewayConfig()
with patch.dict(
os.environ,
{
"WEIXIN_ACCOUNT_ID": "bot-account",
"WEIXIN_TOKEN": "bot-token",
"WEIXIN_BASE_URL": "https://ilink.example.com/",
"WEIXIN_CDN_BASE_URL": "https://cdn.example.com/c2c/",
"WEIXIN_DM_POLICY": "allowlist",
"WEIXIN_SPLIT_MULTILINE_MESSAGES": "true",
"WEIXIN_ALLOWED_USERS": "wxid_1,wxid_2",
"WEIXIN_HOME_CHANNEL": "wxid_1",
"WEIXIN_HOME_CHANNEL_NAME": "Primary DM",
},
clear=True,
):
_apply_env_overrides(config)
platform_config = config.platforms[Platform.WEIXIN]
assert platform_config.enabled is True
assert platform_config.token == "bot-token"
assert platform_config.extra["account_id"] == "bot-account"
assert platform_config.extra["base_url"] == "https://ilink.example.com"
assert platform_config.extra["cdn_base_url"] == "https://cdn.example.com/c2c"
assert platform_config.extra["dm_policy"] == "allowlist"
assert platform_config.extra["split_multiline_messages"] == "true"
assert platform_config.extra["allow_from"] == "wxid_1,wxid_2"
assert platform_config.home_channel == HomeChannel(Platform.WEIXIN, "wxid_1", "Primary DM")
def test_get_connected_platforms_includes_weixin_with_token(self):
config = GatewayConfig(
platforms={
Platform.WEIXIN: PlatformConfig(
enabled=True,
token="bot-token",
extra={"account_id": "bot-account"},
)
}
)
assert config.get_connected_platforms() == [Platform.WEIXIN]
def test_get_connected_platforms_requires_account_id(self):
config = GatewayConfig(
platforms={
Platform.WEIXIN: PlatformConfig(
enabled=True,
token="bot-token",
)
}
)
assert config.get_connected_platforms() == []
class TestWeixinStatePersistence:
def test_save_weixin_account_preserves_existing_file_on_replace_failure(self, tmp_path, monkeypatch):
account_path = tmp_path / "weixin" / "accounts" / "acct.json"
account_path.parent.mkdir(parents=True, exist_ok=True)
original = {"token": "old-token", "base_url": "https://old.example.com"}
account_path.write_text(json.dumps(original), encoding="utf-8")
def _boom(_src, _dst):
raise OSError("disk full")
monkeypatch.setattr("utils.os.replace", _boom)
try:
weixin.save_weixin_account(
str(tmp_path),
account_id="acct",
token="new-token",
base_url="https://new.example.com",
user_id="wxid_new",
)
except OSError:
pass
else:
raise AssertionError("expected save_weixin_account to propagate replace failure")
assert json.loads(account_path.read_text(encoding="utf-8")) == original
def test_context_token_persist_preserves_existing_file_on_replace_failure(self, tmp_path, monkeypatch):
token_path = tmp_path / "weixin" / "accounts" / "acct.context-tokens.json"
token_path.parent.mkdir(parents=True, exist_ok=True)
token_path.write_text(json.dumps({"user-a": "old-token"}), encoding="utf-8")
def _boom(_src, _dst):
raise OSError("disk full")
monkeypatch.setattr("utils.os.replace", _boom)
store = ContextTokenStore(str(tmp_path))
with patch.object(weixin.logger, "warning") as warning_mock:
store.set("acct", "user-b", "new-token")
assert json.loads(token_path.read_text(encoding="utf-8")) == {"user-a": "old-token"}
warning_mock.assert_called_once()
def test_save_sync_buf_preserves_existing_file_on_replace_failure(self, tmp_path, monkeypatch):
sync_path = tmp_path / "weixin" / "accounts" / "acct.sync.json"
sync_path.parent.mkdir(parents=True, exist_ok=True)
sync_path.write_text(json.dumps({"get_updates_buf": "old-sync"}), encoding="utf-8")
def _boom(_src, _dst):
raise OSError("disk full")
monkeypatch.setattr("utils.os.replace", _boom)
try:
weixin._save_sync_buf(str(tmp_path), "acct", "new-sync")
except OSError:
pass
else:
raise AssertionError("expected _save_sync_buf to propagate replace failure")
assert json.loads(sync_path.read_text(encoding="utf-8")) == {"get_updates_buf": "old-sync"}
class TestWeixinSendMessageIntegration:
def test_parse_target_ref_accepts_weixin_ids(self):
assert _parse_target_ref("weixin", "wxid_test123") == ("wxid_test123", None, True)
assert _parse_target_ref("weixin", "filehelper") == ("filehelper", None, True)
assert _parse_target_ref("weixin", "group@chatroom") == ("group@chatroom", None, True)
@patch("tools.send_message_tool._send_weixin", new_callable=AsyncMock)
def test_send_to_platform_routes_weixin_media_to_native_helper(self, send_weixin_mock):
send_weixin_mock.return_value = {"success": True, "platform": "weixin", "chat_id": "wxid_test123"}
config = PlatformConfig(enabled=True, token="bot-token", extra={"account_id": "bot-account"})
result = asyncio.run(
_send_to_platform(
Platform.WEIXIN,
config,
"wxid_test123",
"hello",
media_files=[("/tmp/demo.png", False)],
)
)
assert result["success"] is True
send_weixin_mock.assert_awaited_once_with(
config,
"wxid_test123",
"hello",
media_files=[("/tmp/demo.png", False)],
)
class TestWeixinChunkDelivery:
def _connected_adapter(self) -> WeixinAdapter:
adapter = _make_adapter()
adapter._session = object()
adapter._send_session = adapter._session
adapter._token = "test-token"
adapter._base_url = "https://weixin.example.com"
adapter._token_store.get = lambda account_id, chat_id: "ctx-token"
return adapter
@patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock)
@patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
def test_send_waits_between_multiple_chunks(self, send_message_mock, sleep_mock):
adapter = self._connected_adapter()
adapter.MAX_MESSAGE_LENGTH = 12
# Use double newlines so _pack_markdown_blocks splits into 3 blocks
result = asyncio.run(adapter.send("wxid_test123", "first\n\nsecond\n\nthird"))
assert result.success is True
assert send_message_mock.await_count == 3
assert sleep_mock.await_count == 2
@patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock)
@patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
def test_send_retries_failed_chunk_before_continuing(self, send_message_mock, sleep_mock):
adapter = self._connected_adapter()
adapter.MAX_MESSAGE_LENGTH = 12
calls = {"count": 0}
async def flaky_send(*args, **kwargs):
calls["count"] += 1
if calls["count"] == 2:
raise RuntimeError("temporary iLink failure")
send_message_mock.side_effect = flaky_send
# Use double newlines so _pack_markdown_blocks splits into 3 blocks
result = asyncio.run(adapter.send("wxid_test123", "first\n\nsecond\n\nthird"))
assert result.success is True
# 3 chunks, but chunk 2 fails once and retries → 4 _send_message calls total
assert send_message_mock.await_count == 4
# The retried chunk should reuse the same client_id for deduplication
first_try = send_message_mock.await_args_list[1].kwargs
retry = send_message_mock.await_args_list[2].kwargs
assert first_try["text"] == retry["text"]
assert first_try["client_id"] == retry["client_id"]
class TestWeixinOutboundMedia:
def test_send_image_file_accepts_keyword_image_path(self):
adapter = _make_adapter()
expected = SendResult(success=True, message_id="msg-1")
adapter.send_document = AsyncMock(return_value=expected)
result = asyncio.run(
adapter.send_image_file(
chat_id="wxid_test123",
image_path="/tmp/demo.png",
caption="截图说明",
reply_to="reply-1",
metadata={"thread_id": "t-1"},
)
)
assert result == expected
adapter.send_document.assert_awaited_once_with(
chat_id="wxid_test123",
file_path="/tmp/demo.png",
caption="截图说明",
metadata={"thread_id": "t-1"},
)
def test_send_document_accepts_keyword_file_path(self):
adapter = _make_adapter()
adapter._session = object()
adapter._send_session = adapter._session
adapter._token = "test-token"
adapter._send_file = AsyncMock(return_value="msg-2")
result = asyncio.run(
adapter.send_document(
chat_id="wxid_test123",
file_path="/tmp/report.pdf",
caption="报告请看",
file_name="renamed.pdf",
reply_to="reply-1",
metadata={"thread_id": "t-1"},
)
)
assert result.success is True
assert result.message_id == "msg-2"
adapter._send_file.assert_awaited_once_with("wxid_test123", "/tmp/report.pdf", "报告请看")
def test_send_file_uses_post_for_upload_full_url_and_hex_encoded_aes_key(self, tmp_path):
class _UploadResponse:
def __init__(self):
self.status = 200
self.headers = {"x-encrypted-param": "enc-param"}
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def read(self):
return b""
async def text(self):
return ""
class _RecordingSession:
def __init__(self):
self.post_calls = []
def post(self, url, **kwargs):
self.post_calls.append((url, kwargs))
return _UploadResponse()
def put(self, *_args, **_kwargs):
raise AssertionError("upload_full_url branch should use POST")
image_path = tmp_path / "demo.png"
image_path.write_bytes(b"fake-png-bytes")
adapter = _make_adapter()
session = _RecordingSession()
adapter._session = session
adapter._send_session = session
adapter._token = "test-token"
adapter._base_url = "https://weixin.example.com"
adapter._cdn_base_url = "https://cdn.example.com/c2c"
adapter._token_store.get = lambda account_id, chat_id: None
aes_key = bytes(range(16))
expected_aes_key = base64.b64encode(aes_key.hex().encode("ascii")).decode("ascii")
with patch("gateway.platforms.weixin._get_upload_url", new=AsyncMock(return_value={"upload_full_url": "https://upload.example.com/media"})), \
patch("gateway.platforms.weixin._api_post", new_callable=AsyncMock) as api_post_mock, \
patch("gateway.platforms.weixin.secrets.token_hex", return_value="filekey-123"), \
patch("gateway.platforms.weixin.secrets.token_bytes", return_value=aes_key):
message_id = asyncio.run(adapter._send_file("wxid_test123", str(image_path), ""))
assert message_id.startswith("hermes-weixin-")
assert len(session.post_calls) == 1
upload_url, upload_kwargs = session.post_calls[0]
assert upload_url == "https://upload.example.com/media"
assert upload_kwargs["headers"] == {"Content-Type": "application/octet-stream"}
assert upload_kwargs["data"]
assert upload_kwargs["timeout"].total == 120
payload = api_post_mock.await_args.kwargs["payload"]
media = payload["msg"]["item_list"][0]["image_item"]["media"]
assert media["encrypt_query_param"] == "enc-param"
assert media["aes_key"] == expected_aes_key
class TestWeixinRemoteMediaSafety:
def test_download_remote_media_blocks_unsafe_urls(self):
adapter = _make_adapter()
with patch("tools.url_safety.is_safe_url", return_value=False):
try:
asyncio.run(adapter._download_remote_media("http://127.0.0.1/private.png"))
except ValueError as exc:
assert "Blocked unsafe URL" in str(exc)
else:
raise AssertionError("expected ValueError for unsafe URL")
class TestWeixinMarkdownLinks:
"""Markdown links should be preserved so WeChat can render them natively."""
def test_format_message_preserves_markdown_links(self):
adapter = _make_adapter()
content = "Check [the docs](https://example.com) and [GitHub](https://github.com) for details"
assert adapter.format_message(content) == content
def test_format_message_preserves_links_inside_code_blocks(self):
adapter = _make_adapter()
content = "See below:\n\n```\n[link](https://example.com)\n```\n\nDone."
result = adapter.format_message(content)
assert "[link](https://example.com)" in result
class TestWeixinBlankMessagePrevention:
"""Regression tests for the blank-bubble bugs.
Three separate guards now prevent a blank WeChat message from ever being
dispatched:
1. ``_split_text_for_weixin_delivery("")`` returns ``[]`` — not ``[""]``.
2. ``send()`` filters out empty/whitespace-only chunks before calling
``_send_text_chunk``.
3. ``_send_message()`` raises ``ValueError`` for empty text as a last-resort
safety net.
"""
def test_split_text_returns_empty_list_for_empty_string(self):
adapter = _make_adapter()
assert adapter._split_text("") == []
def test_split_text_returns_empty_list_for_empty_string_split_per_line(self):
adapter = WeixinAdapter(
PlatformConfig(
enabled=True,
extra={
"account_id": "acct",
"token": "test-tok",
"split_multiline_messages": True,
},
)
)
assert adapter._split_text("") == []
@patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
def test_send_empty_content_does_not_call_send_message(self, send_message_mock):
adapter = _make_adapter()
adapter._session = object()
adapter._send_session = adapter._session
adapter._token = "test-token"
adapter._base_url = "https://weixin.example.com"
adapter._token_store.get = lambda account_id, chat_id: "ctx-token"
result = asyncio.run(adapter.send("wxid_test123", ""))
# Empty content → no chunks → no _send_message calls
assert result.success is True
send_message_mock.assert_not_awaited()
def test_send_message_rejects_empty_text(self):
"""_send_message raises ValueError for empty/whitespace text."""
import pytest
with pytest.raises(ValueError, match="text must not be empty"):
asyncio.run(
weixin._send_message(
AsyncMock(),
base_url="https://example.com",
token="tok",
to="wxid_test",
text="",
context_token=None,
client_id="cid",
)
)
class TestWeixinStreamingCursorSuppression:
"""WeChat doesn't support message editing — cursor must be suppressed."""
def test_supports_message_editing_is_false(self):
adapter = _make_adapter()
assert adapter.SUPPORTS_MESSAGE_EDITING is False
class TestWeixinMediaBuilder:
"""Media builder uses base64(hex_key), not base64(raw_bytes) for aes_key."""
def test_image_builder_aes_key_is_base64_of_hex(self):
import base64
adapter = _make_adapter()
media_type, builder = adapter._outbound_media_builder("photo.jpg")
assert media_type == weixin.MEDIA_IMAGE
fake_hex_key = "0123456789abcdef0123456789abcdef"
expected_aes = base64.b64encode(fake_hex_key.encode("ascii")).decode("ascii")
item = builder(
encrypt_query_param="eq",
aes_key_for_api=expected_aes,
ciphertext_size=1024,
plaintext_size=1000,
filename="photo.jpg",
rawfilemd5="abc123",
)
assert item["image_item"]["media"]["aes_key"] == expected_aes
def test_video_builder_includes_md5(self):
adapter = _make_adapter()
media_type, builder = adapter._outbound_media_builder("clip.mp4")
assert media_type == weixin.MEDIA_VIDEO
item = builder(
encrypt_query_param="eq",
aes_key_for_api="fakekey",
ciphertext_size=2048,
plaintext_size=2000,
filename="clip.mp4",
rawfilemd5="deadbeef",
)
assert item["video_item"]["video_md5"] == "deadbeef"
def test_voice_builder_for_audio_files_uses_file_attachment_type(self):
adapter = _make_adapter()
media_type, builder = adapter._outbound_media_builder("note.mp3")
assert media_type == weixin.MEDIA_FILE
item = builder(
encrypt_query_param="eq",
aes_key_for_api="fakekey",
ciphertext_size=512,
plaintext_size=500,
filename="note.mp3",
rawfilemd5="abc",
)
assert item["type"] == weixin.ITEM_FILE
assert item["file_item"]["file_name"] == "note.mp3"
def test_voice_builder_for_silk_files(self):
adapter = _make_adapter()
media_type, builder = adapter._outbound_media_builder("recording.silk")
assert media_type == weixin.MEDIA_VOICE
class TestWeixinSendImageFileParameterName:
"""Regression test for send_image_file parameter name mismatch.
The gateway calls send_image_file(chat_id=..., image_path=...) but the
WeixinAdapter previously used 'path' as the parameter name, causing
image sending to fail. This test ensures the interface stays correct.
"""
@patch.object(WeixinAdapter, "send_document", new_callable=AsyncMock)
def test_send_image_file_uses_image_path_parameter(self, send_document_mock):
"""Verify send_image_file accepts image_path and forwards to send_document."""
adapter = _make_adapter()
adapter._session = object()
adapter._send_session = adapter._session
adapter._token = "test-token"
send_document_mock.return_value = weixin.SendResult(success=True, message_id="test-id")
# This is the call pattern used by gateway/run.py extract_media
result = asyncio.run(
adapter.send_image_file(
chat_id="wxid_test123",
image_path="/tmp/test_image.png",
caption="Test caption",
metadata={"thread_id": "thread-123"},
)
)
assert result.success is True
send_document_mock.assert_awaited_once_with(
chat_id="wxid_test123",
file_path="/tmp/test_image.png",
caption="Test caption",
metadata={"thread_id": "thread-123"},
)
@patch.object(WeixinAdapter, "send_document", new_callable=AsyncMock)
def test_send_image_file_works_without_optional_params(self, send_document_mock):
"""Verify send_image_file works with minimal required params."""
adapter = _make_adapter()
adapter._session = object()
adapter._send_session = adapter._session
adapter._token = "test-token"
send_document_mock.return_value = weixin.SendResult(success=True, message_id="test-id")
result = asyncio.run(
adapter.send_image_file(
chat_id="wxid_test123",
image_path="/tmp/test_image.jpg",
)
)
assert result.success is True
send_document_mock.assert_awaited_once_with(
chat_id="wxid_test123",
file_path="/tmp/test_image.jpg",
caption=None,
metadata=None,
)
class TestWeixinVoiceSending:
def _connected_adapter(self) -> WeixinAdapter:
adapter = _make_adapter()
adapter._session = object()
adapter._send_session = adapter._session
adapter._token = "test-token"
adapter._base_url = "https://weixin.example.com"
adapter._token_store.get = lambda account_id, chat_id: "ctx-token"
return adapter
@patch.object(WeixinAdapter, "_send_file", new_callable=AsyncMock)
def test_send_voice_downgrades_to_document_attachment(self, send_file_mock, tmp_path):
adapter = self._connected_adapter()
source = tmp_path / "voice.ogg"
source.write_bytes(b"ogg")
send_file_mock.return_value = "msg-1"
result = asyncio.run(adapter.send_voice("wxid_test123", str(source)))
assert result.success is True
send_file_mock.assert_awaited_once_with(
"wxid_test123",
str(source),
"[voice message as attachment]",
force_file_attachment=True,
)
def test_voice_builder_for_silk_files_can_be_forced_to_file_attachment(self):
adapter = _make_adapter()
media_type, builder = adapter._outbound_media_builder(
"recording.silk",
force_file_attachment=True,
)
assert media_type == weixin.MEDIA_FILE
item = builder(
encrypt_query_param="eq",
aes_key_for_api="fakekey",
ciphertext_size=512,
plaintext_size=500,
filename="recording.silk",
rawfilemd5="abc",
)
assert item["type"] == weixin.ITEM_FILE
assert item["file_item"]["file_name"] == "recording.silk"
@patch.object(weixin, "_api_post", new_callable=AsyncMock)
@patch.object(weixin, "_upload_ciphertext", new_callable=AsyncMock)
@patch.object(weixin, "_get_upload_url", new_callable=AsyncMock)
def test_send_file_sets_voice_metadata_for_silk_payload(
self,
get_upload_url_mock,
upload_ciphertext_mock,
api_post_mock,
tmp_path,
):
adapter = self._connected_adapter()
silk = tmp_path / "voice.silk"
silk.write_bytes(b"\x02#!SILK_V3\x01\x00")
get_upload_url_mock.return_value = {"upload_full_url": "https://cdn.example.com/upload"}
upload_ciphertext_mock.return_value = "enc-q"
api_post_mock.return_value = {"success": True}
asyncio.run(adapter._send_file("wxid_test123", str(silk), ""))
payload = api_post_mock.await_args.kwargs["payload"]
voice_item = payload["msg"]["item_list"][0]["voice_item"]
assert voice_item.get("playtime", 0) == 0
assert voice_item["encode_type"] == 6
assert voice_item["sample_rate"] == 24000
assert voice_item["bits_per_sample"] == 16