From 5ca52bae5b3faaf124f5d81d1b7562f5628edfbf Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 16 Apr 2026 14:54:38 +0800 Subject: [PATCH] fix(gateway/weixin): split poll/send sessions, reuse live adapter for cron & send_message - gateway/platforms/weixin.py: - Split aiohttp.ClientSession into _poll_session and _send_session - Add _LIVE_ADAPTERS registry so send_weixin_direct() reuses the connected gateway adapter instead of creating a competing session - Fixes silent message loss when gateway is running (iLink token contention) - cron/scheduler.py: - Support comma-separated deliver values (e.g. 'feishu,weixin') for multi-target delivery - Delay pconfig/enabled check until standalone fallback so live adapters work even when platform is not in gateway config - tools/send_message_tool.py: - Synthesize PlatformConfig from WEIXIN_* env vars when gateway config lacks a weixin entry - Fall back to WEIXIN_HOME_CHANNEL env var for home channel resolution - tests/gateway/test_weixin.py: - Update mocks to include _send_session --- cron/scheduler.py | 242 ++++++++++++++++++++--------------- gateway/platforms/weixin.py | 97 +++++++++----- tests/gateway/test_weixin.py | 2 + tools/send_message_tool.py | 28 +++- 4 files changed, 235 insertions(+), 134 deletions(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index 9a0f561b0..2d7b7e9b9 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -27,7 +27,7 @@ except ImportError: except ImportError: msvcrt = None from pathlib import Path -from typing import Optional +from typing import List, Optional # Add parent directory to path for imports BEFORE repo-level imports. # Without this, standalone invocations (e.g. after `hermes update` reloads @@ -76,15 +76,14 @@ def _resolve_origin(job: dict) -> Optional[dict]: return None -def _resolve_delivery_target(job: dict) -> Optional[dict]: - """Resolve the concrete auto-delivery target for a cron job, if any.""" - deliver = job.get("deliver", "local") +def _resolve_single_delivery_target(job: dict, deliver_value: str) -> Optional[dict]: + """Resolve one concrete auto-delivery target for a cron job.""" origin = _resolve_origin(job) - if deliver == "local": + if deliver_value == "local": return None - if deliver == "origin": + if deliver_value == "origin": if origin: return { "platform": origin["platform"], @@ -108,8 +107,8 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]: } return None - if ":" in deliver: - platform_name, rest = deliver.split(":", 1) + if ":" in deliver_value: + platform_name, rest = deliver_value.split(":", 1) platform_key = platform_name.lower() from tools.send_message_tool import _parse_target_ref @@ -139,7 +138,7 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]: "thread_id": thread_id, } - platform_name = deliver + platform_name = deliver_value if origin and origin.get("platform") == platform_name: return { "platform": platform_name, @@ -160,6 +159,30 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]: } +def _resolve_delivery_targets(job: dict) -> List[dict]: + """Resolve all concrete auto-delivery targets for a cron job (supports comma-separated deliver).""" + deliver = job.get("deliver", "local") + if deliver == "local": + return [] + parts = [p.strip() for p in str(deliver).split(",") if p.strip()] + seen = set() + targets = [] + for part in parts: + target = _resolve_single_delivery_target(job, part) + if target: + key = (target["platform"].lower(), str(target["chat_id"]), target.get("thread_id")) + if key not in seen: + seen.add(key) + targets.append(target) + return targets + + +def _resolve_delivery_target(job: dict) -> Optional[dict]: + """Resolve the concrete auto-delivery target for a cron job, if any.""" + targets = _resolve_delivery_targets(job) + return targets[0] if targets else None + + # Media extension sets — keep in sync with gateway/platforms/base.py:_process_message_background _AUDIO_EXTS = frozenset({'.ogg', '.opus', '.mp3', '.wav', '.m4a'}) _VIDEO_EXTS = frozenset({'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'}) @@ -200,7 +223,7 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata: def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]: """ - Deliver job output to the configured target (origin chat, specific platform, etc.). + Deliver job output to the configured target(s) (origin chat, specific platform, etc.). When ``adapters`` and ``loop`` are provided (gateway is running), tries to use the live adapter first — this supports E2EE rooms (e.g. Matrix) where @@ -209,33 +232,14 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option Returns None on success, or an error string on failure. """ - target = _resolve_delivery_target(job) - if not target: + targets = _resolve_delivery_targets(job) + if not targets: if job.get("deliver", "local") != "local": msg = f"no delivery target resolved for deliver={job.get('deliver', 'local')}" logger.warning("Job '%s': %s", job["id"], msg) return msg return None # local-only jobs don't deliver — not a failure - platform_name = target["platform"] - chat_id = target["chat_id"] - thread_id = target.get("thread_id") - - # Diagnostic: log thread_id for topic-aware delivery debugging - origin = job.get("origin") or {} - origin_thread = origin.get("thread_id") - if origin_thread and not thread_id: - logger.warning( - "Job '%s': origin has thread_id=%s but delivery target lost it " - "(deliver=%s, target=%s)", - job["id"], origin_thread, job.get("deliver", "local"), target, - ) - elif thread_id: - logger.debug( - "Job '%s': delivering to %s:%s thread_id=%s", - job["id"], platform_name, chat_id, thread_id, - ) - from tools.send_message_tool import _send_to_platform from gateway.config import load_gateway_config, Platform @@ -258,24 +262,6 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option "bluebubbles": Platform.BLUEBUBBLES, "qqbot": Platform.QQBOT, } - platform = platform_map.get(platform_name.lower()) - if not platform: - msg = f"unknown platform '{platform_name}'" - logger.warning("Job '%s': %s", job["id"], msg) - return msg - - try: - config = load_gateway_config() - except Exception as e: - msg = f"failed to load gateway config: {e}" - logger.error("Job '%s': %s", job["id"], msg) - return msg - - pconfig = config.platforms.get(platform) - if not pconfig or not pconfig.enabled: - msg = f"platform '{platform_name}' not configured/enabled" - logger.warning("Job '%s': %s", job["id"], msg) - return msg # Optionally wrap the content with a header/footer so the user knows this # is a cron delivery. Wrapping is on by default; set cron.wrap_response: false @@ -304,67 +290,117 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option from gateway.platforms.base import BasePlatformAdapter media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content) - # Prefer the live adapter when the gateway is running — this supports E2EE - # rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt. - runtime_adapter = (adapters or {}).get(platform) - if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)(): - send_metadata = {"thread_id": thread_id} if thread_id else None - try: - # Send cleaned text (MEDIA tags stripped) — not the raw content - text_to_send = cleaned_delivery_content.strip() - adapter_ok = True - if text_to_send: - future = asyncio.run_coroutine_threadsafe( - runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata), - loop, - ) - send_result = future.result(timeout=60) - if send_result and not getattr(send_result, "success", True): - err = getattr(send_result, "error", "unknown") - logger.warning( - "Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone", - job["id"], platform_name, chat_id, err, - ) - adapter_ok = False # fall through to standalone path + try: + config = load_gateway_config() + except Exception as e: + msg = f"failed to load gateway config: {e}" + logger.error("Job '%s': %s", job["id"], msg) + return msg - # Send extracted media files as native attachments via the live adapter - if adapter_ok and media_files: - _send_media_via_adapter(runtime_adapter, chat_id, media_files, send_metadata, loop, job) + delivery_errors = [] - if adapter_ok: - logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id) - return None - except Exception as e: + for target in targets: + platform_name = target["platform"] + chat_id = target["chat_id"] + thread_id = target.get("thread_id") + + # Diagnostic: log thread_id for topic-aware delivery debugging + origin = job.get("origin") or {} + origin_thread = origin.get("thread_id") + if origin_thread and not thread_id: logger.warning( - "Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone", - job["id"], platform_name, chat_id, e, + "Job '%s': origin has thread_id=%s but delivery target lost it " + "(deliver=%s, target=%s)", + job["id"], origin_thread, job.get("deliver", "local"), target, + ) + elif thread_id: + logger.debug( + "Job '%s': delivering to %s:%s thread_id=%s", + job["id"], platform_name, chat_id, thread_id, ) - # Standalone path: run the async send in a fresh event loop (safe from any thread) - coro = _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files) - try: - result = asyncio.run(coro) - except RuntimeError: - # asyncio.run() checks for a running loop before awaiting the coroutine; - # when it raises, the original coro was never started — close it to - # prevent "coroutine was never awaited" RuntimeWarning, then retry in a - # fresh thread that has no running loop. - coro.close() - import concurrent.futures - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: - future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files)) - result = future.result(timeout=30) - except Exception as e: - msg = f"delivery to {platform_name}:{chat_id} failed: {e}" - logger.error("Job '%s': %s", job["id"], msg) - return msg + platform = platform_map.get(platform_name.lower()) + if not platform: + msg = f"unknown platform '{platform_name}'" + logger.warning("Job '%s': %s", job["id"], msg) + delivery_errors.append(msg) + continue - if result and result.get("error"): - msg = f"delivery error: {result['error']}" - logger.error("Job '%s': %s", job["id"], msg) - return msg + # Prefer the live adapter when the gateway is running — this supports E2EE + # rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt. + runtime_adapter = (adapters or {}).get(platform) + delivered = False + if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)(): + send_metadata = {"thread_id": thread_id} if thread_id else None + try: + # Send cleaned text (MEDIA tags stripped) — not the raw content + text_to_send = cleaned_delivery_content.strip() + adapter_ok = True + if text_to_send: + future = asyncio.run_coroutine_threadsafe( + runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata), + loop, + ) + send_result = future.result(timeout=60) + if send_result and not getattr(send_result, "success", True): + err = getattr(send_result, "error", "unknown") + logger.warning( + "Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone", + job["id"], platform_name, chat_id, err, + ) + adapter_ok = False # fall through to standalone path - logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id) + # Send extracted media files as native attachments via the live adapter + if adapter_ok and media_files: + _send_media_via_adapter(runtime_adapter, chat_id, media_files, send_metadata, loop, job) + + if adapter_ok: + logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id) + delivered = True + except Exception as e: + logger.warning( + "Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone", + job["id"], platform_name, chat_id, e, + ) + + if not delivered: + pconfig = config.platforms.get(platform) + if not pconfig or not pconfig.enabled: + msg = f"platform '{platform_name}' not configured/enabled" + logger.warning("Job '%s': %s", job["id"], msg) + delivery_errors.append(msg) + continue + + # Standalone path: run the async send in a fresh event loop (safe from any thread) + coro = _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files) + try: + result = asyncio.run(coro) + except RuntimeError: + # asyncio.run() checks for a running loop before awaiting the coroutine; + # when it raises, the original coro was never started — close it to + # prevent "coroutine was never awaited" RuntimeWarning, then retry in a + # fresh thread that has no running loop. + coro.close() + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files)) + result = future.result(timeout=30) + except Exception as e: + msg = f"delivery to {platform_name}:{chat_id} failed: {e}" + logger.error("Job '%s': %s", job["id"], msg) + delivery_errors.append(msg) + continue + + if result and result.get("error"): + msg = f"delivery error: {result['error']}" + logger.error("Job '%s': %s", job["id"], msg) + delivery_errors.append(msg) + continue + + logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id) + + if delivery_errors: + return "; ".join(delivery_errors) return None diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index f0b03efa8..baf344ee9 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -96,6 +96,8 @@ MEDIA_VIDEO = 2 MEDIA_FILE = 3 MEDIA_VOICE = 4 +_LIVE_ADAPTERS: Dict[str, Any] = {} + ITEM_TEXT = 1 ITEM_IMAGE = 2 ITEM_VOICE = 3 @@ -1052,7 +1054,8 @@ class WeixinAdapter(BasePlatformAdapter): self._hermes_home = hermes_home self._token_store = ContextTokenStore(hermes_home) self._typing_cache = TypingTicketCache() - self._session: Optional[aiohttp.ClientSession] = None + self._poll_session: Optional[aiohttp.ClientSession] = None + self._send_session: Optional[aiohttp.ClientSession] = None self._poll_task: Optional[asyncio.Task] = None self._dedup = MessageDeduplicator(ttl_seconds=MESSAGE_DEDUP_TTL_SECONDS) @@ -1127,14 +1130,17 @@ class WeixinAdapter(BasePlatformAdapter): except Exception as exc: logger.debug("[%s] Token lock unavailable (non-fatal): %s", self.name, exc) - self._session = aiohttp.ClientSession(trust_env=True) + self._poll_session = aiohttp.ClientSession(trust_env=True) + self._send_session = aiohttp.ClientSession(trust_env=True) self._token_store.restore(self._account_id) self._poll_task = asyncio.create_task(self._poll_loop(), name="weixin-poll") self._mark_connected() + _LIVE_ADAPTERS[self._token] = self logger.info("[%s] Connected account=%s base=%s", self.name, _safe_id(self._account_id), self._base_url) return True async def disconnect(self) -> None: + _LIVE_ADAPTERS.pop(self._token, None) self._running = False if self._poll_task and not self._poll_task.done(): self._poll_task.cancel() @@ -1143,15 +1149,18 @@ class WeixinAdapter(BasePlatformAdapter): except asyncio.CancelledError: pass self._poll_task = None - if self._session and not self._session.closed: - await self._session.close() - self._session = None + if self._poll_session and not self._poll_session.closed: + await self._poll_session.close() + self._poll_session = None + if self._send_session and not self._send_session.closed: + await self._send_session.close() + self._send_session = None self._release_platform_lock() self._mark_disconnected() logger.info("[%s] Disconnected", self.name) async def _poll_loop(self) -> None: - assert self._session is not None + assert self._poll_session is not None sync_buf = _load_sync_buf(self._hermes_home, self._account_id) timeout_ms = LONG_POLL_TIMEOUT_MS consecutive_failures = 0 @@ -1159,7 +1168,7 @@ class WeixinAdapter(BasePlatformAdapter): while self._running: try: response = await _get_updates( - self._session, + self._poll_session, base_url=self._base_url, token=self._token, sync_buf=sync_buf, @@ -1216,7 +1225,7 @@ class WeixinAdapter(BasePlatformAdapter): logger.error("[%s] unhandled inbound error from=%s: %s", self.name, _safe_id(message.get("from_user_id")), exc, exc_info=True) async def _process_message(self, message: Dict[str, Any]) -> None: - assert self._session is not None + assert self._poll_session is not None sender_id = str(message.get("from_user_id") or "").strip() if not sender_id: return @@ -1309,7 +1318,7 @@ class WeixinAdapter(BasePlatformAdapter): media = _media_reference(item, "image_item") try: data = await _download_and_decrypt_media( - self._session, + self._poll_session, cdn_base_url=self._cdn_base_url, encrypted_query_param=media.get("encrypt_query_param"), aes_key_b64=(item.get("image_item") or {}).get("aeskey") @@ -1327,7 +1336,7 @@ class WeixinAdapter(BasePlatformAdapter): media = _media_reference(item, "video_item") try: data = await _download_and_decrypt_media( - self._session, + self._poll_session, cdn_base_url=self._cdn_base_url, encrypted_query_param=media.get("encrypt_query_param"), aes_key_b64=media.get("aes_key"), @@ -1346,7 +1355,7 @@ class WeixinAdapter(BasePlatformAdapter): mime = _mime_from_filename(filename) try: data = await _download_and_decrypt_media( - self._session, + self._poll_session, cdn_base_url=self._cdn_base_url, encrypted_query_param=media.get("encrypt_query_param"), aes_key_b64=media.get("aes_key"), @@ -1365,7 +1374,7 @@ class WeixinAdapter(BasePlatformAdapter): return None try: data = await _download_and_decrypt_media( - self._session, + self._poll_session, cdn_base_url=self._cdn_base_url, encrypted_query_param=media.get("encrypt_query_param"), aes_key_b64=media.get("aes_key"), @@ -1378,13 +1387,13 @@ class WeixinAdapter(BasePlatformAdapter): return None async def _maybe_fetch_typing_ticket(self, user_id: str, context_token: Optional[str]) -> None: - if not self._session or not self._token: + if not self._poll_session or not self._token: return if self._typing_cache.get(user_id): return try: response = await _get_config( - self._session, + self._poll_session, base_url=self._base_url, token=self._token, user_id=user_id, @@ -1414,7 +1423,7 @@ class WeixinAdapter(BasePlatformAdapter): for attempt in range(self._send_chunk_retries + 1): try: await _send_message( - self._session, + self._send_session, base_url=self._base_url, token=self._token, to=chat_id, @@ -1449,7 +1458,7 @@ class WeixinAdapter(BasePlatformAdapter): reply_to: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: - if not self._session or not self._token: + if not self._send_session or not self._token: return SendResult(success=False, error="Not connected") context_token = self._token_store.get(self._account_id, chat_id) last_message_id: Optional[str] = None @@ -1508,14 +1517,14 @@ class WeixinAdapter(BasePlatformAdapter): return SendResult(success=False, error=str(exc)) async def send_typing(self, chat_id: str, metadata: Optional[Dict[str, Any]] = None) -> None: - if not self._session or not self._token: + if not self._send_session or not self._token: return typing_ticket = self._typing_cache.get(chat_id) if not typing_ticket: return try: await _send_typing( - self._session, + self._send_session, base_url=self._base_url, token=self._token, to_user_id=chat_id, @@ -1526,14 +1535,14 @@ class WeixinAdapter(BasePlatformAdapter): logger.debug("[%s] typing start failed for %s: %s", self.name, _safe_id(chat_id), exc) async def stop_typing(self, chat_id: str) -> None: - if not self._session or not self._token: + if not self._send_session or not self._token: return typing_ticket = self._typing_cache.get(chat_id) if not typing_ticket: return try: await _send_typing( - self._session, + self._send_session, base_url=self._base_url, token=self._token, to_user_id=chat_id, @@ -1585,7 +1594,7 @@ class WeixinAdapter(BasePlatformAdapter): caption: str = "", metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: - if not self._session or not self._token: + if not self._send_session or not self._token: return SendResult(success=False, error="Not connected") try: message_id = await self._send_file(chat_id, file_path, caption) @@ -1602,7 +1611,7 @@ class WeixinAdapter(BasePlatformAdapter): reply_to: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: - if not self._session or not self._token: + if not self._send_session or not self._token: return SendResult(success=False, error="Not connected") try: message_id = await self._send_file(chat_id, video_path, caption or "") @@ -1619,7 +1628,7 @@ class WeixinAdapter(BasePlatformAdapter): reply_to: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: - if not self._session or not self._token: + if not self._send_session or not self._token: return SendResult(success=False, error="Not connected") # Native outbound Weixin voice bubbles are not proven-working in the @@ -1644,8 +1653,8 @@ class WeixinAdapter(BasePlatformAdapter): if not is_safe_url(url): raise ValueError(f"Blocked unsafe URL (SSRF protection): {url}") - assert self._session is not None - async with self._session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response: + assert self._send_session is not None + async with self._send_session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response: response.raise_for_status() data = await response.read() suffix = Path(url.split("?", 1)[0]).suffix or ".bin" @@ -1660,7 +1669,7 @@ class WeixinAdapter(BasePlatformAdapter): caption: str, force_file_attachment: bool = False, ) -> str: - assert self._session is not None and self._token is not None + assert self._send_session is not None and self._token is not None plaintext = Path(path).read_bytes() media_type, item_builder = self._outbound_media_builder(path, force_file_attachment=force_file_attachment) filekey = secrets.token_hex(16) @@ -1668,7 +1677,7 @@ class WeixinAdapter(BasePlatformAdapter): rawsize = len(plaintext) rawfilemd5 = hashlib.md5(plaintext).hexdigest() upload_response = await _get_upload_url( - self._session, + self._send_session, base_url=self._base_url, token=self._token, to_user_id=chat_id, @@ -1694,7 +1703,7 @@ class WeixinAdapter(BasePlatformAdapter): raise RuntimeError(f"getUploadUrl returned neither upload_param nor upload_full_url: {upload_response}") encrypted_query_param = await _upload_ciphertext( - self._session, + self._send_session, ciphertext=ciphertext, upload_url=upload_url, ) @@ -1722,7 +1731,7 @@ class WeixinAdapter(BasePlatformAdapter): if caption: last_message_id = f"hermes-weixin-{uuid.uuid4().hex}" await _send_message( - self._session, + self._send_session, base_url=self._base_url, token=self._token, to=chat_id, @@ -1733,7 +1742,7 @@ class WeixinAdapter(BasePlatformAdapter): last_message_id = f"hermes-weixin-{uuid.uuid4().hex}" await _api_post( - self._session, + self._send_session, base_url=self._base_url, endpoint=EP_SEND_MESSAGE, payload={ @@ -1857,6 +1866,33 @@ async def send_weixin_direct( token_store.restore(account_id) context_token = token_store.get(account_id, chat_id) + live_adapter = _LIVE_ADAPTERS.get(resolved_token) + send_session = getattr(live_adapter, '_send_session', None) + if live_adapter is not None and send_session is not None and not send_session.closed: + last_result: Optional[SendResult] = None + cleaned = live_adapter.format_message(message) + if cleaned: + last_result = await live_adapter.send(chat_id, cleaned) + if not last_result.success: + return {"error": f"Weixin send failed: {last_result.error}"} + + for media_path, _is_voice in media_files or []: + ext = Path(media_path).suffix.lower() + if ext in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}: + last_result = await live_adapter.send_image_file(chat_id, media_path) + else: + last_result = await live_adapter.send_document(chat_id, media_path) + if not last_result.success: + return {"error": f"Weixin media send failed: {last_result.error}"} + + return { + "success": True, + "platform": "weixin", + "chat_id": chat_id, + "message_id": last_result.message_id if last_result else None, + "context_token_used": bool(context_token), + } + async with aiohttp.ClientSession(trust_env=True) as session: adapter = WeixinAdapter( PlatformConfig( @@ -1870,6 +1906,7 @@ async def send_weixin_direct( }, ) ) + adapter._send_session = session adapter._session = session adapter._token = resolved_token adapter._account_id = account_id diff --git a/tests/gateway/test_weixin.py b/tests/gateway/test_weixin.py index 8fc292001..cd911e80b 100644 --- a/tests/gateway/test_weixin.py +++ b/tests/gateway/test_weixin.py @@ -311,6 +311,7 @@ class TestWeixinChunkDelivery: def _connected_adapter(self) -> WeixinAdapter: adapter = _make_adapter() adapter._session = object() + adapter._send_session = adapter._session adapter._token = "test-token" adapter._base_url = "https://weixin.example.com" adapter._token_store.get = lambda account_id, chat_id: "ctx-token" @@ -420,6 +421,7 @@ class TestWeixinBlankMessagePrevention: def test_send_empty_content_does_not_call_send_message(self, send_message_mock): adapter = _make_adapter() adapter._session = object() + adapter._send_session = adapter._session adapter._token = "test-token" adapter._base_url = "https://weixin.example.com" adapter._token_store.get = lambda account_id, chat_id: "ctx-token" diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 4bc93ce42..bb2747686 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -215,7 +215,27 @@ def _handle_send(args): pconfig = config.platforms.get(platform) if not pconfig or not pconfig.enabled: - return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.") + # Weixin can be configured purely via .env; synthesize a pconfig so + # send_message and cron delivery work without a gateway.yaml entry. + if platform_name == "weixin": + import os + wx_token = os.getenv("WEIXIN_TOKEN", "").strip() + wx_account = os.getenv("WEIXIN_ACCOUNT_ID", "").strip() + if wx_token and wx_account: + from gateway.config import PlatformConfig + pconfig = PlatformConfig( + enabled=True, + token=wx_token, + extra={ + "account_id": wx_account, + "base_url": os.getenv("WEIXIN_BASE_URL", "").strip(), + "cdn_base_url": os.getenv("WEIXIN_CDN_BASE_URL", "").strip(), + }, + ) + else: + return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.") + else: + return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.") from gateway.platforms.base import BasePlatformAdapter @@ -225,6 +245,12 @@ def _handle_send(args): used_home_channel = False if not chat_id: home = config.get_home_channel(platform) + if not home and platform_name == "weixin": + import os + wx_home = os.getenv("WEIXIN_HOME_CHANNEL", "").strip() + if wx_home: + from gateway.config import HomeChannel + home = HomeChannel(platform=platform, chat_id=wx_home, name="Weixin Home") if home: chat_id = home.chat_id used_home_channel = True