From 5f0caf54d61d526d4183920b41c6be55f0b2b5cf Mon Sep 17 00:00:00 2001 From: chqchshj <62175669+chqchshj@users.noreply.github.com> Date: Sat, 11 Apr 2026 14:25:18 -0700 Subject: [PATCH] feat(gateway): add WeCom callback-mode adapter for self-built apps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a second WeCom integration mode for regular enterprise self-built applications. Unlike the existing bot/websocket adapter (wecom.py), this handles WeCom's standard callback flow: WeCom POSTs encrypted XML to an HTTP endpoint, the adapter decrypts, queues for the agent, and immediately acknowledges. The agent's reply is delivered proactively via the message/send API. Key design choice: always acknowledge immediately and use proactive send — agent sessions take 3-30 minutes, so the 5-second inline reply window is never useful. The original PR's Future/pending-reply machinery was removed in favour of this simpler architecture. Features: - AES-CBC encrypt/decrypt (BizMsgCrypt-compatible) - Multi-app routing scoped by corp_id:user_id - Legacy bare user_id fallback for backward compat - Access-token management with auto-refresh - WECOM_CALLBACK_* env var overrides - Port-in-use pre-check before binding - Health endpoint at /health Salvaged from PR #7774 by @chqchshj. Simplified by removing the inline reply Future system and fixing: secrets.choice for nonce generation, immediate plain-text acknowledgment (not encrypted XML containing 'success'), and initial token refresh error handling. --- cron/scheduler.py | 3 +- gateway/config.py | 25 +- gateway/platforms/webhook.py | 1 + gateway/platforms/wecom_callback.py | 387 +++++++++++++++++++++++++++ gateway/platforms/wecom_crypto.py | 142 ++++++++++ gateway/run.py | 16 +- hermes_cli/config.py | 3 + hermes_cli/dump.py | 1 + hermes_cli/gateway.py | 31 +++ tests/gateway/test_wecom_callback.py | 185 +++++++++++++ tools/cronjob_tools.py | 2 +- tools/send_message_tool.py | 1 + toolsets.py | 8 +- 13 files changed, 800 insertions(+), 5 deletions(-) create mode 100644 gateway/platforms/wecom_callback.py create mode 100644 gateway/platforms/wecom_crypto.py create mode 100644 tests/gateway/test_wecom_callback.py diff --git a/cron/scheduler.py b/cron/scheduler.py index 870ebe141..1848cb29a 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -44,7 +44,7 @@ logger = logging.getLogger(__name__) _KNOWN_DELIVERY_PLATFORMS = frozenset({ "telegram", "discord", "slack", "whatsapp", "signal", "matrix", "mattermost", "homeassistant", "dingtalk", "feishu", - "wecom", "weixin", "sms", "email", "webhook", "bluebubbles", + "wecom", "wecom_callback", "weixin", "sms", "email", "webhook", "bluebubbles", }) from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run @@ -234,6 +234,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option "dingtalk": Platform.DINGTALK, "feishu": Platform.FEISHU, "wecom": Platform.WECOM, + "wecom_callback": Platform.WECOM_CALLBACK, "weixin": Platform.WEIXIN, "email": Platform.EMAIL, "sms": Platform.SMS, diff --git a/gateway/config.py b/gateway/config.py index 34ef31d7b..342af9764 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -63,6 +63,7 @@ class Platform(Enum): WEBHOOK = "webhook" FEISHU = "feishu" WECOM = "wecom" + WECOM_CALLBACK = "wecom_callback" WEIXIN = "weixin" BLUEBUBBLES = "bluebubbles" @@ -291,9 +292,14 @@ class GatewayConfig: # Feishu uses extra dict for app credentials elif platform == Platform.FEISHU and config.extra.get("app_id"): connected.append(platform) - # WeCom uses extra dict for bot credentials + # WeCom bot mode uses extra dict for bot credentials elif platform == Platform.WECOM and config.extra.get("bot_id"): connected.append(platform) + # WeCom callback mode uses corp_id or apps list + elif platform == Platform.WECOM_CALLBACK and ( + config.extra.get("corp_id") or config.extra.get("apps") + ): + connected.append(platform) # BlueBubbles uses extra dict for local server config elif platform == Platform.BLUEBUBBLES and config.extra.get("server_url") and config.extra.get("password"): connected.append(platform) @@ -987,6 +993,23 @@ def _apply_env_overrides(config: GatewayConfig) -> None: name=os.getenv("WECOM_HOME_CHANNEL_NAME", "Home"), ) + # WeCom callback mode (self-built apps) + wecom_callback_corp_id = os.getenv("WECOM_CALLBACK_CORP_ID") + wecom_callback_corp_secret = os.getenv("WECOM_CALLBACK_CORP_SECRET") + if wecom_callback_corp_id and wecom_callback_corp_secret: + if Platform.WECOM_CALLBACK not in config.platforms: + config.platforms[Platform.WECOM_CALLBACK] = PlatformConfig() + config.platforms[Platform.WECOM_CALLBACK].enabled = True + config.platforms[Platform.WECOM_CALLBACK].extra.update({ + "corp_id": wecom_callback_corp_id, + "corp_secret": wecom_callback_corp_secret, + "agent_id": os.getenv("WECOM_CALLBACK_AGENT_ID", ""), + "token": os.getenv("WECOM_CALLBACK_TOKEN", ""), + "encoding_aes_key": os.getenv("WECOM_CALLBACK_ENCODING_AES_KEY", ""), + "host": os.getenv("WECOM_CALLBACK_HOST", "0.0.0.0"), + "port": int(os.getenv("WECOM_CALLBACK_PORT", "8645")), + }) + # Weixin (personal WeChat via iLink Bot API) weixin_token = os.getenv("WEIXIN_TOKEN") weixin_account_id = os.getenv("WEIXIN_ACCOUNT_ID") diff --git a/gateway/platforms/webhook.py b/gateway/platforms/webhook.py index bb874f8f5..dfe7a70f3 100644 --- a/gateway/platforms/webhook.py +++ b/gateway/platforms/webhook.py @@ -201,6 +201,7 @@ class WebhookAdapter(BasePlatformAdapter): "dingtalk", "feishu", "wecom", + "wecom_callback", "weixin", "bluebubbles", ): diff --git a/gateway/platforms/wecom_callback.py b/gateway/platforms/wecom_callback.py new file mode 100644 index 000000000..4bb67d5cf --- /dev/null +++ b/gateway/platforms/wecom_callback.py @@ -0,0 +1,387 @@ +"""WeCom callback-mode adapter for self-built enterprise applications. + +Unlike the bot/websocket adapter in ``wecom.py``, this handles the standard +WeCom callback flow: WeCom POSTs encrypted XML to an HTTP endpoint, the +adapter decrypts it, queues the message for the agent, and immediately +acknowledges. The agent's reply is delivered later via the proactive +``message/send`` API using an access-token. + +Supports multiple self-built apps under one gateway instance, scoped by +``corp_id:user_id`` to avoid cross-corp collisions. +""" + +from __future__ import annotations + +import asyncio +import logging +import socket as _socket +import time +from typing import Any, Dict, List, Optional +from xml.etree import ElementTree as ET + +try: + from aiohttp import web + + AIOHTTP_AVAILABLE = True +except ImportError: + web = None # type: ignore[assignment] + AIOHTTP_AVAILABLE = False + +try: + import httpx + + HTTPX_AVAILABLE = True +except ImportError: + httpx = None # type: ignore[assignment] + HTTPX_AVAILABLE = False + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType, SendResult +from gateway.platforms.wecom_crypto import WXBizMsgCrypt, WeComCryptoError + +logger = logging.getLogger(__name__) + +DEFAULT_HOST = "0.0.0.0" +DEFAULT_PORT = 8645 +DEFAULT_PATH = "/wecom/callback" +ACCESS_TOKEN_TTL_SECONDS = 7200 +MESSAGE_DEDUP_TTL_SECONDS = 300 + + +def check_wecom_callback_requirements() -> bool: + return AIOHTTP_AVAILABLE and HTTPX_AVAILABLE + + +class WecomCallbackAdapter(BasePlatformAdapter): + def __init__(self, config: PlatformConfig): + super().__init__(config, Platform.WECOM_CALLBACK) + extra = config.extra or {} + self._host = str(extra.get("host") or DEFAULT_HOST) + self._port = int(extra.get("port") or DEFAULT_PORT) + self._path = str(extra.get("path") or DEFAULT_PATH) + self._apps: List[Dict[str, Any]] = self._normalize_apps(extra) + self._runner: Optional[web.AppRunner] = None + self._site: Optional[web.TCPSite] = None + self._app: Optional[web.Application] = None + self._http_client: Optional[httpx.AsyncClient] = None + self._message_queue: asyncio.Queue[MessageEvent] = asyncio.Queue() + self._poll_task: Optional[asyncio.Task] = None + self._seen_messages: Dict[str, float] = {} + self._user_app_map: Dict[str, str] = {} + self._access_tokens: Dict[str, Dict[str, Any]] = {} + + # ------------------------------------------------------------------ + # App normalisation + # ------------------------------------------------------------------ + + @staticmethod + def _user_app_key(corp_id: str, user_id: str) -> str: + return f"{corp_id}:{user_id}" if corp_id else user_id + + @staticmethod + def _normalize_apps(extra: Dict[str, Any]) -> List[Dict[str, Any]]: + apps = extra.get("apps") + if isinstance(apps, list) and apps: + return [dict(app) for app in apps if isinstance(app, dict)] + if extra.get("corp_id"): + return [ + { + "name": extra.get("name") or "default", + "corp_id": extra.get("corp_id", ""), + "corp_secret": extra.get("corp_secret", ""), + "agent_id": str(extra.get("agent_id", "")), + "token": extra.get("token", ""), + "encoding_aes_key": extra.get("encoding_aes_key", ""), + } + ] + return [] + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def connect(self) -> bool: + if not self._apps: + logger.warning("[WecomCallback] No callback apps configured") + return False + if not check_wecom_callback_requirements(): + logger.warning("[WecomCallback] aiohttp/httpx not installed") + return False + + # Quick port-in-use check. + try: + with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as sock: + sock.settimeout(1) + sock.connect(("127.0.0.1", self._port)) + logger.error("[WecomCallback] Port %d already in use", self._port) + return False + except (ConnectionRefusedError, OSError): + pass + + try: + self._http_client = httpx.AsyncClient(timeout=20.0) + self._app = web.Application() + self._app.router.add_get("/health", self._handle_health) + self._app.router.add_get(self._path, self._handle_verify) + self._app.router.add_post(self._path, self._handle_callback) + self._runner = web.AppRunner(self._app) + await self._runner.setup() + self._site = web.TCPSite(self._runner, self._host, self._port) + await self._site.start() + self._poll_task = asyncio.create_task(self._poll_loop()) + self._mark_connected() + logger.info( + "[WecomCallback] HTTP server listening on %s:%s%s", + self._host, self._port, self._path, + ) + for app in self._apps: + try: + await self._refresh_access_token(app) + except Exception as exc: + logger.warning( + "[WecomCallback] Initial token refresh failed for app '%s': %s", + app.get("name", "default"), exc, + ) + return True + except Exception: + await self._cleanup() + logger.exception("[WecomCallback] Failed to start") + return False + + async def disconnect(self) -> None: + self._running = False + if self._poll_task: + self._poll_task.cancel() + try: + await self._poll_task + except asyncio.CancelledError: + pass + self._poll_task = None + await self._cleanup() + self._mark_disconnected() + logger.info("[WecomCallback] Disconnected") + + async def _cleanup(self) -> None: + self._site = None + if self._runner: + await self._runner.cleanup() + self._runner = None + self._app = None + if self._http_client: + await self._http_client.aclose() + self._http_client = None + + # ------------------------------------------------------------------ + # Outbound: proactive send via access-token API + # ------------------------------------------------------------------ + + async def send( + self, + chat_id: str, + content: str, + reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + app = self._resolve_app_for_chat(chat_id) + touser = chat_id.split(":", 1)[1] if ":" in chat_id else chat_id + try: + token = await self._get_access_token(app) + payload = { + "touser": touser, + "msgtype": "text", + "agentid": int(str(app.get("agent_id") or 0)), + "text": {"content": content[:2048]}, + "safe": 0, + } + resp = await self._http_client.post( + f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}", + json=payload, + ) + data = resp.json() + if data.get("errcode") != 0: + return SendResult(success=False, error=str(data)) + return SendResult( + success=True, + message_id=str(data.get("msgid", "")), + raw_response=data, + ) + except Exception as exc: + return SendResult(success=False, error=str(exc)) + + def _resolve_app_for_chat(self, chat_id: str) -> Dict[str, Any]: + """Pick the app associated with *chat_id*, falling back sensibly.""" + app_name = self._user_app_map.get(chat_id) + if not app_name and ":" not in chat_id: + # Legacy bare user_id — try to find a unique match. + matching = [k for k in self._user_app_map if k.endswith(f":{chat_id}")] + if len(matching) == 1: + app_name = self._user_app_map.get(matching[0]) + app = self._get_app_by_name(app_name) if app_name else None + return app or self._apps[0] + + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: + return {"name": chat_id, "type": "dm"} + + # ------------------------------------------------------------------ + # Inbound: HTTP callback handlers + # ------------------------------------------------------------------ + + async def _handle_health(self, request: web.Request) -> web.Response: + return web.json_response({"status": "ok", "platform": "wecom_callback"}) + + async def _handle_verify(self, request: web.Request) -> web.Response: + """GET endpoint — WeCom URL verification handshake.""" + msg_signature = request.query.get("msg_signature", "") + timestamp = request.query.get("timestamp", "") + nonce = request.query.get("nonce", "") + echostr = request.query.get("echostr", "") + for app in self._apps: + try: + crypt = self._crypt_for_app(app) + plain = crypt.verify_url(msg_signature, timestamp, nonce, echostr) + return web.Response(text=plain, content_type="text/plain") + except Exception: + continue + return web.Response(status=403, text="signature verification failed") + + async def _handle_callback(self, request: web.Request) -> web.Response: + """POST endpoint — receive an encrypted message callback.""" + msg_signature = request.query.get("msg_signature", "") + timestamp = request.query.get("timestamp", "") + nonce = request.query.get("nonce", "") + body = await request.text() + + for app in self._apps: + try: + decrypted = self._decrypt_request( + app, body, msg_signature, timestamp, nonce, + ) + event = self._build_event(app, decrypted) + if event is not None: + # Record which app this user belongs to. + if event.source and event.source.user_id: + map_key = self._user_app_key( + str(app.get("corp_id") or ""), event.source.user_id, + ) + self._user_app_map[map_key] = app["name"] + await self._message_queue.put(event) + # Immediately acknowledge — the agent's reply will arrive + # later via the proactive message/send API. + return web.Response(text="success", content_type="text/plain") + except WeComCryptoError: + continue + except Exception: + logger.exception("[WecomCallback] Error handling message") + break + return web.Response(status=400, text="invalid callback payload") + + async def _poll_loop(self) -> None: + """Drain the message queue and dispatch to the gateway runner.""" + while True: + event = await self._message_queue.get() + try: + task = asyncio.create_task(self.handle_message(event)) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + except Exception: + logger.exception("[WecomCallback] Failed to enqueue event") + + # ------------------------------------------------------------------ + # XML / crypto helpers + # ------------------------------------------------------------------ + + def _decrypt_request( + self, app: Dict[str, Any], body: str, + msg_signature: str, timestamp: str, nonce: str, + ) -> str: + root = ET.fromstring(body) + encrypt = root.findtext("Encrypt", default="") + crypt = self._crypt_for_app(app) + return crypt.decrypt(msg_signature, timestamp, nonce, encrypt).decode("utf-8") + + def _build_event(self, app: Dict[str, Any], xml_text: str) -> Optional[MessageEvent]: + root = ET.fromstring(xml_text) + msg_type = (root.findtext("MsgType") or "").lower() + # Silently acknowledge lifecycle events. + if msg_type == "event": + event_name = (root.findtext("Event") or "").lower() + if event_name in {"enter_agent", "subscribe"}: + return None + if msg_type not in {"text", "event"}: + return None + + user_id = root.findtext("FromUserName", default="") + corp_id = root.findtext("ToUserName", default=app.get("corp_id", "")) + scoped_chat_id = self._user_app_key(corp_id, user_id) + content = root.findtext("Content", default="").strip() + if not content and msg_type == "event": + content = "/start" + msg_id = ( + root.findtext("MsgId") + or f"{user_id}:{root.findtext('CreateTime', default='0')}" + ) + source = self.build_source( + chat_id=scoped_chat_id, + chat_name=user_id, + chat_type="dm", + user_id=user_id, + user_name=user_id, + ) + return MessageEvent( + text=content, + message_type=MessageType.TEXT, + source=source, + raw_message=xml_text, + message_id=msg_id, + ) + + def _crypt_for_app(self, app: Dict[str, Any]) -> WXBizMsgCrypt: + return WXBizMsgCrypt( + token=str(app.get("token") or ""), + encoding_aes_key=str(app.get("encoding_aes_key") or ""), + receive_id=str(app.get("corp_id") or ""), + ) + + def _get_app_by_name(self, name: Optional[str]) -> Optional[Dict[str, Any]]: + if not name: + return None + for app in self._apps: + if app.get("name") == name: + return app + return None + + # ------------------------------------------------------------------ + # Access-token management + # ------------------------------------------------------------------ + + async def _get_access_token(self, app: Dict[str, Any]) -> str: + cached = self._access_tokens.get(app["name"]) + now = time.time() + if cached and cached.get("expires_at", 0) > now + 60: + return cached["token"] + return await self._refresh_access_token(app) + + async def _refresh_access_token(self, app: Dict[str, Any]) -> str: + resp = await self._http_client.get( + "https://qyapi.weixin.qq.com/cgi-bin/gettoken", + params={ + "corpid": app.get("corp_id"), + "corpsecret": app.get("corp_secret"), + }, + ) + data = resp.json() + if data.get("errcode") != 0: + raise RuntimeError(f"WeCom token refresh failed: {data}") + token = data["access_token"] + expires_in = int(data.get("expires_in", ACCESS_TOKEN_TTL_SECONDS)) + self._access_tokens[app["name"]] = { + "token": token, + "expires_at": time.time() + expires_in, + } + logger.info( + "[WecomCallback] Token refreshed for app '%s' (corp=%s), expires in %ss", + app.get("name", "default"), + app.get("corp_id", ""), + expires_in, + ) + return token diff --git a/gateway/platforms/wecom_crypto.py b/gateway/platforms/wecom_crypto.py new file mode 100644 index 000000000..f984ca80c --- /dev/null +++ b/gateway/platforms/wecom_crypto.py @@ -0,0 +1,142 @@ +"""WeCom BizMsgCrypt-compatible AES-CBC encryption for callback mode. + +Implements the same wire format as Tencent's official ``WXBizMsgCrypt`` +SDK so that WeCom can verify, encrypt, and decrypt callback payloads. +""" + +from __future__ import annotations + +import base64 +import hashlib +import os +import secrets +import socket +import struct +from typing import Optional +from xml.etree import ElementTree as ET + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + + +class WeComCryptoError(Exception): + pass + + +class SignatureError(WeComCryptoError): + pass + + +class DecryptError(WeComCryptoError): + pass + + +class EncryptError(WeComCryptoError): + pass + + +class PKCS7Encoder: + block_size = 32 + + @classmethod + def encode(cls, text: bytes) -> bytes: + amount_to_pad = cls.block_size - (len(text) % cls.block_size) + if amount_to_pad == 0: + amount_to_pad = cls.block_size + pad = bytes([amount_to_pad]) * amount_to_pad + return text + pad + + @classmethod + def decode(cls, decrypted: bytes) -> bytes: + if not decrypted: + raise DecryptError("empty decrypted payload") + pad = decrypted[-1] + if pad < 1 or pad > cls.block_size: + raise DecryptError("invalid PKCS7 padding") + if decrypted[-pad:] != bytes([pad]) * pad: + raise DecryptError("malformed PKCS7 padding") + return decrypted[:-pad] + + +def _sha1_signature(token: str, timestamp: str, nonce: str, encrypt: str) -> str: + parts = sorted([token, timestamp, nonce, encrypt]) + return hashlib.sha1("".join(parts).encode("utf-8")).hexdigest() + + +class WXBizMsgCrypt: + """Minimal WeCom callback crypto helper compatible with BizMsgCrypt semantics.""" + + def __init__(self, token: str, encoding_aes_key: str, receive_id: str): + if not token: + raise ValueError("token is required") + if not encoding_aes_key: + raise ValueError("encoding_aes_key is required") + if len(encoding_aes_key) != 43: + raise ValueError("encoding_aes_key must be 43 chars") + if not receive_id: + raise ValueError("receive_id is required") + + self.token = token + self.receive_id = receive_id + self.key = base64.b64decode(encoding_aes_key + "=") + self.iv = self.key[:16] + + def verify_url(self, msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str: + plain = self.decrypt(msg_signature, timestamp, nonce, echostr) + return plain.decode("utf-8") + + def decrypt(self, msg_signature: str, timestamp: str, nonce: str, encrypt: str) -> bytes: + expected = _sha1_signature(self.token, timestamp, nonce, encrypt) + if expected != msg_signature: + raise SignatureError("signature mismatch") + try: + cipher_text = base64.b64decode(encrypt) + except Exception as exc: + raise DecryptError(f"invalid base64 payload: {exc}") from exc + try: + cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend()) + decryptor = cipher.decryptor() + padded = decryptor.update(cipher_text) + decryptor.finalize() + plain = PKCS7Encoder.decode(padded) + content = plain[16:] # skip 16-byte random prefix + xml_length = socket.ntohl(struct.unpack("I", content[:4])[0]) + xml_content = content[4:4 + xml_length] + receive_id = content[4 + xml_length:].decode("utf-8") + except WeComCryptoError: + raise + except Exception as exc: + raise DecryptError(f"decrypt failed: {exc}") from exc + + if receive_id != self.receive_id: + raise DecryptError("receive_id mismatch") + return xml_content + + def encrypt(self, plaintext: str, nonce: Optional[str] = None, timestamp: Optional[str] = None) -> str: + nonce = nonce or self._random_nonce() + timestamp = timestamp or str(int(__import__("time").time())) + encrypt = self._encrypt_bytes(plaintext.encode("utf-8")) + signature = _sha1_signature(self.token, timestamp, nonce, encrypt) + root = ET.Element("xml") + ET.SubElement(root, "Encrypt").text = encrypt + ET.SubElement(root, "MsgSignature").text = signature + ET.SubElement(root, "TimeStamp").text = timestamp + ET.SubElement(root, "Nonce").text = nonce + return ET.tostring(root, encoding="unicode") + + def _encrypt_bytes(self, raw: bytes) -> str: + try: + random_prefix = os.urandom(16) + msg_len = struct.pack("I", socket.htonl(len(raw))) + payload = random_prefix + msg_len + raw + self.receive_id.encode("utf-8") + padded = PKCS7Encoder.encode(payload) + cipher = Cipher(algorithms.AES(self.key), modes.CBC(self.iv), backend=default_backend()) + encryptor = cipher.encryptor() + encrypted = encryptor.update(padded) + encryptor.finalize() + return base64.b64encode(encrypted).decode("utf-8") + except Exception as exc: + raise EncryptError(f"encrypt failed: {exc}") from exc + + @staticmethod + def _random_nonce(length: int = 10) -> str: + alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + return "".join(secrets.choice(alphabet) for _ in range(length)) diff --git a/gateway/run.py b/gateway/run.py index 5e1ed0e86..9434c9e5f 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1426,6 +1426,7 @@ class GatewayRunner: "MATRIX_ALLOWED_USERS", "DINGTALK_ALLOWED_USERS", "FEISHU_ALLOWED_USERS", "WECOM_ALLOWED_USERS", + "WECOM_CALLBACK_ALLOWED_USERS", "WEIXIN_ALLOWED_USERS", "BLUEBUBBLES_ALLOWED_USERS", "GATEWAY_ALLOWED_USERS") @@ -1439,6 +1440,7 @@ class GatewayRunner: "MATRIX_ALLOW_ALL_USERS", "DINGTALK_ALLOW_ALL_USERS", "FEISHU_ALLOW_ALL_USERS", "WECOM_ALLOW_ALL_USERS", + "WECOM_CALLBACK_ALLOW_ALL_USERS", "WEIXIN_ALLOW_ALL_USERS", "BLUEBUBBLES_ALLOW_ALL_USERS") ) @@ -2043,6 +2045,16 @@ class GatewayRunner: return None return FeishuAdapter(config) + elif platform == Platform.WECOM_CALLBACK: + from gateway.platforms.wecom_callback import ( + WecomCallbackAdapter, + check_wecom_callback_requirements, + ) + if not check_wecom_callback_requirements(): + logger.warning("WeComCallback: aiohttp/httpx not installed") + return None + return WecomCallbackAdapter(config) + elif platform == Platform.WECOM: from gateway.platforms.wecom import WeComAdapter, check_wecom_requirements if not check_wecom_requirements(): @@ -2132,6 +2144,7 @@ class GatewayRunner: Platform.DINGTALK: "DINGTALK_ALLOWED_USERS", Platform.FEISHU: "FEISHU_ALLOWED_USERS", Platform.WECOM: "WECOM_ALLOWED_USERS", + Platform.WECOM_CALLBACK: "WECOM_CALLBACK_ALLOWED_USERS", Platform.WEIXIN: "WEIXIN_ALLOWED_USERS", Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOWED_USERS", } @@ -2148,6 +2161,7 @@ class GatewayRunner: Platform.DINGTALK: "DINGTALK_ALLOW_ALL_USERS", Platform.FEISHU: "FEISHU_ALLOW_ALL_USERS", Platform.WECOM: "WECOM_ALLOW_ALL_USERS", + Platform.WECOM_CALLBACK: "WECOM_CALLBACK_ALLOW_ALL_USERS", Platform.WEIXIN: "WEIXIN_ALLOW_ALL_USERS", Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOW_ALL_USERS", } @@ -6218,7 +6232,7 @@ class GatewayRunner: Platform.TELEGRAM, Platform.DISCORD, Platform.SLACK, Platform.WHATSAPP, Platform.SIGNAL, Platform.MATTERMOST, Platform.MATRIX, Platform.HOMEASSISTANT, Platform.EMAIL, Platform.SMS, Platform.DINGTALK, - Platform.FEISHU, Platform.WECOM, Platform.WEIXIN, Platform.BLUEBUBBLES, Platform.LOCAL, + Platform.FEISHU, Platform.WECOM, Platform.WECOM_CALLBACK, Platform.WEIXIN, Platform.BLUEBUBBLES, Platform.LOCAL, }) async def _handle_update_command(self, event: MessageEvent) -> str: diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 1545d15aa..f551a195d 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -38,6 +38,9 @@ _EXTRA_ENV_KEYS = frozenset({ "DINGTALK_CLIENT_ID", "DINGTALK_CLIENT_SECRET", "FEISHU_APP_ID", "FEISHU_APP_SECRET", "FEISHU_ENCRYPT_KEY", "FEISHU_VERIFICATION_TOKEN", "WECOM_BOT_ID", "WECOM_SECRET", + "WECOM_CALLBACK_CORP_ID", "WECOM_CALLBACK_CORP_SECRET", "WECOM_CALLBACK_AGENT_ID", + "WECOM_CALLBACK_TOKEN", "WECOM_CALLBACK_ENCODING_AES_KEY", + "WECOM_CALLBACK_HOST", "WECOM_CALLBACK_PORT", "WEIXIN_ACCOUNT_ID", "WEIXIN_TOKEN", "WEIXIN_BASE_URL", "WEIXIN_CDN_BASE_URL", "WEIXIN_HOME_CHANNEL", "WEIXIN_HOME_CHANNEL_NAME", "WEIXIN_DM_POLICY", "WEIXIN_GROUP_POLICY", "WEIXIN_ALLOWED_USERS", "WEIXIN_GROUP_ALLOWED_USERS", "WEIXIN_ALLOW_ALL_USERS", diff --git a/hermes_cli/dump.py b/hermes_cli/dump.py index 00441c0cc..caa6b7e8c 100644 --- a/hermes_cli/dump.py +++ b/hermes_cli/dump.py @@ -119,6 +119,7 @@ def _configured_platforms() -> list[str]: "dingtalk": "DINGTALK_CLIENT_ID", "feishu": "FEISHU_APP_ID", "wecom": "WECOM_BOT_ID", + "wecom_callback": "WECOM_CALLBACK_CORP_ID", "weixin": "WEIXIN_ACCOUNT_ID", } return [name for name, env in checks.items() if os.getenv(env)] diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index 505bad0b5..908d8992a 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -1821,6 +1821,37 @@ _PLATFORMS = [ "help": "Chat ID for scheduled results and notifications."}, ], }, + { + "key": "wecom_callback", + "label": "WeCom Callback (Self-Built App)", + "emoji": "💬", + "token_var": "WECOM_CALLBACK_CORP_ID", + "setup_instructions": [ + "1. Go to WeCom Admin Console → Applications → Create Self-Built App", + "2. Note the Corp ID (top of admin console) and create a Corp Secret", + "3. Under Receive Messages, configure the callback URL to point to your server", + "4. Copy the Token and EncodingAESKey from the callback configuration", + "5. The adapter runs an HTTP server — ensure the port is reachable from WeCom", + "6. Restrict access with WECOM_CALLBACK_ALLOWED_USERS for production use", + ], + "vars": [ + {"name": "WECOM_CALLBACK_CORP_ID", "prompt": "Corp ID", "password": False, + "help": "Your WeCom enterprise Corp ID."}, + {"name": "WECOM_CALLBACK_CORP_SECRET", "prompt": "Corp Secret", "password": True, + "help": "The secret for your self-built application."}, + {"name": "WECOM_CALLBACK_AGENT_ID", "prompt": "Agent ID", "password": False, + "help": "The Agent ID of your self-built application."}, + {"name": "WECOM_CALLBACK_TOKEN", "prompt": "Callback Token", "password": True, + "help": "The Token from your WeCom callback configuration."}, + {"name": "WECOM_CALLBACK_ENCODING_AES_KEY", "prompt": "Encoding AES Key", "password": True, + "help": "The EncodingAESKey from your WeCom callback configuration."}, + {"name": "WECOM_CALLBACK_PORT", "prompt": "Callback server port (default: 8645)", "password": False, + "help": "Port for the HTTP callback server."}, + {"name": "WECOM_CALLBACK_ALLOWED_USERS", "prompt": "Allowed user IDs (comma-separated, or empty)", "password": False, + "is_allowlist": True, + "help": "Restrict which WeCom users can interact with the app."}, + ], + }, { "key": "weixin", "label": "Weixin / WeChat", diff --git a/tests/gateway/test_wecom_callback.py b/tests/gateway/test_wecom_callback.py new file mode 100644 index 000000000..88c084ae3 --- /dev/null +++ b/tests/gateway/test_wecom_callback.py @@ -0,0 +1,185 @@ +"""Tests for the WeCom callback-mode adapter.""" + +import asyncio +from xml.etree import ElementTree as ET + +import pytest + +from gateway.config import PlatformConfig +from gateway.platforms.wecom_callback import WecomCallbackAdapter +from gateway.platforms.wecom_crypto import WXBizMsgCrypt + + +def _app(name="test-app", corp_id="ww1234567890", agent_id="1000002"): + return { + "name": name, + "corp_id": corp_id, + "corp_secret": "test-secret", + "agent_id": agent_id, + "token": "test-callback-token", + "encoding_aes_key": "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFG", + } + + +def _config(apps=None): + return PlatformConfig( + enabled=True, + extra={"mode": "callback", "host": "127.0.0.1", "port": 0, "apps": apps or [_app()]}, + ) + + +class TestWecomCrypto: + def test_roundtrip_encrypt_decrypt(self): + app = _app() + crypt = WXBizMsgCrypt(app["token"], app["encoding_aes_key"], app["corp_id"]) + encrypted_xml = crypt.encrypt( + "hello", nonce="nonce123", timestamp="123456", + ) + root = ET.fromstring(encrypted_xml) + decrypted = crypt.decrypt( + root.findtext("MsgSignature", default=""), + root.findtext("TimeStamp", default=""), + root.findtext("Nonce", default=""), + root.findtext("Encrypt", default=""), + ) + assert b"hello" in decrypted + + def test_signature_mismatch_raises(self): + app = _app() + crypt = WXBizMsgCrypt(app["token"], app["encoding_aes_key"], app["corp_id"]) + encrypted_xml = crypt.encrypt("", nonce="n", timestamp="1") + root = ET.fromstring(encrypted_xml) + from gateway.platforms.wecom_crypto import SignatureError + with pytest.raises(SignatureError): + crypt.decrypt("bad-sig", "1", "n", root.findtext("Encrypt", default="")) + + +class TestWecomCallbackEventConstruction: + def test_build_event_extracts_text_message(self): + adapter = WecomCallbackAdapter(_config()) + xml_text = """ + + ww1234567890 + zhangsan + 1710000000 + text + \u4f60\u597d + 123456789 + + """ + event = adapter._build_event(_app(), xml_text) + assert event is not None + assert event.source is not None + assert event.source.user_id == "zhangsan" + assert event.source.chat_id == "ww1234567890:zhangsan" + assert event.message_id == "123456789" + assert event.text == "\u4f60\u597d" + + def test_build_event_returns_none_for_subscribe(self): + adapter = WecomCallbackAdapter(_config()) + xml_text = """ + + ww1234567890 + zhangsan + 1710000000 + event + subscribe + + """ + event = adapter._build_event(_app(), xml_text) + assert event is None + + +class TestWecomCallbackRouting: + def test_user_app_key_scopes_across_corps(self): + adapter = WecomCallbackAdapter(_config()) + assert adapter._user_app_key("corpA", "alice") == "corpA:alice" + assert adapter._user_app_key("corpB", "alice") == "corpB:alice" + assert adapter._user_app_key("corpA", "alice") != adapter._user_app_key("corpB", "alice") + + @pytest.mark.asyncio + async def test_send_selects_correct_app_for_scoped_chat_id(self): + apps = [ + _app(name="corp-a", corp_id="corpA", agent_id="1001"), + _app(name="corp-b", corp_id="corpB", agent_id="2002"), + ] + adapter = WecomCallbackAdapter(_config(apps=apps)) + adapter._user_app_map["corpB:alice"] = "corp-b" + adapter._access_tokens["corp-b"] = {"token": "tok-b", "expires_at": 9999999999} + + calls = {} + + class FakeResponse: + def json(self): + return {"errcode": 0, "msgid": "ok1"} + + class FakeClient: + async def post(self, url, json): + calls["url"] = url + calls["json"] = json + return FakeResponse() + + adapter._http_client = FakeClient() + result = await adapter.send("corpB:alice", "hello") + + assert result.success is True + assert calls["json"]["touser"] == "alice" + assert calls["json"]["agentid"] == 2002 + assert "tok-b" in calls["url"] + + @pytest.mark.asyncio + async def test_send_falls_back_from_bare_user_id_when_unique(self): + apps = [_app(name="corp-a", corp_id="corpA", agent_id="1001")] + adapter = WecomCallbackAdapter(_config(apps=apps)) + adapter._user_app_map["corpA:alice"] = "corp-a" + adapter._access_tokens["corp-a"] = {"token": "tok-a", "expires_at": 9999999999} + + calls = {} + + class FakeResponse: + def json(self): + return {"errcode": 0, "msgid": "ok2"} + + class FakeClient: + async def post(self, url, json): + calls["url"] = url + calls["json"] = json + return FakeResponse() + + adapter._http_client = FakeClient() + result = await adapter.send("alice", "hello") + + assert result.success is True + assert calls["json"]["agentid"] == 1001 + + +class TestWecomCallbackPollLoop: + @pytest.mark.asyncio + async def test_poll_loop_dispatches_handle_message(self, monkeypatch): + adapter = WecomCallbackAdapter(_config()) + calls = [] + + async def fake_handle_message(event): + calls.append(event.text) + + monkeypatch.setattr(adapter, "handle_message", fake_handle_message) + event = adapter._build_event( + _app(), + """ + + ww1234567890 + lisi + 1710000000 + text + test + m2 + + """, + ) + task = asyncio.create_task(adapter._poll_loop()) + await adapter._message_queue.put(event) + await asyncio.sleep(0.05) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + assert calls == ["test"] diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index e2db93381..80c88e353 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -456,7 +456,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr }, "deliver": { "type": "string", - "description": "Delivery target: origin, local, telegram, discord, slack, whatsapp, signal, weixin, matrix, mattermost, homeassistant, dingtalk, feishu, wecom, email, sms, bluebubbles, or platform:chat_id or platform:chat_id:thread_id for Telegram topics. Examples: 'origin', 'local', 'telegram', 'telegram:-1001234567890:17585', 'discord:#engineering'" + "description": "Delivery target: origin, local, telegram, discord, slack, whatsapp, signal, weixin, matrix, mattermost, homeassistant, dingtalk, feishu, wecom, wecom_callback, email, sms, bluebubbles, or platform:chat_id or platform:chat_id:thread_id for Telegram topics. Examples: 'origin', 'local', 'telegram', 'telegram:-1001234567890:17585', 'discord:#engineering'" }, "skills": { "type": "array", diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 0287b5e04..60503c0bc 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -158,6 +158,7 @@ def _handle_send(args): "dingtalk": Platform.DINGTALK, "feishu": Platform.FEISHU, "wecom": Platform.WECOM, + "wecom_callback": Platform.WECOM_CALLBACK, "weixin": Platform.WEIXIN, "email": Platform.EMAIL, "sms": Platform.SMS, diff --git a/toolsets.py b/toolsets.py index 6fbc963e6..57e03d250 100644 --- a/toolsets.py +++ b/toolsets.py @@ -365,6 +365,12 @@ TOOLSETS = { "includes": [] }, + "hermes-wecom-callback": { + "description": "WeCom callback toolset - enterprise self-built app messaging (full access)", + "tools": _HERMES_CORE_TOOLS, + "includes": [] + }, + "hermes-sms": { "description": "SMS bot toolset - interact with Hermes via SMS (Twilio)", "tools": _HERMES_CORE_TOOLS, @@ -380,7 +386,7 @@ TOOLSETS = { "hermes-gateway": { "description": "Gateway toolset - union of all messaging platform tools", "tools": [], - "includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-bluebubbles", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom", "hermes-weixin", "hermes-webhook"] + "includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-bluebubbles", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom", "hermes-wecom-callback", "hermes-weixin", "hermes-webhook"] } }