diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index dc4e7cf96..a83dff5a8 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -112,6 +112,7 @@ TYPING_STOP = 2 _HEADER_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$") _TABLE_RULE_RE = re.compile(r"^\s*\|?(?:\s*:?-{3,}:?\s*\|)+\s*:?-{3,}:?\s*\|?\s*$") _FENCE_RE = re.compile(r"^```([^\n`]*)\s*$") +_MARKDOWN_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)") def check_weixin_requirements() -> bool: @@ -398,15 +399,16 @@ async def _send_message( context_token: Optional[str], client_id: str, ) -> None: + if not text or not text.strip(): + raise ValueError("_send_message: text must not be empty") message: Dict[str, Any] = { "from_user_id": "", "to_user_id": to, "client_id": client_id, "message_type": MSG_TYPE_BOT, "message_state": MSG_STATE_FINISH, + "item_list": [{"type": ITEM_TEXT, "text_item": {"text": text}}], } - if text: - message["item_list"] = [{"type": ITEM_TEXT, "text_item": {"text": text}}] if context_token: message["context_token"] = context_token await _api_post( @@ -499,13 +501,15 @@ async def _upload_ciphertext( session: "aiohttp.ClientSession", *, ciphertext: bytes, - cdn_base_url: str, - upload_param: str, - filekey: str, + upload_url: str, ) -> str: - url = _cdn_upload_url(cdn_base_url, upload_param, filekey) + """Upload encrypted media to the CDN. + + Accepts either a constructed CDN URL (from upload_param) or a direct + upload_full_url — both use POST with the raw ciphertext as the body. + """ timeout = aiohttp.ClientTimeout(total=120) - async with session.post(url, data=ciphertext, headers={"Content-Type": "application/octet-stream"}, timeout=timeout) as response: + async with session.post(upload_url, data=ciphertext, headers={"Content-Type": "application/octet-stream"}, timeout=timeout) as response: if response.status == 200: encrypted_param = response.headers.get("x-encrypted-param") if encrypted_param: @@ -649,7 +653,7 @@ def _normalize_markdown_blocks(content: str) -> str: result.append(_rewrite_table_block_for_weixin(table_lines)) continue - result.append(_rewrite_headers_for_weixin(line)) + result.append(_MARKDOWN_LINK_RE.sub(r"\1 (\2)", _rewrite_headers_for_weixin(line))) i += 1 normalized = "\n".join(item.rstrip() for item in result) @@ -811,6 +815,8 @@ def _split_text_for_weixin_delivery( ``platforms.weixin.extra.split_multiline_messages`` (``true`` / ``false``) or the env var ``WEIXIN_SPLIT_MULTILINE_MESSAGES``. """ + if not content: + return [] if split_per_line: # Legacy: one message per top-level delivery unit. if len(content) <= max_length and "\n" not in content: @@ -821,14 +827,14 @@ def _split_text_for_weixin_delivery( chunks.append(unit) continue chunks.extend(_pack_markdown_blocks_for_weixin(unit, max_length)) - return chunks or [content] + return [c for c in chunks if c] or [content] # Compact (default): single message when under the limit — unless the # content looks like a short chatty exchange, in which case split into # separate bubbles for a more natural chat feel. if len(content) <= max_length: return ( - _split_delivery_units_for_weixin(content) + [u for u in _split_delivery_units_for_weixin(content) if u] if _should_split_short_chat_block_for_weixin(content) else [content] ) @@ -1042,6 +1048,10 @@ class WeixinAdapter(BasePlatformAdapter): MAX_MESSAGE_LENGTH = 4000 + # WeChat does not support editing sent messages — streaming must use the + # fallback "send-final-only" path so the cursor (▉) is never left visible. + SUPPORTS_MESSAGE_EDITING = False + def __init__(self, config: PlatformConfig): super().__init__(config, Platform.WEIXIN) extra = config.extra or {} @@ -1451,7 +1461,7 @@ class WeixinAdapter(BasePlatformAdapter): context_token = self._token_store.get(self._account_id, chat_id) last_message_id: Optional[str] = None try: - chunks = self._split_text(self.format_message(content)) + chunks = [c for c in self._split_text(self.format_message(content)) if c and c.strip()] for idx, chunk in enumerate(chunks): client_id = f"hermes-weixin-{uuid.uuid4().hex}" await self._send_text_chunk( @@ -1555,6 +1565,33 @@ class WeixinAdapter(BasePlatformAdapter): logger.error("[%s] send_document failed to=%s: %s", self.name, _safe_id(chat_id), exc) return SendResult(success=False, error=str(exc)) + async def send_video( + self, + chat_id: str, + video_path: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + if not self._session or not self._token: + return SendResult(success=False, error="Not connected") + try: + message_id = await self._send_file(chat_id, video_path, caption or "") + return SendResult(success=True, message_id=message_id) + except Exception as exc: + logger.error("[%s] send_video failed to=%s: %s", self.name, _safe_id(chat_id), exc) + return SendResult(success=False, error=str(exc)) + + async def send_voice( + self, + chat_id: str, + audio_path: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + return await self.send_document(chat_id, audio_path, caption=caption or "", metadata=metadata) + async def _download_remote_media(self, url: str) -> str: from tools.url_safety import is_safe_url @@ -1577,6 +1614,7 @@ class WeixinAdapter(BasePlatformAdapter): filekey = secrets.token_hex(16) aes_key = secrets.token_bytes(16) rawsize = len(plaintext) + rawfilemd5 = hashlib.md5(plaintext).hexdigest() upload_response = await _get_upload_url( self._session, base_url=self._base_url, @@ -1585,41 +1623,42 @@ class WeixinAdapter(BasePlatformAdapter): media_type=media_type, filekey=filekey, rawsize=rawsize, - rawfilemd5=hashlib.md5(plaintext).hexdigest(), + rawfilemd5=rawfilemd5, filesize=_aes_padded_size(rawsize), aeskey_hex=aes_key.hex(), ) upload_param = str(upload_response.get("upload_param") or "") upload_full_url = str(upload_response.get("upload_full_url") or "") ciphertext = _aes128_ecb_encrypt(plaintext, aes_key) - if upload_param: - encrypted_query_param = await _upload_ciphertext( - self._session, - ciphertext=ciphertext, - cdn_base_url=self._cdn_base_url, - upload_param=upload_param, - filekey=filekey, - ) - elif upload_full_url: - timeout = aiohttp.ClientTimeout(total=120) - async with self._session.put( - upload_full_url, - data=ciphertext, - headers={"Content-Type": "application/octet-stream"}, - timeout=timeout, - ) as response: - response.raise_for_status() - encrypted_query_param = response.headers.get("x-encrypted-param") or filekey + + # Prefer upload_full_url (direct CDN), fall back to constructed CDN URL + # from upload_param. Both paths use POST — the old PUT for + # upload_full_url caused 404s on the WeChat CDN. + if upload_full_url: + upload_url = upload_full_url + elif upload_param: + upload_url = _cdn_upload_url(self._cdn_base_url, upload_param, filekey) else: raise RuntimeError(f"getUploadUrl returned neither upload_param nor upload_full_url: {upload_response}") + encrypted_query_param = await _upload_ciphertext( + self._session, + ciphertext=ciphertext, + upload_url=upload_url, + ) + context_token = self._token_store.get(self._account_id, chat_id) + # The iLink API expects aes_key as base64(hex_string), not base64(raw_bytes). + # Sending base64(raw_bytes) causes images to show as grey boxes on the + # receiver side because the decryption key doesn't match. + aes_key_for_api = base64.b64encode(aes_key.hex().encode("ascii")).decode("ascii") media_item = item_builder( encrypt_query_param=encrypted_query_param, - aes_key_b64=base64.b64encode(aes_key).decode("ascii"), + aes_key_for_api=aes_key_for_api, ciphertext_size=len(ciphertext), plaintext_size=rawsize, filename=Path(path).name, + rawfilemd5=rawfilemd5, ) last_message_id = None @@ -1659,39 +1698,53 @@ class WeixinAdapter(BasePlatformAdapter): def _outbound_media_builder(self, path: str): mime = mimetypes.guess_type(path)[0] or "application/octet-stream" if mime.startswith("image/"): - return MEDIA_IMAGE, lambda **kwargs: { + return MEDIA_IMAGE, lambda **kw: { "type": ITEM_IMAGE, "image_item": { "media": { - "encrypt_query_param": kwargs["encrypt_query_param"], - "aes_key": kwargs["aes_key_b64"], + "encrypt_query_param": kw["encrypt_query_param"], + "aes_key": kw["aes_key_for_api"], "encrypt_type": 1, }, - "mid_size": kwargs["ciphertext_size"], + "mid_size": kw["ciphertext_size"], }, } if mime.startswith("video/"): - return MEDIA_VIDEO, lambda **kwargs: { + return MEDIA_VIDEO, lambda **kw: { "type": ITEM_VIDEO, "video_item": { "media": { - "encrypt_query_param": kwargs["encrypt_query_param"], - "aes_key": kwargs["aes_key_b64"], + "encrypt_query_param": kw["encrypt_query_param"], + "aes_key": kw["aes_key_for_api"], "encrypt_type": 1, }, - "video_size": kwargs["ciphertext_size"], + "video_size": kw["ciphertext_size"], + "play_length": kw.get("play_length", 0), + "video_md5": kw.get("rawfilemd5", ""), }, } - return MEDIA_FILE, lambda **kwargs: { + if mime.startswith("audio/") or path.endswith(".silk"): + return MEDIA_VOICE, lambda **kw: { + "type": ITEM_VOICE, + "voice_item": { + "media": { + "encrypt_query_param": kw["encrypt_query_param"], + "aes_key": kw["aes_key_for_api"], + "encrypt_type": 1, + }, + "playtime": kw.get("playtime", 0), + }, + } + return MEDIA_FILE, lambda **kw: { "type": ITEM_FILE, "file_item": { "media": { - "encrypt_query_param": kwargs["encrypt_query_param"], - "aes_key": kwargs["aes_key_b64"], + "encrypt_query_param": kw["encrypt_query_param"], + "aes_key": kw["aes_key_for_api"], "encrypt_type": 1, }, - "file_name": kwargs["filename"], - "len": str(kwargs["plaintext_size"]), + "file_name": kw["filename"], + "len": str(kw["plaintext_size"]), }, } diff --git a/gateway/run.py b/gateway/run.py index 94f1dde53..0b778e2f6 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -7665,10 +7665,18 @@ class GatewayRunner: from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig _adapter = self.adapters.get(source.platform) if _adapter: + # Platforms that don't support editing sent messages + # (e.g. WeChat) must not show a cursor in intermediate + # sends — the cursor would be permanently visible because + # it can never be edited away. Use an empty cursor for + # such platforms so streaming still delivers the final + # response, just without the typing indicator. + _adapter_supports_edit = getattr(_adapter, "SUPPORTS_MESSAGE_EDITING", True) + _effective_cursor = _scfg.cursor if _adapter_supports_edit else "" _consumer_cfg = StreamConsumerConfig( edit_interval=_scfg.edit_interval, buffer_threshold=_scfg.buffer_threshold, - cursor=_scfg.cursor, + cursor=_effective_cursor, ) _stream_consumer = GatewayStreamConsumer( adapter=_adapter, diff --git a/tests/gateway/test_weixin.py b/tests/gateway/test_weixin.py index f2afe1049..4633171fe 100644 --- a/tests/gateway/test_weixin.py +++ b/tests/gateway/test_weixin.py @@ -30,7 +30,7 @@ class TestWeixinFormatting: assert ( adapter.format_message(content) - == "【Title】\n\n**Plan**\n\nUse **bold** and [docs](https://example.com)." + == "【Title】\n\n**Plan**\n\nUse **bold** and docs (https://example.com)." ) def test_format_message_rewrites_markdown_tables(self): @@ -374,3 +374,149 @@ class TestWeixinRemoteMediaSafety: assert "Blocked unsafe URL" in str(exc) else: raise AssertionError("expected ValueError for unsafe URL") + + +class TestWeixinMarkdownLinks: + """Markdown links should be converted to plaintext since WeChat can't render them.""" + + def test_format_message_converts_markdown_links_to_plain_text(self): + adapter = _make_adapter() + + content = "Check [the docs](https://example.com) and [GitHub](https://github.com) for details" + assert ( + adapter.format_message(content) + == "Check the docs (https://example.com) and GitHub (https://github.com) for details" + ) + + def test_format_message_preserves_links_inside_code_blocks(self): + adapter = _make_adapter() + + content = "See below:\n\n```\n[link](https://example.com)\n```\n\nDone." + result = adapter.format_message(content) + assert "[link](https://example.com)" in result + + +class TestWeixinBlankMessagePrevention: + """Regression tests for the blank-bubble bugs. + + Three separate guards now prevent a blank WeChat message from ever being + dispatched: + + 1. ``_split_text_for_weixin_delivery("")`` returns ``[]`` — not ``[""]``. + 2. ``send()`` filters out empty/whitespace-only chunks before calling + ``_send_text_chunk``. + 3. ``_send_message()`` raises ``ValueError`` for empty text as a last-resort + safety net. + """ + + def test_split_text_returns_empty_list_for_empty_string(self): + adapter = _make_adapter() + assert adapter._split_text("") == [] + + def test_split_text_returns_empty_list_for_empty_string_split_per_line(self): + adapter = WeixinAdapter( + PlatformConfig( + enabled=True, + extra={ + "account_id": "acct", + "token": "test-tok", + "split_multiline_messages": True, + }, + ) + ) + assert adapter._split_text("") == [] + + @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock) + def test_send_empty_content_does_not_call_send_message(self, send_message_mock): + adapter = _make_adapter() + adapter._session = object() + adapter._token = "test-token" + adapter._base_url = "https://weixin.example.com" + adapter._token_store.get = lambda account_id, chat_id: "ctx-token" + + result = asyncio.run(adapter.send("wxid_test123", "")) + # Empty content → no chunks → no _send_message calls + assert result.success is True + send_message_mock.assert_not_awaited() + + def test_send_message_rejects_empty_text(self): + """_send_message raises ValueError for empty/whitespace text.""" + import pytest + with pytest.raises(ValueError, match="text must not be empty"): + asyncio.run( + weixin._send_message( + AsyncMock(), + base_url="https://example.com", + token="tok", + to="wxid_test", + text="", + context_token=None, + client_id="cid", + ) + ) + + +class TestWeixinStreamingCursorSuppression: + """WeChat doesn't support message editing — cursor must be suppressed.""" + + def test_supports_message_editing_is_false(self): + adapter = _make_adapter() + assert adapter.SUPPORTS_MESSAGE_EDITING is False + + +class TestWeixinMediaBuilder: + """Media builder uses base64(hex_key), not base64(raw_bytes) for aes_key.""" + + def test_image_builder_aes_key_is_base64_of_hex(self): + import base64 + adapter = _make_adapter() + media_type, builder = adapter._outbound_media_builder("photo.jpg") + assert media_type == weixin.MEDIA_IMAGE + + fake_hex_key = "0123456789abcdef0123456789abcdef" + expected_aes = base64.b64encode(fake_hex_key.encode("ascii")).decode("ascii") + item = builder( + encrypt_query_param="eq", + aes_key_for_api=expected_aes, + ciphertext_size=1024, + plaintext_size=1000, + filename="photo.jpg", + rawfilemd5="abc123", + ) + assert item["image_item"]["media"]["aes_key"] == expected_aes + + def test_video_builder_includes_md5(self): + adapter = _make_adapter() + media_type, builder = adapter._outbound_media_builder("clip.mp4") + assert media_type == weixin.MEDIA_VIDEO + + item = builder( + encrypt_query_param="eq", + aes_key_for_api="fakekey", + ciphertext_size=2048, + plaintext_size=2000, + filename="clip.mp4", + rawfilemd5="deadbeef", + ) + assert item["video_item"]["video_md5"] == "deadbeef" + + def test_voice_builder_for_audio_files(self): + adapter = _make_adapter() + media_type, builder = adapter._outbound_media_builder("note.mp3") + assert media_type == weixin.MEDIA_VOICE + + item = builder( + encrypt_query_param="eq", + aes_key_for_api="fakekey", + ciphertext_size=512, + plaintext_size=500, + filename="note.mp3", + rawfilemd5="abc", + ) + assert item["type"] == weixin.ITEM_VOICE + assert "voice_item" in item + + def test_voice_builder_for_silk_files(self): + adapter = _make_adapter() + media_type, builder = adapter._outbound_media_builder("recording.silk") + assert media_type == weixin.MEDIA_VOICE