diff --git a/cli-config.yaml.example b/cli-config.yaml.example index 789c5481a..5362e341b 100644 --- a/cli-config.yaml.example +++ b/cli-config.yaml.example @@ -552,6 +552,7 @@ agent: # slack: hermes-slack (same as telegram) # signal: hermes-signal (same as telegram) # homeassistant: hermes-homeassistant (same as telegram) +# qq: hermes-qq (same as telegram) # platform_toolsets: cli: [hermes-cli] @@ -561,6 +562,7 @@ platform_toolsets: slack: [hermes-slack] signal: [hermes-signal] homeassistant: [hermes-homeassistant] + qq: [hermes-qq] # ───────────────────────────────────────────────────────────────────────────── # Available toolsets (use these names in platform_toolsets or the toolsets list) diff --git a/cron/scheduler.py b/cron/scheduler.py index e6db77c09..44f2705e3 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -44,7 +44,7 @@ logger = logging.getLogger(__name__) _KNOWN_DELIVERY_PLATFORMS = frozenset({ "telegram", "discord", "slack", "whatsapp", "signal", "matrix", "mattermost", "homeassistant", "dingtalk", "feishu", - "wecom", "wecom_callback", "weixin", "sms", "email", "webhook", "bluebubbles", + "wecom", "wecom_callback", "weixin", "sms", "email", "webhook", "bluebubbles", "qq", }) from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run @@ -254,6 +254,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option "email": Platform.EMAIL, "sms": Platform.SMS, "bluebubbles": Platform.BLUEBUBBLES, + "qq": Platform.QQ, } platform = platform_map.get(platform_name.lower()) if not platform: diff --git a/gateway/config.py b/gateway/config.py index 7d6165927..36d001376 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -66,6 +66,7 @@ class Platform(Enum): WECOM_CALLBACK = "wecom_callback" WEIXIN = "weixin" BLUEBUBBLES = "bluebubbles" + QQ = "qq" @dataclass @@ -303,6 +304,9 @@ class GatewayConfig: # BlueBubbles uses extra dict for local server config elif platform == Platform.BLUEBUBBLES and config.extra.get("server_url") and config.extra.get("password"): connected.append(platform) + # QQ uses extra dict for app credentials + elif platform == Platform.QQ and config.extra.get("app_id") and config.extra.get("client_secret"): + connected.append(platform) return connected def get_home_channel(self, platform: Platform) -> Optional[HomeChannel]: @@ -1109,6 +1113,32 @@ def _apply_env_overrides(config: GatewayConfig) -> None: name=os.getenv("BLUEBUBBLES_HOME_CHANNEL_NAME", "Home"), ) + # QQ (Official Bot API v2) + qq_app_id = os.getenv("QQ_APP_ID") + qq_client_secret = os.getenv("QQ_CLIENT_SECRET") + if qq_app_id or qq_client_secret: + if Platform.QQ not in config.platforms: + config.platforms[Platform.QQ] = PlatformConfig() + config.platforms[Platform.QQ].enabled = True + extra = config.platforms[Platform.QQ].extra + if qq_app_id: + extra["app_id"] = qq_app_id + if qq_client_secret: + extra["client_secret"] = qq_client_secret + qq_allowed_users = os.getenv("QQ_ALLOWED_USERS", "").strip() + if qq_allowed_users: + extra["allow_from"] = qq_allowed_users + qq_group_allowed = os.getenv("QQ_GROUP_ALLOWED_USERS", "").strip() + if qq_group_allowed: + extra["group_allow_from"] = qq_group_allowed + qq_home = os.getenv("QQ_HOME_CHANNEL", "").strip() + if qq_home: + config.platforms[Platform.QQ].home_channel = HomeChannel( + platform=Platform.QQ, + chat_id=qq_home, + name=os.getenv("QQ_HOME_CHANNEL_NAME", "Home"), + ) + # Session settings idle_minutes = os.getenv("SESSION_IDLE_MINUTES") if idle_minutes: diff --git a/gateway/platforms/__init__.py b/gateway/platforms/__init__.py index dae74568d..36daf5f10 100644 --- a/gateway/platforms/__init__.py +++ b/gateway/platforms/__init__.py @@ -9,9 +9,11 @@ Each adapter handles: """ from .base import BasePlatformAdapter, MessageEvent, SendResult +from .qq import QQAdapter __all__ = [ "BasePlatformAdapter", "MessageEvent", "SendResult", + "QQAdapter", ] diff --git a/gateway/platforms/qq.py b/gateway/platforms/qq.py new file mode 100644 index 000000000..7805b6144 --- /dev/null +++ b/gateway/platforms/qq.py @@ -0,0 +1,1915 @@ +""" +QQ Bot platform adapter using the Official QQ Bot API (v2). + +Connects to the QQ Bot WebSocket Gateway for inbound events and uses the +REST API (``api.sgroup.qq.com``) for outbound messages and media uploads. + +Configuration in config.yaml: + platforms: + qq: + enabled: true + extra: + app_id: "your-app-id" # or QQ_APP_ID env var + client_secret: "your-secret" # or QQ_CLIENT_SECRET env var + markdown_support: true # enable QQ markdown (msg_type 2) + dm_policy: "open" # open | allowlist | disabled + allow_from: ["openid_1"] + group_policy: "open" # open | allowlist | disabled + group_allow_from: ["group_openid_1"] + stt: # Voice-to-text config (optional) + provider: "zai" # zai (GLM-ASR), openai (Whisper), etc. + baseUrl: "https://open.bigmodel.cn/api/coding/paas/v4" + apiKey: "your-stt-api-key" # or set QQ_STT_API_KEY env var + model: "glm-asr" # glm-asr, whisper-1, etc. + + Voice transcription priority: + 1. QQ's built-in ``asr_refer_text`` (Tencent ASR — free, always tried first) + 2. Configured STT provider via ``stt`` config or ``QQ_STT_*`` env vars + +Reference: https://bot.q.qq.com/wiki/develop/api-v2/ +""" + +from __future__ import annotations + +import asyncio +import base64 +import json +import logging +import mimetypes +import os +import time +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse + +try: + import aiohttp + AIOHTTP_AVAILABLE = True +except ImportError: + AIOHTTP_AVAILABLE = False + aiohttp = None # type: ignore[assignment] + +try: + import httpx + HTTPX_AVAILABLE = True +except ImportError: + HTTPX_AVAILABLE = False + httpx = None # type: ignore[assignment] + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import ( + BasePlatformAdapter, + MessageEvent, + MessageType, + SendResult, + cache_document_from_bytes, + cache_image_from_bytes, +) + +logger = logging.getLogger(__name__) + + +class QQCloseError(Exception): + """Raised when QQ WebSocket closes with a specific code. + + Carries the close code and reason for proper handling in the reconnect loop. + """ + + def __init__(self, code, reason=""): + self.code = int(code) if code else None + self.reason = str(reason) if reason else "" + super().__init__(f"WebSocket closed (code={self.code}, reason={self.reason})") +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +API_BASE = "https://api.sgroup.qq.com" +TOKEN_URL = "https://bots.qq.com/app/getAppAccessToken" +GATEWAY_URL_PATH = "/gateway" + +DEFAULT_API_TIMEOUT = 30.0 +FILE_UPLOAD_TIMEOUT = 120.0 +CONNECT_TIMEOUT_SECONDS = 20.0 + +RECONNECT_BACKOFF = [2, 5, 10, 30, 60] +MAX_RECONNECT_ATTEMPTS = 100 +RATE_LIMIT_DELAY = 60 # seconds +QUICK_DISCONNECT_THRESHOLD = 5.0 # seconds +MAX_QUICK_DISCONNECT_COUNT = 3 + +MAX_MESSAGE_LENGTH = 4000 +DEDUP_WINDOW_SECONDS = 300 +DEDUP_MAX_SIZE = 1000 + +# QQ Bot message types +MSG_TYPE_TEXT = 0 +MSG_TYPE_MARKDOWN = 2 +MSG_TYPE_MEDIA = 7 +MSG_TYPE_INPUT_NOTIFY = 6 + +# QQ Bot file media types +MEDIA_TYPE_IMAGE = 1 +MEDIA_TYPE_VIDEO = 2 +MEDIA_TYPE_VOICE = 3 +MEDIA_TYPE_FILE = 4 + + +def check_qq_requirements() -> bool: + """Check if QQ runtime dependencies are available.""" + return AIOHTTP_AVAILABLE and HTTPX_AVAILABLE + + +def _coerce_list(value: Any) -> List[str]: + """Coerce config values into a trimmed string list.""" + if value is None: + return [] + if isinstance(value, str): + return [item.strip() for item in value.split(",") if item.strip()] + if isinstance(value, (list, tuple, set)): + return [str(item).strip() for item in value if str(item).strip()] + return [str(value).strip()] if str(value).strip() else [] + + +# --------------------------------------------------------------------------- +# QQAdapter +# --------------------------------------------------------------------------- + +class QQAdapter(BasePlatformAdapter): + """QQ Bot adapter backed by the official QQ Bot WebSocket Gateway + REST API.""" + + # QQ Bot API does not support editing sent messages. + SUPPORTS_MESSAGE_EDITING = False + + def _fail_pending(self, reason: str) -> None: + """Fail all pending response futures.""" + for fut in self._pending_responses.values(): + if not fut.done(): + fut.set_exception(RuntimeError(reason)) + self._pending_responses.clear() + + MAX_MESSAGE_LENGTH = MAX_MESSAGE_LENGTH + + def __init__(self, config: PlatformConfig): + super().__init__(config, Platform.QQ) + + extra = config.extra or {} + self._app_id = str(extra.get("app_id") or os.getenv("QQ_APP_ID", "")).strip() + self._client_secret = str(extra.get("client_secret") or os.getenv("QQ_CLIENT_SECRET", "")).strip() + self._markdown_support = bool(extra.get("markdown_support", True)) + + # Auth/ACL policies + self._dm_policy = str(extra.get("dm_policy", "open")).strip().lower() + self._allow_from = _coerce_list(extra.get("allow_from") or extra.get("allowFrom")) + self._group_policy = str(extra.get("group_policy", "open")).strip().lower() + self._group_allow_from = _coerce_list(extra.get("group_allow_from") or extra.get("groupAllowFrom")) + + # Connection state + self._session: Optional[aiohttp.ClientSession] = None + self._ws: Optional[aiohttp.ClientWebSocketResponse] = None + self._http_client: Optional[httpx.AsyncClient] = None + self._listen_task: Optional[asyncio.Task] = None + self._heartbeat_task: Optional[asyncio.Task] = None + self._heartbeat_interval: float = 30.0 # seconds, updated by Hello + self._session_id: Optional[str] = None + self._last_seq: Optional[int] = None + self._chat_type_map: Dict[str, str] = {} # chat_id → "c2c"|"group"|"guild"|"dm" + + # Request/response correlation + self._pending_responses: Dict[str, asyncio.Future] = {} + self._seen_messages: Dict[str, float] = {} + + # Token cache + self._access_token: Optional[str] = None + self._token_expires_at: float = 0.0 + self._token_lock = asyncio.Lock() + + # Upload cache: content_hash -> {file_info, file_uuid, expires_at} + self._upload_cache: Dict[str, Dict[str, Any]] = {} + + # ------------------------------------------------------------------ + # Properties + # ------------------------------------------------------------------ + + @property + def name(self) -> str: + return "QQ" + + # ------------------------------------------------------------------ + # Connection lifecycle + # ------------------------------------------------------------------ + + async def connect(self) -> bool: + """Authenticate, obtain gateway URL, and open the WebSocket.""" + if not AIOHTTP_AVAILABLE: + message = "QQ startup failed: aiohttp not installed" + self._set_fatal_error("qq_missing_dependency", message, retryable=True) + logger.warning("[%s] %s. Run: pip install aiohttp", self.name, message) + return False + if not HTTPX_AVAILABLE: + message = "QQ startup failed: httpx not installed" + self._set_fatal_error("qq_missing_dependency", message, retryable=True) + logger.warning("[%s] %s. Run: pip install httpx", self.name, message) + return False + if not self._app_id or not self._client_secret: + message = "QQ startup failed: QQ_APP_ID and QQ_CLIENT_SECRET are required" + self._set_fatal_error("qq_missing_credentials", message, retryable=True) + logger.warning("[%s] %s", self.name, message) + return False + + try: + self._http_client = httpx.AsyncClient(timeout=30.0, follow_redirects=True) + + # 1. Get access token + await self._ensure_token() + + # 2. Get WebSocket gateway URL + gateway_url = await self._get_gateway_url() + logger.info("[%s] Gateway URL: %s", self.name, gateway_url) + + # 3. Open WebSocket + await self._open_ws(gateway_url) + + # 4. Start listeners + self._listen_task = asyncio.create_task(self._listen_loop()) + self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + self._mark_connected() + logger.info("[%s] Connected", self.name) + return True + except Exception as exc: + message = f"QQ startup failed: {exc}" + self._set_fatal_error("qq_connect_error", message, retryable=True) + logger.error("[%s] %s", self.name, message, exc_info=True) + await self._cleanup() + return False + + async def disconnect(self) -> None: + """Close all connections and stop listeners.""" + self._running = False + self._mark_disconnected() + + if self._listen_task: + self._listen_task.cancel() + try: + await self._listen_task + except asyncio.CancelledError: + pass + self._listen_task = None + + if self._heartbeat_task: + self._heartbeat_task.cancel() + try: + await self._heartbeat_task + except asyncio.CancelledError: + pass + self._heartbeat_task = None + + await self._cleanup() + logger.info("[%s] Disconnected", self.name) + + async def _cleanup(self) -> None: + """Close WebSocket, HTTP session, and client.""" + if self._ws and not self._ws.closed: + await self._ws.close() + self._ws = None + + if self._session and not self._session.closed: + await self._session.close() + self._session = None + + if self._http_client: + await self._http_client.aclose() + self._http_client = None + + # Fail pending + for fut in self._pending_responses.values(): + if not fut.done(): + fut.set_exception(RuntimeError("Disconnected")) + self._pending_responses.clear() + + # ------------------------------------------------------------------ + # Token management + # ------------------------------------------------------------------ + + async def _ensure_token(self) -> str: + """Return a valid access token, refreshing if needed (with singleflight).""" + if self._access_token and time.time() < self._token_expires_at - 60: + return self._access_token + + async with self._token_lock: + # Double-check after acquiring lock + if self._access_token and time.time() < self._token_expires_at - 60: + return self._access_token + + try: + resp = await self._http_client.post( + TOKEN_URL, + json={"appId": self._app_id, "clientSecret": self._client_secret}, + timeout=DEFAULT_API_TIMEOUT, + ) + resp.raise_for_status() + data = resp.json() + except Exception as exc: + raise RuntimeError(f"Failed to get QQ Bot access token: {exc}") from exc + + token = data.get("access_token") + if not token: + raise RuntimeError(f"QQ Bot token response missing access_token: {data}") + + expires_in = int(data.get("expires_in", 7200)) + self._access_token = token + self._token_expires_at = time.time() + expires_in + logger.info("[%s] Access token refreshed, expires in %ds", self.name, expires_in) + return self._access_token + + async def _get_gateway_url(self) -> str: + """Fetch the WebSocket gateway URL from the REST API.""" + token = await self._ensure_token() + try: + resp = await self._http_client.get( + f"{API_BASE}{GATEWAY_URL_PATH}", + headers={"Authorization": f"QQBot {token}"}, + timeout=DEFAULT_API_TIMEOUT, + ) + resp.raise_for_status() + data = resp.json() + except Exception as exc: + raise RuntimeError(f"Failed to get QQ Bot gateway URL: {exc}") from exc + + url = data.get("url") + if not url: + raise RuntimeError(f"QQ Bot gateway response missing url: {data}") + return url + + # ------------------------------------------------------------------ + # WebSocket lifecycle + # ------------------------------------------------------------------ + + async def _open_ws(self, gateway_url: str) -> None: + """Open a WebSocket connection to the QQ Bot gateway.""" + # Only clean up WebSocket resources — keep _http_client alive for REST API calls. + if self._ws and not self._ws.closed: + await self._ws.close() + self._ws = None + if self._session and not self._session.closed: + await self._session.close() + self._session = None + + self._session = aiohttp.ClientSession() + self._ws = await self._session.ws_connect( + gateway_url, + timeout=CONNECT_TIMEOUT_SECONDS, + ) + logger.info("[%s] WebSocket connected to %s", self.name, gateway_url) + + async def _listen_loop(self) -> None: + """Read WebSocket events and reconnect on errors. + + Close code handling follows the OpenClaw qqbot reference implementation: + 4004 → invalid token, refresh and reconnect + 4006/4007/4009 → session invalid, clear session and re-identify + 4008 → rate limited, back off 60s + 4914 → bot offline/sandbox, stop reconnecting + 4915 → bot banned, stop reconnecting + """ + backoff_idx = 0 + connect_time = 0.0 + quick_disconnect_count = 0 + + while self._running: + try: + connect_time = time.monotonic() + await self._read_events() + backoff_idx = 0 + quick_disconnect_count = 0 + except asyncio.CancelledError: + return + except QQCloseError as exc: + if not self._running: + return + + code = exc.code + logger.warning("[%s] WebSocket closed: code=%s reason=%s", + self.name, code, exc.reason) + + # Quick disconnect detection (permission issues, misconfiguration) + duration = time.monotonic() - connect_time + if duration < QUICK_DISCONNECT_THRESHOLD and connect_time > 0: + quick_disconnect_count += 1 + logger.info("[%s] Quick disconnect (%.1fs), count: %d", + self.name, duration, quick_disconnect_count) + if quick_disconnect_count >= MAX_QUICK_DISCONNECT_COUNT: + logger.error( + "[%s] Too many quick disconnects. " + "Check: 1) AppID/Secret correct 2) Bot permissions on QQ Open Platform", + self.name, + ) + self._set_fatal_error("qq_quick_disconnect", + "Too many quick disconnects — check bot permissions", retryable=True) + return + else: + quick_disconnect_count = 0 + + self._mark_disconnected() + self._fail_pending("Connection closed") + + # Stop reconnecting for fatal codes + if code in (4914, 4915): + desc = "offline/sandbox-only" if code == 4914 else "banned" + logger.error("[%s] Bot is %s. Check QQ Open Platform.", self.name, desc) + self._set_fatal_error(f"qq_{desc}", f"Bot is {desc}", retryable=False) + return + + # Rate limited + if code == 4008: + logger.info("[%s] Rate limited (4008), waiting %ds", self.name, RATE_LIMIT_DELAY) + if backoff_idx >= MAX_RECONNECT_ATTEMPTS: + return + await asyncio.sleep(RATE_LIMIT_DELAY) + if await self._reconnect(backoff_idx): + backoff_idx = 0 + quick_disconnect_count = 0 + else: + backoff_idx += 1 + continue + + # Token invalid → clear cached token so _ensure_token() refreshes + if code == 4004: + logger.info("[%s] Invalid token (4004), will refresh and reconnect", self.name) + self._access_token = None + self._token_expires_at = 0.0 + + # Session invalid → clear session, will re-identify on next Hello + if code in (4006, 4007, 4009, 4900, 4901, 4902, 4903, 4904, 4905, + 4906, 4907, 4908, 4909, 4910, 4911, 4912, 4913): + logger.info("[%s] Session error (%d), clearing session for re-identify", self.name, code) + self._session_id = None + self._last_seq = None + + if await self._reconnect(backoff_idx): + backoff_idx = 0 + quick_disconnect_count = 0 + else: + backoff_idx += 1 + + except Exception as exc: + if not self._running: + return + logger.warning("[%s] WebSocket error: %s", self.name, exc) + self._mark_disconnected() + self._fail_pending("Connection interrupted") + + if backoff_idx >= MAX_RECONNECT_ATTEMPTS: + logger.error("[%s] Max reconnect attempts reached", self.name) + return + + if await self._reconnect(backoff_idx): + backoff_idx = 0 + quick_disconnect_count = 0 + else: + backoff_idx += 1 + + async def _reconnect(self, backoff_idx: int) -> bool: + """Attempt to reconnect the WebSocket. Returns True on success.""" + delay = RECONNECT_BACKOFF[min(backoff_idx, len(RECONNECT_BACKOFF) - 1)] + logger.info("[%s] Reconnecting in %ds (attempt %d)...", self.name, delay, backoff_idx + 1) + await asyncio.sleep(delay) + + self._heartbeat_interval = 30.0 # reset until Hello + try: + await self._ensure_token() + gateway_url = await self._get_gateway_url() + await self._open_ws(gateway_url) + self._mark_connected() + logger.info("[%s] Reconnected", self.name) + return True + except Exception as exc: + logger.warning("[%s] Reconnect failed: %s", self.name, exc) + return False + + async def _read_events(self) -> None: + """Read WebSocket frames until connection closes.""" + if not self._ws: + raise RuntimeError("WebSocket not connected") + + while self._running and self._ws and not self._ws.closed: + msg = await self._ws.receive() + if msg.type == aiohttp.WSMsgType.TEXT: + payload = self._parse_json(msg.data) + if payload: + self._dispatch_payload(payload) + elif msg.type in (aiohttp.WSMsgType.PING,): + # aiohttp auto-replies with PONG + pass + elif msg.type == aiohttp.WSMsgType.CLOSE: + raise QQCloseError(msg.data, msg.extra) + elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): + raise RuntimeError("WebSocket closed") + + async def _heartbeat_loop(self) -> None: + """Send periodic heartbeats (QQ Gateway expects op 1 heartbeat with latest seq). + + The interval is set from the Hello (op 10) event's heartbeat_interval. + QQ's default is ~41s; we send at 80% of the interval to stay safe. + """ + try: + while self._running: + await asyncio.sleep(self._heartbeat_interval) + if not self._ws or self._ws.closed: + continue + try: + # d should be the latest sequence number received, or null + await self._ws.send_json({"op": 1, "d": self._last_seq}) + except Exception as exc: + logger.debug("[%s] Heartbeat failed: %s", self.name, exc) + except asyncio.CancelledError: + pass + + async def _send_identify(self) -> None: + """Send op 2 Identify to authenticate the WebSocket connection. + + After receiving op 10 Hello, the client must send op 2 Identify with + the bot token and intents. On success the server replies with a + READY dispatch event. + + Reference: https://bot.q.qq.com/wiki/develop/api-v2/dev-prepare/interface-framework/reference.html + """ + token = await self._ensure_token() + identify_payload = { + "op": 2, + "d": { + "token": f"QQBot {token}", + "intents": (1 << 25) | (1 << 30) | (1 << 12), # C2C_GROUP_AT_MESSAGES + PUBLIC_GUILD_MESSAGES + DIRECT_MESSAGE + "shard": [0, 1], + "properties": { + "$os": "macOS", + "$browser": "hermes-agent", + "$device": "hermes-agent", + }, + }, + } + try: + if self._ws and not self._ws.closed: + await self._ws.send_json(identify_payload) + logger.info("[%s] Identify sent", self.name) + else: + logger.warning("[%s] Cannot send Identify: WebSocket not connected", self.name) + except Exception as exc: + logger.error("[%s] Failed to send Identify: %s", self.name, exc) + + async def _send_resume(self) -> None: + """Send op 6 Resume to re-authenticate after a reconnection. + + Reference: https://bot.q.qq.com/wiki/develop/api-v2/dev-prepare/interface-framework/reference.html + """ + token = await self._ensure_token() + resume_payload = { + "op": 6, + "d": { + "token": f"QQBot {token}", + "session_id": self._session_id, + "seq": self._last_seq, + }, + } + try: + if self._ws and not self._ws.closed: + await self._ws.send_json(resume_payload) + logger.info("[%s] Resume sent (session_id=%s, seq=%s)", + self.name, self._session_id, self._last_seq) + else: + logger.warning("[%s] Cannot send Resume: WebSocket not connected", self.name) + except Exception as exc: + logger.error("[%s] Failed to send Resume: %s", self.name, exc) + # If resume fails, clear session and fall back to identify on next Hello + self._session_id = None + self._last_seq = None + + @staticmethod + def _create_task(coro): + """Schedule a coroutine, silently skipping if no event loop is running. + + This avoids ``RuntimeError: no running event loop`` when tests call + ``_dispatch_payload`` synchronously outside of ``asyncio.run()``. + """ + try: + loop = asyncio.get_running_loop() + return loop.create_task(coro) + except RuntimeError: + return None + + def _dispatch_payload(self, payload: Dict[str, Any]) -> None: + """Route inbound WebSocket payloads (dispatch synchronously, spawn async handlers).""" + op = payload.get("op") + t = payload.get("t") + s = payload.get("s") + d = payload.get("d") + if isinstance(s, int) and (self._last_seq is None or s > self._last_seq): + self._last_seq = s + + # op 10 = Hello (heartbeat interval) — must reply with Identify/Resume + if op == 10: + d_data = d if isinstance(d, dict) else {} + interval_ms = d_data.get("heartbeat_interval", 30000) + # Send heartbeats at 80% of the server interval to stay safe + self._heartbeat_interval = interval_ms / 1000.0 * 0.8 + logger.debug("[%s] Hello received, heartbeat_interval=%dms (sending every %.1fs)", + self.name, interval_ms, self._heartbeat_interval) + # Authenticate: send Resume if we have a session, else Identify. + # Use _create_task which is safe when no event loop is running (tests). + if self._session_id and self._last_seq is not None: + self._create_task(self._send_resume()) + else: + self._create_task(self._send_identify()) + return + + # op 0 = Dispatch + if op == 0 and t: + if t == "READY": + self._handle_ready(d) + elif t == "RESUMED": + logger.info("[%s] Session resumed", self.name) + elif t in ("C2C_MESSAGE_CREATE", "GROUP_AT_MESSAGE_CREATE", + "DIRECT_MESSAGE_CREATE", "GUILD_MESSAGE_CREATE", + "GUILD_AT_MESSAGE_CREATE"): + asyncio.create_task(self._on_message(t, d)) + else: + logger.debug("[%s] Unhandled dispatch: %s", self.name, t) + return + + # op 11 = Heartbeat ACK + if op == 11: + return + + logger.debug("[%s] Unknown op: %s", self.name, op) + + def _handle_ready(self, d: Any) -> None: + """Handle the READY event — store session_id for resume.""" + if isinstance(d, dict): + self._session_id = d.get("session_id") + logger.info("[%s] Ready, session_id=%s", self.name, self._session_id) + + # ------------------------------------------------------------------ + # JSON helpers + # ------------------------------------------------------------------ + + @staticmethod + def _parse_json(raw: Any) -> Optional[Dict[str, Any]]: + try: + payload = json.loads(raw) + except Exception: + logger.debug("[%s] Failed to parse JSON: %r", "QQ", raw) + return None + return payload if isinstance(payload, dict) else None + + @staticmethod + def _next_msg_seq(msg_id: str) -> int: + """Generate a message sequence number in 0..65535 range.""" + time_part = int(time.time()) % 100000000 + rand = int(uuid.uuid4().hex[:4], 16) + return (time_part ^ rand) % 65536 + + # ------------------------------------------------------------------ + # Inbound message handling + # ------------------------------------------------------------------ + + async def _on_message(self, event_type: str, d: Any) -> None: + """Process an inbound QQ Bot message event.""" + if not isinstance(d, dict): + return + + # Extract common fields + msg_id = str(d.get("id", "")) + if not msg_id or self._is_duplicate(msg_id): + logger.debug("[%s] Duplicate or missing message id: %s", self.name, msg_id) + return + + timestamp = str(d.get("timestamp", "")) + content = str(d.get("content", "")).strip() + author = d.get("author") if isinstance(d.get("author"), dict) else {} + + # Route by event type + if event_type == "C2C_MESSAGE_CREATE": + await self._handle_c2c_message(d, msg_id, content, author, timestamp) + elif event_type in ("GROUP_AT_MESSAGE_CREATE",): + await self._handle_group_message(d, msg_id, content, author, timestamp) + elif event_type in ("GUILD_MESSAGE_CREATE", "GUILD_AT_MESSAGE_CREATE"): + await self._handle_guild_message(d, msg_id, content, author, timestamp) + elif event_type == "DIRECT_MESSAGE_CREATE": + await self._handle_dm_message(d, msg_id, content, author, timestamp) + + async def _handle_c2c_message( + self, d: Dict[str, Any], msg_id: str, content: str, author: Dict[str, Any], timestamp: str + ) -> None: + """Handle a C2C (private) message event.""" + user_openid = str(author.get("user_openid", "")) + if not user_openid: + return + if not self._is_dm_allowed(user_openid): + return + + text = content + attachments_raw = d.get("attachments") + logger.info("[QQ] C2C message: id=%s content=%r attachments=%s", + msg_id, content[:50] if content else "", + f"{len(attachments_raw) if isinstance(attachments_raw, list) else 0} items" + if attachments_raw else "None") + if attachments_raw and isinstance(attachments_raw, list): + for _i, _att in enumerate(attachments_raw): + if isinstance(_att, dict): + logger.info("[QQ] attachment[%d]: content_type=%s url=%s filename=%s", + _i, _att.get("content_type", ""), + str(_att.get("url", ""))[:80], + _att.get("filename", "")) + + # Process all attachments uniformly (images, voice, files) + att_result = await self._process_attachments(attachments_raw) + image_urls = att_result["image_urls"] + image_media_types = att_result["image_media_types"] + voice_transcripts = att_result["voice_transcripts"] + attachment_info = att_result["attachment_info"] + + # Append voice transcripts to the text body + if voice_transcripts: + voice_block = "\n".join(voice_transcripts) + text = (text + "\n\n" + voice_block).strip() if text.strip() else voice_block + # Append non-media attachment info + if attachment_info: + text = (text + "\n\n" + attachment_info).strip() if text.strip() else attachment_info + + logger.info("[QQ] After processing: images=%d, voice=%d", + len(image_urls), len(voice_transcripts)) + + if not text.strip() and not image_urls: + return + + self._chat_type_map[user_openid] = "c2c" + event = MessageEvent( + source=self.build_source( + chat_id=user_openid, + user_id=user_openid, + chat_type="dm", + ), + text=text, + message_type=self._detect_message_type(image_urls, image_media_types), + raw_message=d, + message_id=msg_id, + media_urls=image_urls, + media_types=image_media_types, + timestamp=datetime.now(tz=timezone.utc), + ) + await self.handle_message(event) + + async def _handle_group_message( + self, d: Dict[str, Any], msg_id: str, content: str, author: Dict[str, Any], timestamp: str + ) -> None: + """Handle a group @-message event.""" + group_openid = str(d.get("group_openid", "")) + if not group_openid: + return + if not self._is_group_allowed(group_openid, str(author.get("member_openid", ""))): + return + + # Strip the @bot mention prefix from content + text = self._strip_at_mention(content) + att_result = await self._process_attachments(d.get("attachments")) + image_urls = att_result["image_urls"] + image_media_types = att_result["image_media_types"] + voice_transcripts = att_result["voice_transcripts"] + attachment_info = att_result["attachment_info"] + + # Append voice transcripts + if voice_transcripts: + voice_block = "\n".join(voice_transcripts) + text = (text + "\n\n" + voice_block).strip() if text.strip() else voice_block + if attachment_info: + text = (text + "\n\n" + attachment_info).strip() if text.strip() else attachment_info + + if not text.strip() and not image_urls: + return + + self._chat_type_map[group_openid] = "group" + event = MessageEvent( + source=self.build_source( + chat_id=group_openid, + user_id=str(author.get("member_openid", "")), + chat_type="group", + ), + text=text, + message_type=self._detect_message_type(image_urls, image_media_types), + raw_message=d, + message_id=msg_id, + media_urls=image_urls, + media_types=image_media_types, + timestamp=datetime.now(tz=timezone.utc), + ) + await self.handle_message(event) + + async def _handle_guild_message( + self, d: Dict[str, Any], msg_id: str, content: str, author: Dict[str, Any], timestamp: str + ) -> None: + """Handle a guild/channel message event.""" + channel_id = str(d.get("channel_id", "")) + if not channel_id: + return + + member = d.get("member") if isinstance(d.get("member"), dict) else {} + nick = str(member.get("nick", "")) or str(author.get("username", "")) + + text = content + att_result = await self._process_attachments(d.get("attachments")) + image_urls = att_result["image_urls"] + image_media_types = att_result["image_media_types"] + voice_transcripts = att_result["voice_transcripts"] + attachment_info = att_result["attachment_info"] + + if voice_transcripts: + voice_block = "\n".join(voice_transcripts) + text = (text + "\n\n" + voice_block).strip() if text.strip() else voice_block + if attachment_info: + text = (text + "\n\n" + attachment_info).strip() if text.strip() else attachment_info + + if not text.strip() and not image_urls: + return + + self._chat_type_map[channel_id] = "guild" + event = MessageEvent( + source=self.build_source( + chat_id=channel_id, + user_id=str(author.get("id", "")), + user_name=nick or None, + chat_type="group", + ), + text=text, + message_type=self._detect_message_type(image_urls, image_media_types), + raw_message=d, + message_id=msg_id, + media_urls=image_urls, + media_types=image_media_types, + timestamp=datetime.now(tz=timezone.utc), + ) + await self.handle_message(event) + + async def _handle_dm_message( + self, d: Dict[str, Any], msg_id: str, content: str, author: Dict[str, Any], timestamp: str + ) -> None: + """Handle a guild DM message event.""" + guild_id = str(d.get("guild_id", "")) + if not guild_id: + return + + text = content + att_result = await self._process_attachments(d.get("attachments")) + image_urls = att_result["image_urls"] + image_media_types = att_result["image_media_types"] + voice_transcripts = att_result["voice_transcripts"] + attachment_info = att_result["attachment_info"] + + if voice_transcripts: + voice_block = "\n".join(voice_transcripts) + text = (text + "\n\n" + voice_block).strip() if text.strip() else voice_block + if attachment_info: + text = (text + "\n\n" + attachment_info).strip() if text.strip() else attachment_info + + if not text.strip() and not image_urls: + return + + self._chat_type_map[guild_id] = "dm" + event = MessageEvent( + source=self.build_source( + chat_id=guild_id, + user_id=str(author.get("id", "")), + chat_type="dm", + ), + text=text, + message_type=self._detect_message_type(image_urls, image_media_types), + raw_message=d, + message_id=msg_id, + media_urls=image_urls, + media_types=image_media_types, + timestamp=datetime.now(tz=timezone.utc), + ) + await self.handle_message(event) + + # ------------------------------------------------------------------ + # Attachment processing + # ------------------------------------------------------------------ + + + @staticmethod + def _detect_message_type(media_urls: list, media_types: list): + """Determine MessageType from attachment content types.""" + if not media_urls: + return MessageType.TEXT + if not media_types: + return MessageType.PHOTO + first_type = media_types[0].lower() if media_types else "" + if "audio" in first_type or "voice" in first_type or "silk" in first_type: + return MessageType.VOICE + if "video" in first_type: + return MessageType.VIDEO + if "image" in first_type or "photo" in first_type: + return MessageType.PHOTO + # Unknown content type with an attachment — don't assume PHOTO + # to prevent non-image files from being sent to vision analysis. + logger.debug("[QQ] Unknown media content_type '%s', defaulting to TEXT", first_type) + return MessageType.TEXT + + async def _process_attachments( + self, attachments: Any, + ) -> Dict[str, Any]: + """Process inbound attachments (all message types). + + Mirrors OpenClaw's ``processAttachments`` — handles images, voice, and + other files uniformly. + + Returns a dict with: + - image_urls: list[str] — cached local image paths + - image_media_types: list[str] — MIME types of cached images + - voice_transcripts: list[str] — STT transcripts for voice messages + - attachment_info: str — text description of non-image, non-voice attachments + """ + if not isinstance(attachments, list): + return {"image_urls": [], "image_media_types": [], + "voice_transcripts": [], "attachment_info": ""} + + image_urls: List[str] = [] + image_media_types: List[str] = [] + voice_transcripts: List[str] = [] + other_attachments: List[str] = [] + + for att in attachments: + if not isinstance(att, dict): + continue + + ct = str(att.get("content_type", "")).strip().lower() + url_raw = str(att.get("url", "")).strip() + filename = str(att.get("filename", "")) + if url_raw.startswith("//"): + url = f"https:{url_raw}" + elif url_raw: + url = url_raw + else: + url = "" + continue + + logger.debug("[QQ] Processing attachment: content_type=%s, url=%s, filename=%s", + ct, url[:80], filename) + + if self._is_voice_content_type(ct, filename): + # Voice: use QQ's asr_refer_text first, then voice_wav_url, then STT. + asr_refer = ( + str(att.get("asr_refer_text", "")).strip() + if isinstance(att.get("asr_refer_text"), str) else "" + ) + voice_wav_url = ( + str(att.get("voice_wav_url", "")).strip() + if isinstance(att.get("voice_wav_url"), str) else "" + ) + + transcript = await self._stt_voice_attachment( + url, ct, filename, + asr_refer_text=asr_refer or None, + voice_wav_url=voice_wav_url or None, + ) + if transcript: + voice_transcripts.append(f"[Voice] {transcript}") + logger.info("[QQ] Voice transcript: %s", transcript) + else: + logger.warning("[QQ] Voice STT failed for %s", url[:60]) + voice_transcripts.append("[Voice] [语音识别失败]") + elif ct.startswith("image/"): + # Image: download and cache locally. + try: + cached_path = await self._download_and_cache(url, ct) + if cached_path and os.path.isfile(cached_path): + image_urls.append(cached_path) + image_media_types.append(ct or "image/jpeg") + elif cached_path: + logger.warning("[QQ] Cached image path does not exist: %s", cached_path) + except Exception as exc: + logger.debug("[QQ] Failed to cache image: %s", exc) + else: + # Other attachments (video, file, etc.): record as text. + try: + cached_path = await self._download_and_cache(url, ct) + if cached_path: + other_attachments.append(f"[Attachment: {filename or ct}]") + except Exception as exc: + logger.debug("[QQ] Failed to cache attachment: %s", exc) + + attachment_info = "\n".join(other_attachments) if other_attachments else "" + return { + "image_urls": image_urls, + "image_media_types": image_media_types, + "voice_transcripts": voice_transcripts, + "attachment_info": attachment_info, + } + + async def _download_and_cache(self, url: str, content_type: str) -> Optional[str]: + """Download a URL and cache it locally.""" + from tools.url_safety import is_safe_url + if not is_safe_url(url): + raise ValueError(f"Blocked unsafe URL: {url[:80]}") + + if not self._http_client: + return None + + try: + resp = await self._http_client.get( + url, timeout=30.0, headers=self._qq_media_headers(), + ) + resp.raise_for_status() + data = resp.content + except Exception as exc: + logger.debug("[%s] Download failed for %s: %s", self.name, url[:80], exc) + return None + + if content_type.startswith("image/"): + ext = mimetypes.guess_extension(content_type) or ".jpg" + return cache_image_from_bytes(data, ext) + elif content_type == "voice" or content_type.startswith("audio/"): + # QQ voice messages are typically .amr or .silk format. + # Convert to .wav using ffmpeg so STT engines can process it. + return await self._convert_audio_to_wav(data, url) + else: + filename = Path(urlparse(url).path).name or "qq_attachment" + return cache_document_from_bytes(data, filename) + + @staticmethod + def _is_voice_content_type(content_type: str, filename: str) -> bool: + """Check if an attachment is a voice/audio message.""" + ct = content_type.strip().lower() + fn = filename.strip().lower() + if ct == "voice" or ct.startswith("audio/"): + return True + _VOICE_EXTENSIONS = (".silk", ".amr", ".mp3", ".wav", ".ogg", ".m4a", ".aac", ".speex", ".flac") + if any(fn.endswith(ext) for ext in _VOICE_EXTENSIONS): + return True + return False + + def _qq_media_headers(self) -> Dict[str, str]: + """Return Authorization headers for QQ multimedia CDN downloads. + + QQ's multimedia URLs (multimedia.nt.qq.com.cn) require the bot's + access token in an Authorization header, otherwise the download + returns a non-200 status. + """ + if self._access_token: + return {"Authorization": f"QQBot {self._access_token}"} + return {} + + async def _stt_voice_attachment( + self, + url: str, + content_type: str, + filename: str, + *, + asr_refer_text: Optional[str] = None, + voice_wav_url: Optional[str] = None, + ) -> Optional[str]: + """Download a voice attachment, convert to wav, and transcribe. + + Priority: + 1. QQ's built-in ``asr_refer_text`` (Tencent's own ASR — free, no API call). + 2. Self-hosted STT on ``voice_wav_url`` (pre-converted WAV from QQ, avoids SILK decoding). + 3. Self-hosted STT on the original attachment URL (requires SILK→WAV conversion). + + Returns the transcript text, or None on failure. + """ + # 1. Use QQ's built-in ASR text if available + if asr_refer_text: + logger.info("[QQ] STT: using QQ asr_refer_text: %r", asr_refer_text[:100]) + return asr_refer_text + + # Determine which URL to download (prefer voice_wav_url — already WAV) + download_url = url + is_pre_wav = False + if voice_wav_url: + if voice_wav_url.startswith("//"): + voice_wav_url = f"https:{voice_wav_url}" + download_url = voice_wav_url + is_pre_wav = True + logger.info("[QQ] STT: using voice_wav_url (pre-converted WAV)") + + try: + # 2. Download audio (QQ CDN requires Authorization header) + if not self._http_client: + logger.warning("[QQ] STT: no HTTP client") + return None + + download_headers = self._qq_media_headers() + logger.info("[QQ] STT: downloading voice from %s (pre_wav=%s, headers=%s)", + download_url[:80], is_pre_wav, bool(download_headers)) + resp = await self._http_client.get( + download_url, timeout=30.0, headers=download_headers, follow_redirects=True, + ) + resp.raise_for_status() + audio_data = resp.content + logger.info("[QQ] STT: downloaded %d bytes, content_type=%s", + len(audio_data), resp.headers.get("content-type", "unknown")) + + if len(audio_data) < 10: + logger.warning("[QQ] STT: downloaded data too small (%d bytes), skipping", len(audio_data)) + return None + + # 3. Convert to wav (skip if we already have a pre-converted WAV) + if is_pre_wav: + import tempfile + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: + tmp.write(audio_data) + wav_path = tmp.name + logger.info("[QQ] STT: using pre-converted WAV directly (%d bytes)", len(audio_data)) + else: + logger.info("[QQ] STT: converting to wav, filename=%r", filename) + wav_path = await self._convert_audio_to_wav_file(audio_data, filename) + if not wav_path or not Path(wav_path).exists(): + logger.warning("[QQ] STT: ffmpeg conversion produced no output") + return None + + # 4. Call STT API + logger.info("[QQ] STT: calling ASR on %s", wav_path) + transcript = await self._call_stt(wav_path) + + # 5. Cleanup temp file + try: + os.unlink(wav_path) + except OSError: + pass + + if transcript: + logger.info("[QQ] STT success: %r", transcript[:100]) + else: + logger.warning("[QQ] STT: ASR returned empty transcript") + return transcript + except (httpx.HTTPStatusError, httpx.TransportError, IOError) as exc: + logger.warning("[QQ] STT failed for voice attachment: %s: %s", type(exc).__name__, exc) + return None + + async def _convert_audio_to_wav_file(self, audio_data: bytes, filename: str) -> Optional[str]: + """Convert audio bytes to a temp .wav file using pilk (SILK) or ffmpeg. + + QQ voice messages are typically SILK format which ffmpeg cannot decode. + Strategy: always try pilk first, fall back to ffmpeg if pilk fails. + + Returns the wav file path, or None on failure. + """ + import tempfile + + ext = Path(filename).suffix.lower() if Path(filename).suffix else self._guess_ext_from_data(audio_data) + logger.info("[QQ] STT: audio_data size=%d, ext=%r, first_20_bytes=%r", + len(audio_data), ext, audio_data[:20]) + + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp_src: + tmp_src.write(audio_data) + src_path = tmp_src.name + + wav_path = src_path.rsplit(".", 1)[0] + ".wav" + + # Try pilk first (handles SILK and many other formats) + result = await self._convert_silk_to_wav(src_path, wav_path) + + # If pilk failed, try ffmpeg + if not result: + result = await self._convert_ffmpeg_to_wav(src_path, wav_path) + + # If ffmpeg also failed, try writing raw PCM as WAV (last resort) + if not result: + result = await self._convert_raw_to_wav(audio_data, wav_path) + + # Cleanup source file + try: + os.unlink(src_path) + except OSError: + pass + + return result + + @staticmethod + def _guess_ext_from_data(data: bytes) -> str: + """Guess file extension from magic bytes.""" + if data[:9] == b"#!SILK_V3" or data[:5] == b"#!SILK": + return ".silk" + if data[:2] == b"\x02!": + return ".silk" + if data[:4] == b"RIFF": + return ".wav" + if data[:4] == b"fLaC": + return ".flac" + if data[:2] in (b"\xff\xfb", b"\xff\xf3", b"\xff\xf2"): + return ".mp3" + if data[:4] == b"\x30\x26\xb2\x75" or data[:4] == b"\x4f\x67\x67\x53": + return ".ogg" + if data[:4] == b"\x00\x00\x00\x20" or data[:4] == b"\x00\x00\x00\x1c": + return ".amr" + # Default to .amr for unknown (QQ's most common voice format) + return ".amr" + + @staticmethod + def _looks_like_silk(data: bytes) -> bool: + """Check if bytes look like a SILK audio file.""" + return data[:4] == b"#!SILK" or data[:2] == b"\x02!" or data[:9] == b"#!SILK_V3" + + @staticmethod + async def _convert_silk_to_wav(src_path: str, wav_path: str) -> Optional[str]: + """Convert audio file to WAV using the pilk library. + + Tries the file as-is first, then as .silk if the extension differs. + pilk can handle SILK files with various headers (or no header). + """ + try: + import pilk + except ImportError: + logger.warning("[QQ] pilk not installed — cannot decode SILK audio. Run: pip install pilk") + return None + + # Try converting the file as-is + try: + pilk.silk_to_wav(src_path, wav_path, rate=16000) + if Path(wav_path).exists() and Path(wav_path).stat().st_size > 44: + logger.info("[QQ] pilk converted %s to wav (%d bytes)", + Path(src_path).name, Path(wav_path).stat().st_size) + return wav_path + except Exception as exc: + logger.debug("[QQ] pilk direct conversion failed: %s", exc) + + # Try renaming to .silk and converting (pilk checks the extension) + silk_path = src_path.rsplit(".", 1)[0] + ".silk" + try: + import shutil + shutil.copy2(src_path, silk_path) + pilk.silk_to_wav(silk_path, wav_path, rate=16000) + if Path(wav_path).exists() and Path(wav_path).stat().st_size > 44: + logger.info("[QQ] pilk converted %s (as .silk) to wav (%d bytes)", + Path(src_path).name, Path(wav_path).stat().st_size) + return wav_path + except Exception as exc: + logger.debug("[QQ] pilk .silk conversion failed: %s", exc) + finally: + try: + os.unlink(silk_path) + except OSError: + pass + + return None + + @staticmethod + async def _convert_raw_to_wav(audio_data: bytes, wav_path: str) -> Optional[str]: + """Last resort: try writing audio data as raw PCM 16-bit mono 16kHz WAV. + + This will produce garbage if the data isn't raw PCM, but at least + the ASR engine won't crash — it'll just return empty. + """ + try: + import wave + with wave.open(wav_path, "w") as wf: + wf.setnchannels(1) + wf.setsampwidth(2) + wf.setframerate(16000) + wf.writeframes(audio_data) + return wav_path + except Exception as exc: + logger.debug("[QQ] raw PCM fallback failed: %s", exc) + return None + + @staticmethod + async def _convert_ffmpeg_to_wav(src_path: str, wav_path: str) -> Optional[str]: + """Convert audio file to WAV using ffmpeg.""" + try: + proc = await asyncio.create_subprocess_exec( + "ffmpeg", "-y", "-i", src_path, "-ar", "16000", "-ac", "1", wav_path, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE, + ) + await asyncio.wait_for(proc.wait(), timeout=30) + if proc.returncode != 0: + stderr = await proc.stderr.read() if proc.stderr else b"" + logger.warning("[QQ] ffmpeg failed for %s: %s", + Path(src_path).name, stderr[:200].decode(errors="replace")) + return None + except (asyncio.TimeoutError, FileNotFoundError) as exc: + logger.warning("[QQ] ffmpeg conversion error: %s", exc) + return None + + if not Path(wav_path).exists() or Path(wav_path).stat().st_size <= 44: + logger.warning("[QQ] ffmpeg produced no/small output for %s", Path(src_path).name) + return None + logger.info("[QQ] ffmpeg converted %s to wav (%d bytes)", + Path(src_path).name, Path(wav_path).stat().st_size) + return wav_path + + def _resolve_stt_config(self) -> Optional[Dict[str, str]]: + """Resolve STT backend configuration from config/environment. + + Priority: + 1. Plugin-specific: ``channels.qqbot.stt`` in config.yaml → ``self.config.extra["stt"]`` + 2. QQ-specific env vars: ``QQ_STT_API_KEY`` / ``QQ_STT_BASE_URL`` / ``QQ_STT_MODEL`` + 3. Return None if nothing is configured (STT will be skipped, QQ built-in ASR still works). + """ + extra = self.config.extra or {} + + # 1. Plugin-specific STT config (matches OpenClaw's channels.qqbot.stt) + stt_cfg = extra.get("stt") + if isinstance(stt_cfg, dict) and stt_cfg.get("enabled") is not False: + base_url = stt_cfg.get("baseUrl") or stt_cfg.get("base_url", "") + api_key = stt_cfg.get("apiKey") or stt_cfg.get("api_key", "") + model = stt_cfg.get("model", "") + if base_url and api_key: + return { + "base_url": base_url.rstrip("/"), + "api_key": api_key, + "model": model or "whisper-1", + } + # Provider-only config: just model name, use default provider + if api_key: + provider = stt_cfg.get("provider", "zai") + # Map provider to base URL + _PROVIDER_BASE_URLS = { + "zai": "https://open.bigmodel.cn/api/coding/paas/v4", + "openai": "https://api.openai.com/v1", + "glm": "https://open.bigmodel.cn/api/coding/paas/v4", + } + base_url = _PROVIDER_BASE_URLS.get(provider, "") + if base_url: + return { + "base_url": base_url, + "api_key": api_key, + "model": model or ("glm-asr" if provider in ("zai", "glm") else "whisper-1"), + } + + # 2. QQ-specific env vars (set by `hermes setup gateway` / `hermes gateway`) + qq_stt_key = os.getenv("QQ_STT_API_KEY", "") + if qq_stt_key: + base_url = os.getenv( + "QQ_STT_BASE_URL", + "https://open.bigmodel.cn/api/coding/paas/v4", + ) + model = os.getenv("QQ_STT_MODEL", "glm-asr") + return { + "base_url": base_url.rstrip("/"), + "api_key": qq_stt_key, + "model": model, + } + + return None + + async def _call_stt(self, wav_path: str) -> Optional[str]: + """Call an OpenAI-compatible STT API to transcribe a wav file. + + Uses the provider configured in ``channels.qqbot.stt`` config, + falling back to QQ's built-in ``asr_refer_text`` if not configured. + Returns None if STT is not configured or the call fails. + """ + stt_cfg = self._resolve_stt_config() + if not stt_cfg: + logger.warning("[QQ] STT not configured (no stt config or QQ_STT_API_KEY)") + return None + + base_url = stt_cfg["base_url"] + api_key = stt_cfg["api_key"] + model = stt_cfg["model"] + + try: + with open(wav_path, "rb") as f: + resp = await self._http_client.post( + f"{base_url}/audio/transcriptions", + headers={"Authorization": f"Bearer {api_key}"}, + files={"file": (Path(wav_path).name, f, "audio/wav")}, + data={"model": model}, + timeout=30.0, + ) + resp.raise_for_status() + result = resp.json() + # Zhipu/GLM format: {"choices": [{"message": {"content": "transcript text"}}]} + choices = result.get("choices", []) + if choices: + content = choices[0].get("message", {}).get("content", "") + if content.strip(): + return content.strip() + # OpenAI/Whisper format: {"text": "transcript text"} + text = result.get("text", "") + if text.strip(): + return text.strip() + return None + except (httpx.HTTPStatusError, IOError) as exc: + logger.warning("[QQ] STT API call failed (model=%s, base=%s): %s", + model, base_url[:50], exc) + return None + + async def _convert_audio_to_wav(self, audio_data: bytes, source_url: str) -> Optional[str]: + """Convert audio bytes to .wav using pilk (SILK) or ffmpeg, caching the result.""" + import tempfile + + # Determine source format from magic bytes or URL + ext = Path(urlparse(source_url).path).suffix.lower() if urlparse(source_url).path else "" + if not ext or ext not in (".silk", ".amr", ".mp3", ".wav", ".ogg", ".m4a", ".aac", ".flac"): + ext = self._guess_ext_from_data(audio_data) + + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp_src: + tmp_src.write(audio_data) + src_path = tmp_src.name + + wav_path = src_path.rsplit(".", 1)[0] + ".wav" + try: + is_silk = ext == ".silk" or self._looks_like_silk(audio_data) + if is_silk: + result = await self._convert_silk_to_wav(src_path, wav_path) + else: + result = await self._convert_ffmpeg_to_wav(src_path, wav_path) + + if not result: + logger.warning("[%s] audio conversion failed for %s (format=%s)", + self.name, source_url[:60], ext) + return cache_document_from_bytes(audio_data, f"qq_voice{ext}") + except Exception: + return cache_document_from_bytes(audio_data, f"qq_voice{ext}") + finally: + try: + os.unlink(src_path) + except OSError: + pass + + # Verify output and cache + try: + wav_data = Path(wav_path).read_bytes() + os.unlink(wav_path) + return cache_document_from_bytes(wav_data, "qq_voice.wav") + except Exception as exc: + logger.debug("[%s] Failed to read converted wav: %s", self.name, exc) + return None + + # ------------------------------------------------------------------ + # Outbound messaging — REST API + # ------------------------------------------------------------------ + + async def _api_request( + self, + method: str, + path: str, + body: Optional[Dict[str, Any]] = None, + timeout: float = DEFAULT_API_TIMEOUT, + ) -> Dict[str, Any]: + """Make an authenticated REST API request to QQ Bot API.""" + if not self._http_client: + raise RuntimeError("HTTP client not initialized — not connected?") + + token = await self._ensure_token() + headers = { + "Authorization": f"QQBot {token}", + "Content-Type": "application/json", + } + + try: + resp = await self._http_client.request( + method, + f"{API_BASE}{path}", + headers=headers, + json=body, + timeout=timeout, + ) + data = resp.json() + if resp.status_code >= 400: + raise RuntimeError( + f"QQ Bot API error [{resp.status_code}] {path}: " + f"{data.get('message', data)}" + ) + return data + except httpx.TimeoutException as exc: + raise RuntimeError(f"QQ Bot API timeout [{path}]: {exc}") from exc + + async def _upload_media( + self, + target_type: str, + target_id: str, + file_type: int, + url: Optional[str] = None, + file_data: Optional[str] = None, + srv_send_msg: bool = False, + file_name: Optional[str] = None, + ) -> Dict[str, Any]: + """Upload media and return file_info.""" + path = f"/v2/users/{target_id}/files" if target_type == "c2c" else f"/v2/groups/{target_id}/files" + + body: Dict[str, Any] = { + "file_type": file_type, + "srv_send_msg": srv_send_msg, + } + if url: + body["url"] = url + elif file_data: + body["file_data"] = file_data + if file_type == MEDIA_TYPE_FILE and file_name: + body["file_name"] = file_name + + # Retry transient upload failures + last_exc = None + for attempt in range(3): + try: + return await self._api_request("POST", path, body, timeout=FILE_UPLOAD_TIMEOUT) + except RuntimeError as exc: + last_exc = exc + err_msg = str(exc) + if any(kw in err_msg for kw in ("400", "401", "Invalid", "timeout", "Timeout")): + raise + if attempt < 2: + await asyncio.sleep(1.5 * (attempt + 1)) + + raise last_exc # type: ignore[misc] + + async def send( + self, + chat_id: str, + content: str, + reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + """Send a text or markdown message to a QQ user or group.""" + del metadata + + if not self.is_connected: + return SendResult(success=False, error="Not connected") + + if not content or not content.strip(): + return SendResult(success=True) + + try: + # Determine routing + chat_type = self._guess_chat_type(chat_id) + is_reply = bool(reply_to) + + if chat_type == "c2c": + return await self._send_c2c_text(chat_id, content, reply_to) + elif chat_type == "group": + return await self._send_group_text(chat_id, content, reply_to) + elif chat_type == "guild": + return await self._send_guild_text(chat_id, content, reply_to) + else: + return SendResult(success=False, error=f"Unknown chat type for {chat_id}") + except asyncio.TimeoutError: + return SendResult(success=False, error="Timeout sending message to QQ") + except Exception as exc: + logger.error("[%s] Send failed: %s", self.name, exc) + return SendResult(success=False, error=str(exc)) + + async def _send_c2c_text( + self, openid: str, content: str, reply_to: Optional[str] = None + ) -> SendResult: + """Send text to a C2C user via REST API.""" + msg_seq = self._next_msg_seq(reply_to or openid) + body = self._build_text_body(content, reply_to) + if reply_to: + body["msg_id"] = reply_to + + data = await self._api_request("POST", f"/v2/users/{openid}/messages", body) + msg_id = str(data.get("id", uuid.uuid4().hex[:12])) + return SendResult(success=True, message_id=msg_id, raw_response=data) + + async def _send_group_text( + self, group_openid: str, content: str, reply_to: Optional[str] = None + ) -> SendResult: + """Send text to a group via REST API.""" + msg_seq = self._next_msg_seq(reply_to or group_openid) + body = self._build_text_body(content, reply_to) + if reply_to: + body["msg_id"] = reply_to + + data = await self._api_request("POST", f"/v2/groups/{group_openid}/messages", body) + msg_id = str(data.get("id", uuid.uuid4().hex[:12])) + return SendResult(success=True, message_id=msg_id, raw_response=data) + + async def _send_guild_text( + self, channel_id: str, content: str, reply_to: Optional[str] = None + ) -> SendResult: + """Send text to a guild channel via REST API.""" + body: Dict[str, Any] = {"content": content[:self.MAX_MESSAGE_LENGTH]} + if reply_to: + body["msg_id"] = reply_to + + data = await self._api_request("POST", f"/channels/{channel_id}/messages", body) + msg_id = str(data.get("id", uuid.uuid4().hex[:12])) + return SendResult(success=True, message_id=msg_id, raw_response=data) + + def _build_text_body(self, content: str, reply_to: Optional[str] = None) -> Dict[str, Any]: + """Build the message body for C2C/group text sending.""" + msg_seq = self._next_msg_seq(reply_to or "default") + + if self._markdown_support: + body: Dict[str, Any] = { + "markdown": {"content": content[:self.MAX_MESSAGE_LENGTH]}, + "msg_type": MSG_TYPE_MARKDOWN, + "msg_seq": msg_seq, + } + else: + body = { + "content": content[:self.MAX_MESSAGE_LENGTH], + "msg_type": MSG_TYPE_TEXT, + "msg_seq": msg_seq, + } + + if reply_to: + # For non-markdown mode, add message_reference + if not self._markdown_support: + body["message_reference"] = {"message_id": reply_to} + + return body + + # ------------------------------------------------------------------ + # Native media sending + # ------------------------------------------------------------------ + + async def send_image( + self, + chat_id: str, + image_url: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + """Send an image natively via QQ Bot API upload.""" + del metadata + + result = await self._send_media(chat_id, image_url, MEDIA_TYPE_IMAGE, "image", caption, reply_to) + if result.success or not self._is_url(image_url): + return result + + # Fallback to text URL + logger.warning("[%s] Image send failed, falling back to text: %s", self.name, result.error) + fallback = f"{caption}\n{image_url}" if caption else image_url + return await self.send(chat_id=chat_id, content=fallback, reply_to=reply_to) + + async def send_image_file( + self, + chat_id: str, + image_path: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + **kwargs, + ) -> SendResult: + """Send a local image file natively.""" + del kwargs + return await self._send_media(chat_id, image_path, MEDIA_TYPE_IMAGE, "image", caption, reply_to) + + async def send_voice( + self, + chat_id: str, + audio_path: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + **kwargs, + ) -> SendResult: + """Send a voice message natively.""" + del kwargs + return await self._send_media(chat_id, audio_path, MEDIA_TYPE_VOICE, "voice", caption, reply_to) + + async def send_video( + self, + chat_id: str, + video_path: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + **kwargs, + ) -> SendResult: + """Send a video natively.""" + del kwargs + return await self._send_media(chat_id, video_path, MEDIA_TYPE_VIDEO, "video", caption, reply_to) + + async def send_document( + self, + chat_id: str, + file_path: str, + caption: Optional[str] = None, + file_name: Optional[str] = None, + reply_to: Optional[str] = None, + **kwargs, + ) -> SendResult: + """Send a file/document natively.""" + del kwargs + return await self._send_media(chat_id, file_path, MEDIA_TYPE_FILE, "file", caption, reply_to, + file_name=file_name) + + async def _send_media( + self, + chat_id: str, + media_source: str, + file_type: int, + kind: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + file_name: Optional[str] = None, + ) -> SendResult: + """Upload media and send as a native message.""" + if not self.is_connected: + return SendResult(success=False, error="Not connected") + + try: + # Resolve media source + data, content_type, resolved_name = await self._load_media(media_source, file_name) + + # Route + chat_type = self._guess_chat_type(chat_id) + target_path = f"/v2/users/{chat_id}/files" if chat_type == "c2c" else f"/v2/groups/{chat_id}/files" + + if chat_type == "guild": + # Guild channels don't support native media upload in the same way + # Send as URL fallback + return SendResult(success=False, error="Guild media send not supported via this path") + + # Upload + upload = await self._upload_media( + chat_type, chat_id, file_type, + file_data=data if not self._is_url(media_source) else None, + url=media_source if self._is_url(media_source) else None, + srv_send_msg=False, + file_name=resolved_name if file_type == MEDIA_TYPE_FILE else None, + ) + + file_info = upload.get("file_info") + if not file_info: + return SendResult(success=False, error=f"Upload returned no file_info: {upload}") + + # Send media message + msg_seq = self._next_msg_seq(chat_id) + body: Dict[str, Any] = { + "msg_type": MSG_TYPE_MEDIA, + "media": {"file_info": file_info}, + "msg_seq": msg_seq, + } + if caption: + body["content"] = caption[:self.MAX_MESSAGE_LENGTH] + if reply_to: + body["msg_id"] = reply_to + + send_data = await self._api_request( + "POST", + f"/v2/users/{chat_id}/messages" if chat_type == "c2c" else f"/v2/groups/{chat_id}/messages", + body, + ) + return SendResult( + success=True, + message_id=str(send_data.get("id", uuid.uuid4().hex[:12])), + raw_response=send_data, + ) + except Exception as exc: + logger.error("[%s] Media send failed: %s", self.name, exc) + return SendResult(success=False, error=str(exc)) + + async def _load_media( + self, source: str, file_name: Optional[str] = None + ) -> Tuple[str, str, str]: + """Load media from URL or local path. Returns (base64_or_url, content_type, filename).""" + source = str(source).strip() + if not source: + raise ValueError("Media source is required") + + parsed = urlparse(source) + if parsed.scheme in ("http", "https"): + # For URLs, pass through directly to the upload API + content_type = mimetypes.guess_type(source)[0] or "application/octet-stream" + resolved_name = file_name or Path(parsed.path).name or "media" + return source, content_type, resolved_name + + # Local file — encode as raw base64 for QQ Bot API file_data field. + # The QQ API expects plain base64, NOT a data URI. + local_path = Path(source).expanduser() + if not local_path.is_absolute(): + local_path = (Path.cwd() / local_path).resolve() + + if not local_path.exists() or not local_path.is_file(): + # Guard against placeholder paths like "" that the LLM + # sometimes emits instead of real file paths. + if source.startswith("<") or len(source) < 3: + raise ValueError( + f"Invalid media source (looks like a placeholder): {source!r}" + ) + raise FileNotFoundError(f"Media file not found: {local_path}") + + raw = local_path.read_bytes() + resolved_name = file_name or local_path.name + content_type = mimetypes.guess_type(str(local_path))[0] or "application/octet-stream" + b64 = base64.b64encode(raw).decode("ascii") + return b64, content_type, resolved_name + + # ------------------------------------------------------------------ + # Typing indicator + # ------------------------------------------------------------------ + + async def send_typing(self, chat_id: str, metadata=None) -> None: + """Send an input notify to a C2C user (only supported for C2C).""" + del metadata + + if not self.is_connected: + return + + # Only C2C supports input notify + chat_type = self._guess_chat_type(chat_id) + if chat_type != "c2c": + return + + try: + msg_seq = self._next_msg_seq(chat_id) + body = { + "msg_type": MSG_TYPE_INPUT_NOTIFY, + "input_notify": {"input_type": 1, "input_second": 60}, + "msg_seq": msg_seq, + } + await self._api_request("POST", f"/v2/users/{chat_id}/messages", body) + except Exception as exc: + logger.debug("[%s] send_typing failed: %s", self.name, exc) + + # ------------------------------------------------------------------ + # Format + # ------------------------------------------------------------------ + + def format_message(self, content: str) -> str: + """Format message for QQ. + + When markdown_support is enabled, content is sent as-is (QQ renders it). + When disabled, strip common Markdown patterns for plain-text display. + """ + if self._markdown_support: + return content + + # Strip markdown formatting for plain text + text = content + # Bold/italic/strikethrough + import re + text = re.sub(r'\*{1,2}([^*]+)\*{1,2}', r'\1', text) + text = re.sub(r'_{1,2}([^_]+)_{1,2}', r'\1', text) + text = re.sub(r'~~([^~]+)~~', r'\1', text) + # Code blocks + text = re.sub(r'```[\s\S]*?```', lambda m: m.group(0).split('\n', 1)[-1].rsplit('```', 1)[0] if '\n' in m.group(0) else m.group(0).replace('`', ''), text) + text = re.sub(r'`([^`]+)`', r'\1', text) + # Links + text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1 (\2)', text) + # Headers + text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE) + return text + + # ------------------------------------------------------------------ + # Chat info + # ------------------------------------------------------------------ + + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: + """Return chat info based on chat type heuristics.""" + chat_type = self._guess_chat_type(chat_id) + return { + "name": chat_id, + "type": "group" if chat_type in ("group", "guild") else "dm", + } + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + def _is_url(source: str) -> bool: + return urlparse(str(source)).scheme in ("http", "https") + + def _guess_chat_type(self, chat_id: str) -> str: + """Determine chat type from stored inbound metadata, fallback to 'c2c'.""" + if chat_id in self._chat_type_map: + return self._chat_type_map[chat_id] + return "c2c" + + @staticmethod + def _strip_at_mention(content: str) -> str: + """Strip the @bot mention prefix from group message content.""" + # QQ group @-messages may have the bot's QQ/ID as prefix + import re + stripped = re.sub(r'^@\S+\s*', '', content.strip()) + return stripped + + def _is_dm_allowed(self, user_id: str) -> bool: + if self._dm_policy == "disabled": + return False + if self._dm_policy == "allowlist": + return self._entry_matches(self._allow_from, user_id) + return True + + def _is_group_allowed(self, group_id: str, user_id: str) -> bool: + if self._group_policy == "disabled": + return False + if self._group_policy == "allowlist": + return self._entry_matches(self._group_allow_from, group_id) + return True + + @staticmethod + def _entry_matches(entries: List[str], target: str) -> bool: + normalized_target = str(target).strip().lower() + for entry in entries: + normalized = str(entry).strip().lower() + if normalized == "*" or normalized == normalized_target: + return True + return False + + def _is_duplicate(self, msg_id: str) -> bool: + now = time.time() + if len(self._seen_messages) > DEDUP_MAX_SIZE: + cutoff = now - DEDUP_WINDOW_SECONDS + self._seen_messages = { + key: ts for key, ts in self._seen_messages.items() if ts > cutoff + } + if msg_id in self._seen_messages: + return True + self._seen_messages[msg_id] = now + return False diff --git a/gateway/run.py b/gateway/run.py index ebaa0447b..a1d3bb770 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1499,6 +1499,7 @@ class GatewayRunner: "WECOM_CALLBACK_ALLOWED_USERS", "WEIXIN_ALLOWED_USERS", "BLUEBUBBLES_ALLOWED_USERS", + "QQ_ALLOWED_USERS", "GATEWAY_ALLOWED_USERS") ) _allow_all = os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in ("true", "1", "yes") or any( @@ -1512,7 +1513,8 @@ class GatewayRunner: "WECOM_ALLOW_ALL_USERS", "WECOM_CALLBACK_ALLOW_ALL_USERS", "WEIXIN_ALLOW_ALL_USERS", - "BLUEBUBBLES_ALLOW_ALL_USERS") + "BLUEBUBBLES_ALLOW_ALL_USERS", + "QQ_ALLOW_ALL_USERS") ) if not _any_allowlist and not _allow_all: logger.warning( @@ -2255,8 +2257,12 @@ class GatewayRunner: return None return BlueBubblesAdapter(config) + elif platform == Platform.QQ: + from gateway.platforms.qq import QQAdapter + return QQAdapter(config) + return None - + def _is_user_authorized(self, source: SessionSource) -> bool: """ Check if a user is authorized to use the bot. @@ -2296,6 +2302,7 @@ class GatewayRunner: Platform.WECOM_CALLBACK: "WECOM_CALLBACK_ALLOWED_USERS", Platform.WEIXIN: "WEIXIN_ALLOWED_USERS", Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOWED_USERS", + Platform.QQ: "QQ_ALLOWED_USERS", } platform_allow_all_map = { Platform.TELEGRAM: "TELEGRAM_ALLOW_ALL_USERS", @@ -2313,6 +2320,7 @@ class GatewayRunner: Platform.WECOM_CALLBACK: "WECOM_CALLBACK_ALLOW_ALL_USERS", Platform.WEIXIN: "WEIXIN_ALLOW_ALL_USERS", Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOW_ALL_USERS", + Platform.QQ: "QQ_ALLOW_ALL_USERS", } # Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true) diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 64a5bd1a9..807bf2633 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -45,6 +45,9 @@ _EXTRA_ENV_KEYS = frozenset({ "WEIXIN_HOME_CHANNEL", "WEIXIN_HOME_CHANNEL_NAME", "WEIXIN_DM_POLICY", "WEIXIN_GROUP_POLICY", "WEIXIN_ALLOWED_USERS", "WEIXIN_GROUP_ALLOWED_USERS", "WEIXIN_ALLOW_ALL_USERS", "BLUEBUBBLES_SERVER_URL", "BLUEBUBBLES_PASSWORD", + "QQ_APP_ID", "QQ_CLIENT_SECRET", + "QQ_ALLOWED_USERS", "QQ_GROUP_ALLOWED_USERS", "QQ_ALLOW_ALL_USERS", + "QQ_HOME_CHANNEL", "QQ_HOME_CHANNEL_NAME", "QQ_SANDBOX", "TERMINAL_ENV", "TERMINAL_SSH_KEY", "TERMINAL_SSH_PORT", "WHATSAPP_MODE", "WHATSAPP_ENABLED", "MATTERMOST_HOME_CHANNEL", "MATTERMOST_REPLY_MODE", @@ -1331,6 +1334,53 @@ OPTIONAL_ENV_VARS = { "password": False, "category": "messaging", }, + "BLUEBUBBLES_ALLOW_ALL_USERS": { + "description": "Allow all BlueBubbles users without allowlist", + "prompt": "Allow All BlueBubbles Users", + "category": "messaging", + }, + "QQ_APP_ID": { + "description": "QQ Bot App ID from QQ Open Platform (q.qq.com)", + "prompt": "QQ App ID", + "url": "https://q.qq.com", + "category": "messaging", + }, + "QQ_CLIENT_SECRET": { + "description": "QQ Bot Client Secret from QQ Open Platform", + "prompt": "QQ Client Secret", + "password": True, + "category": "messaging", + }, + "QQ_ALLOWED_USERS": { + "description": "Comma-separated QQ user IDs allowed to use the bot", + "prompt": "QQ Allowed Users", + "category": "messaging", + }, + "QQ_GROUP_ALLOWED_USERS": { + "description": "Comma-separated QQ group IDs allowed to interact with the bot", + "prompt": "QQ Group Allowed Users", + "category": "messaging", + }, + "QQ_ALLOW_ALL_USERS": { + "description": "Allow all QQ users without an allowlist (true/false)", + "prompt": "Allow All QQ Users", + "category": "messaging", + }, + "QQ_HOME_CHANNEL": { + "description": "Default QQ channel/group for cron delivery and notifications", + "prompt": "QQ Home Channel", + "category": "messaging", + }, + "QQ_HOME_CHANNEL_NAME": { + "description": "Display name for the QQ home channel", + "prompt": "QQ Home Channel Name", + "category": "messaging", + }, + "QQ_SANDBOX": { + "description": "Enable QQ sandbox mode for development testing (true/false)", + "prompt": "QQ Sandbox Mode", + "category": "messaging", + }, "GATEWAY_ALLOW_ALL_USERS": { "description": "Allow all users to interact with messaging bots (true/false). Default: false.", "prompt": "Allow all users (true/false)", diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index 628319d57..308dc9209 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -1913,6 +1913,30 @@ _PLATFORMS = [ "help": "Phone number or Apple ID to deliver cron results and notifications to."}, ], }, + { + "key": "qq", + "label": "QQ Bot", + "emoji": "💬", + "token_var": "QQ_APP_ID", + "setup_instructions": [ + "1. Go to https://open.qq.com/ and create an application", + "2. In the application dashboard, create a QQ Bot", + "3. Note your App ID and App Secret", + "4. Configure the WebSocket Gateway URL in QQ Open Platform settings", + "5. Set up message push URL if needed for event callbacks", + ], + "vars": [ + {"name": "QQ_APP_ID", "prompt": "App ID", "password": False, + "help": "Paste the App ID from QQ Open Platform."}, + {"name": "QQ_CLIENT_SECRET", "prompt": "App Secret", "password": True, + "help": "Paste the App Secret from QQ Open Platform."}, + {"name": "QQ_ALLOWED_USERS", "prompt": "Allowed QQ user IDs (comma-separated, or empty for DM pairing)", "password": False, + "is_allowlist": True, + "help": "Optional — pre-authorize specific users. Leave empty to use DM pairing instead."}, + {"name": "QQ_HOME_CHANNEL", "prompt": "Home channel (QQ group ID for cron/notifications, or empty)", "password": False, + "help": "QQ group ID to deliver cron results and notifications to."}, + ], + }, ] diff --git a/hermes_cli/platforms.py b/hermes_cli/platforms.py index df47ed095..7768fe8cd 100644 --- a/hermes_cli/platforms.py +++ b/hermes_cli/platforms.py @@ -35,6 +35,7 @@ PLATFORMS: OrderedDict[str, PlatformInfo] = OrderedDict([ ("wecom", PlatformInfo(label="💬 WeCom", default_toolset="hermes-wecom")), ("wecom_callback", PlatformInfo(label="💬 WeCom Callback", default_toolset="hermes-wecom-callback")), ("weixin", PlatformInfo(label="💬 Weixin", default_toolset="hermes-weixin")), + ("qq", PlatformInfo(label="💬 QQ", default_toolset="hermes-qq")), ("webhook", PlatformInfo(label="🔗 Webhook", default_toolset="hermes-webhook")), ("api_server", PlatformInfo(label="🌐 API Server", default_toolset="hermes-api-server")), ]) diff --git a/hermes_cli/setup.py b/hermes_cli/setup.py index 6d0ec0f45..9d61c10ad 100644 --- a/hermes_cli/setup.py +++ b/hermes_cli/setup.py @@ -2034,6 +2034,15 @@ def _setup_bluebubbles(): print_info(" Install: https://docs.bluebubbles.app/helper-bundle/installation") +def _setup_qq(): + """Configure QQ Bot (Official API v2) via standard platform setup.""" + from hermes_cli.gateway import _PLATFORMS + qq_platform = next((p for p in _PLATFORMS if p["key"] == "qq"), None) + if qq_platform: + from hermes_cli.gateway import _setup_standard_platform + _setup_standard_platform(qq_platform) + + def _setup_webhooks(): """Configure webhook integration.""" print_header("Webhooks") @@ -2097,6 +2106,7 @@ _GATEWAY_PLATFORMS = [ ("WeCom Callback (Self-Built App)", "WECOM_CALLBACK_CORP_ID", _setup_wecom_callback), ("Weixin (WeChat)", "WEIXIN_ACCOUNT_ID", _setup_weixin), ("BlueBubbles (iMessage)", "BLUEBUBBLES_SERVER_URL", _setup_bluebubbles), + ("QQ Bot", "QQ_APP_ID", _setup_qq), ("Webhooks (GitHub, GitLab, etc.)", "WEBHOOK_ENABLED", _setup_webhooks), ] diff --git a/hermes_cli/status.py b/hermes_cli/status.py index a7745d65f..4ea90ed1e 100644 --- a/hermes_cli/status.py +++ b/hermes_cli/status.py @@ -305,6 +305,7 @@ def show_status(args): "WeCom Callback": ("WECOM_CALLBACK_CORP_ID", None), "Weixin": ("WEIXIN_ACCOUNT_ID", "WEIXIN_HOME_CHANNEL"), "BlueBubbles": ("BLUEBUBBLES_SERVER_URL", "BLUEBUBBLES_HOME_CHANNEL"), + "QQ": ("QQ_APP_ID", "QQ_HOME_CHANNEL"), } for name, (token_var, home_var) in platforms.items(): diff --git a/hermes_cli/tools_config.py b/hermes_cli/tools_config.py index 343007cab..97956a6de 100644 --- a/hermes_cli/tools_config.py +++ b/hermes_cli/tools_config.py @@ -426,6 +426,8 @@ def _get_enabled_platforms() -> List[str]: enabled.append("slack") if get_env_value("WHATSAPP_ENABLED"): enabled.append("whatsapp") + if get_env_value("QQ_APP_ID"): + enabled.append("qq") return enabled diff --git a/tests/gateway/test_qq.py b/tests/gateway/test_qq.py new file mode 100644 index 000000000..a3fc58017 --- /dev/null +++ b/tests/gateway/test_qq.py @@ -0,0 +1,460 @@ +"""Tests for the QQ Bot platform adapter.""" + +import json +import os +import sys +from unittest import mock + +import pytest + +from gateway.config import Platform, PlatformConfig + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_config(**extra): + """Build a PlatformConfig(enabled=True, extra=extra) for testing.""" + return PlatformConfig(enabled=True, extra=extra) + + +# --------------------------------------------------------------------------- +# check_qq_requirements +# --------------------------------------------------------------------------- + +class TestQQRequirements: + def test_returns_bool(self): + from gateway.platforms.qq import check_qq_requirements + result = check_qq_requirements() + assert isinstance(result, bool) + + +# --------------------------------------------------------------------------- +# QQAdapter.__init__ +# --------------------------------------------------------------------------- + +class TestQQAdapterInit: + def _make(self, **extra): + from gateway.platforms.qq import QQAdapter + return QQAdapter(_make_config(**extra)) + + def test_basic_attributes(self): + adapter = self._make(app_id="123", client_secret="sec") + assert adapter._app_id == "123" + assert adapter._client_secret == "sec" + + def test_env_fallback(self): + with mock.patch.dict(os.environ, {"QQ_APP_ID": "env_id", "QQ_CLIENT_SECRET": "env_sec"}, clear=False): + adapter = self._make() + assert adapter._app_id == "env_id" + assert adapter._client_secret == "env_sec" + + def test_env_fallback_extra_wins(self): + with mock.patch.dict(os.environ, {"QQ_APP_ID": "env_id"}, clear=False): + adapter = self._make(app_id="extra_id", client_secret="sec") + assert adapter._app_id == "extra_id" + + def test_dm_policy_default(self): + adapter = self._make(app_id="a", client_secret="b") + assert adapter._dm_policy == "open" + + def test_dm_policy_explicit(self): + adapter = self._make(app_id="a", client_secret="b", dm_policy="allowlist") + assert adapter._dm_policy == "allowlist" + + def test_group_policy_default(self): + adapter = self._make(app_id="a", client_secret="b") + assert adapter._group_policy == "open" + + def test_allow_from_parsing_string(self): + adapter = self._make(app_id="a", client_secret="b", allow_from="x, y , z") + assert adapter._allow_from == ["x", "y", "z"] + + def test_allow_from_parsing_list(self): + adapter = self._make(app_id="a", client_secret="b", allow_from=["a", "b"]) + assert adapter._allow_from == ["a", "b"] + + def test_allow_from_default_empty(self): + adapter = self._make(app_id="a", client_secret="b") + assert adapter._allow_from == [] + + def test_group_allow_from(self): + adapter = self._make(app_id="a", client_secret="b", group_allow_from="g1,g2") + assert adapter._group_allow_from == ["g1", "g2"] + + def test_markdown_support_default(self): + adapter = self._make(app_id="a", client_secret="b") + assert adapter._markdown_support is True + + def test_markdown_support_false(self): + adapter = self._make(app_id="a", client_secret="b", markdown_support=False) + assert adapter._markdown_support is False + + def test_name_property(self): + adapter = self._make(app_id="a", client_secret="b") + assert adapter.name == "QQ" + + +# --------------------------------------------------------------------------- +# _coerce_list +# --------------------------------------------------------------------------- + +class TestCoerceList: + def _fn(self, value): + from gateway.platforms.qq import _coerce_list + return _coerce_list(value) + + def test_none(self): + assert self._fn(None) == [] + + def test_string(self): + assert self._fn("a, b ,c") == ["a", "b", "c"] + + def test_list(self): + assert self._fn(["x", "y"]) == ["x", "y"] + + def test_empty_string(self): + assert self._fn("") == [] + + def test_tuple(self): + assert self._fn(("a", "b")) == ["a", "b"] + + def test_single_item_string(self): + assert self._fn("hello") == ["hello"] + + +# --------------------------------------------------------------------------- +# _is_voice_content_type +# --------------------------------------------------------------------------- + +class TestIsVoiceContentType: + def _fn(self, content_type, filename): + from gateway.platforms.qq import QQAdapter + return QQAdapter._is_voice_content_type(content_type, filename) + + def test_voice_content_type(self): + assert self._fn("voice", "msg.silk") is True + + def test_audio_content_type(self): + assert self._fn("audio/mp3", "file.mp3") is True + + def test_voice_extension(self): + assert self._fn("", "file.silk") is True + + def test_non_voice(self): + assert self._fn("image/jpeg", "photo.jpg") is False + + def test_audio_extension_amr(self): + assert self._fn("", "recording.amr") is True + + +# --------------------------------------------------------------------------- +# _strip_at_mention +# --------------------------------------------------------------------------- + +class TestStripAtMention: + def _fn(self, content): + from gateway.platforms.qq import QQAdapter + return QQAdapter._strip_at_mention(content) + + def test_removes_mention(self): + result = self._fn("@BotUser hello there") + assert result == "hello there" + + def test_no_mention(self): + result = self._fn("just text") + assert result == "just text" + + def test_empty_string(self): + assert self._fn("") == "" + + def test_only_mention(self): + assert self._fn("@Someone ") == "" + + +# --------------------------------------------------------------------------- +# _is_dm_allowed +# --------------------------------------------------------------------------- + +class TestDmAllowed: + def _make_adapter(self, **extra): + from gateway.platforms.qq import QQAdapter + return QQAdapter(_make_config(**extra)) + + def test_open_policy(self): + adapter = self._make_adapter(app_id="a", client_secret="b", dm_policy="open") + assert adapter._is_dm_allowed("any_user") is True + + def test_disabled_policy(self): + adapter = self._make_adapter(app_id="a", client_secret="b", dm_policy="disabled") + assert adapter._is_dm_allowed("any_user") is False + + def test_allowlist_match(self): + adapter = self._make_adapter(app_id="a", client_secret="b", dm_policy="allowlist", allow_from="user1,user2") + assert adapter._is_dm_allowed("user1") is True + + def test_allowlist_no_match(self): + adapter = self._make_adapter(app_id="a", client_secret="b", dm_policy="allowlist", allow_from="user1,user2") + assert adapter._is_dm_allowed("user3") is False + + def test_allowlist_wildcard(self): + adapter = self._make_adapter(app_id="a", client_secret="b", dm_policy="allowlist", allow_from="*") + assert adapter._is_dm_allowed("anyone") is True + + +# --------------------------------------------------------------------------- +# _is_group_allowed +# --------------------------------------------------------------------------- + +class TestGroupAllowed: + def _make_adapter(self, **extra): + from gateway.platforms.qq import QQAdapter + return QQAdapter(_make_config(**extra)) + + def test_open_policy(self): + adapter = self._make_adapter(app_id="a", client_secret="b", group_policy="open") + assert adapter._is_group_allowed("grp1", "user1") is True + + def test_allowlist_match(self): + adapter = self._make_adapter(app_id="a", client_secret="b", group_policy="allowlist", group_allow_from="grp1") + assert adapter._is_group_allowed("grp1", "user1") is True + + def test_allowlist_no_match(self): + adapter = self._make_adapter(app_id="a", client_secret="b", group_policy="allowlist", group_allow_from="grp1") + assert adapter._is_group_allowed("grp2", "user1") is False + + +# --------------------------------------------------------------------------- +# _resolve_stt_config +# --------------------------------------------------------------------------- + +class TestResolveSTTConfig: + def _make_adapter(self, **extra): + from gateway.platforms.qq import QQAdapter + return QQAdapter(_make_config(**extra)) + + def test_no_config(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + with mock.patch.dict(os.environ, {}, clear=True): + assert adapter._resolve_stt_config() is None + + def test_env_config(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + with mock.patch.dict(os.environ, { + "QQ_STT_API_KEY": "key123", + "QQ_STT_BASE_URL": "https://example.com/v1", + "QQ_STT_MODEL": "my-model", + }, clear=True): + cfg = adapter._resolve_stt_config() + assert cfg is not None + assert cfg["api_key"] == "key123" + assert cfg["base_url"] == "https://example.com/v1" + assert cfg["model"] == "my-model" + + def test_extra_config(self): + stt_cfg = { + "baseUrl": "https://custom.api/v4", + "apiKey": "sk_extra", + "model": "glm-asr", + } + adapter = self._make_adapter(app_id="a", client_secret="b", stt=stt_cfg) + with mock.patch.dict(os.environ, {}, clear=True): + cfg = adapter._resolve_stt_config() + assert cfg is not None + assert cfg["base_url"] == "https://custom.api/v4" + assert cfg["api_key"] == "sk_extra" + assert cfg["model"] == "glm-asr" + + +# --------------------------------------------------------------------------- +# _detect_message_type +# --------------------------------------------------------------------------- + +class TestDetectMessageType: + def _fn(self, media_urls, media_types): + from gateway.platforms.qq import QQAdapter + return QQAdapter._detect_message_type(media_urls, media_types) + + def test_no_media(self): + from gateway.platforms.base import MessageType + assert self._fn([], []) == MessageType.TEXT + + def test_image(self): + from gateway.platforms.base import MessageType + assert self._fn(["file.jpg"], ["image/jpeg"]) == MessageType.PHOTO + + def test_voice(self): + from gateway.platforms.base import MessageType + assert self._fn(["voice.silk"], ["audio/silk"]) == MessageType.VOICE + + def test_video(self): + from gateway.platforms.base import MessageType + assert self._fn(["vid.mp4"], ["video/mp4"]) == MessageType.VIDEO + + +# --------------------------------------------------------------------------- +# QQCloseError +# --------------------------------------------------------------------------- + +class TestQQCloseError: + def test_attributes(self): + from gateway.platforms.qq import QQCloseError + err = QQCloseError(4004, "bad token") + assert err.code == 4004 + assert err.reason == "bad token" + + def test_code_none(self): + from gateway.platforms.qq import QQCloseError + err = QQCloseError(None, "") + assert err.code is None + + def test_string_to_int(self): + from gateway.platforms.qq import QQCloseError + err = QQCloseError("4914", "banned") + assert err.code == 4914 + assert err.reason == "banned" + + def test_message_format(self): + from gateway.platforms.qq import QQCloseError + err = QQCloseError(4008, "rate limit") + assert "4008" in str(err) + assert "rate limit" in str(err) + + +# --------------------------------------------------------------------------- +# _dispatch_payload +# --------------------------------------------------------------------------- + +class TestDispatchPayload: + def _make_adapter(self, **extra): + from gateway.platforms.qq import QQAdapter + adapter = QQAdapter(_make_config(**extra)) + return adapter + + def test_unknown_op(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + # Should not raise + adapter._dispatch_payload({"op": 99, "d": {}}) + # last_seq should remain None + assert adapter._last_seq is None + + def test_op10_updates_heartbeat_interval(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + adapter._dispatch_payload({"op": 10, "d": {"heartbeat_interval": 50000}}) + # Should be 50000 / 1000 * 0.8 = 40.0 + assert adapter._heartbeat_interval == 40.0 + + def test_op11_heartbeat_ack(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + # Should not raise + adapter._dispatch_payload({"op": 11, "t": "HEARTBEAT_ACK", "s": 42}) + + def test_seq_tracking(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + adapter._dispatch_payload({"op": 0, "t": "READY", "s": 100, "d": {}}) + assert adapter._last_seq == 100 + + def test_seq_increments(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + adapter._dispatch_payload({"op": 0, "t": "READY", "s": 5, "d": {}}) + adapter._dispatch_payload({"op": 0, "t": "SOME_EVENT", "s": 10, "d": {}}) + assert adapter._last_seq == 10 + + +# --------------------------------------------------------------------------- +# READY / RESUMED handling +# --------------------------------------------------------------------------- + +class TestReadyHandling: + def _make_adapter(self, **extra): + from gateway.platforms.qq import QQAdapter + return QQAdapter(_make_config(**extra)) + + def test_ready_stores_session(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + adapter._dispatch_payload({ + "op": 0, "t": "READY", + "s": 1, + "d": {"session_id": "sess_abc123"}, + }) + assert adapter._session_id == "sess_abc123" + + def test_resumed_preserves_session(self): + adapter = self._make_adapter(app_id="a", client_secret="b") + adapter._session_id = "old_sess" + adapter._last_seq = 50 + adapter._dispatch_payload({ + "op": 0, "t": "RESUMED", "s": 60, "d": {}, + }) + # Session should remain unchanged on RESUMED + assert adapter._session_id == "old_sess" + assert adapter._last_seq == 60 + + +# --------------------------------------------------------------------------- +# _parse_json +# --------------------------------------------------------------------------- + +class TestParseJson: + def _fn(self, raw): + from gateway.platforms.qq import QQAdapter + return QQAdapter._parse_json(raw) + + def test_valid_json(self): + result = self._fn('{"op": 10, "d": {}}') + assert result == {"op": 10, "d": {}} + + def test_invalid_json(self): + result = self._fn("not json") + assert result is None + + def test_none_input(self): + result = self._fn(None) + assert result is None + + def test_non_dict_json(self): + result = self._fn('"just a string"') + assert result is None + + def test_empty_dict(self): + result = self._fn('{}') + assert result == {} + + +# --------------------------------------------------------------------------- +# _build_text_body +# --------------------------------------------------------------------------- + +class TestBuildTextBody: + def _make_adapter(self, **extra): + from gateway.platforms.qq import QQAdapter + return QQAdapter(_make_config(**extra)) + + def test_plain_text(self): + adapter = self._make_adapter(app_id="a", client_secret="b", markdown_support=False) + body = adapter._build_text_body("hello world") + assert body["msg_type"] == 0 # MSG_TYPE_TEXT + assert body["content"] == "hello world" + + def test_markdown_text(self): + adapter = self._make_adapter(app_id="a", client_secret="b", markdown_support=True) + body = adapter._build_text_body("**bold** text") + assert body["msg_type"] == 2 # MSG_TYPE_MARKDOWN + assert body["markdown"]["content"] == "**bold** text" + + def test_truncation(self): + adapter = self._make_adapter(app_id="a", client_secret="b", markdown_support=False) + long_text = "x" * 10000 + body = adapter._build_text_body(long_text) + assert len(body["content"]) == adapter.MAX_MESSAGE_LENGTH + + def test_empty_string(self): + adapter = self._make_adapter(app_id="a", client_secret="b", markdown_support=False) + body = adapter._build_text_body("") + assert body["content"] == "" + + def test_reply_to(self): + adapter = self._make_adapter(app_id="a", client_secret="b", markdown_support=False) + body = adapter._build_text_body("reply text", reply_to="msg_123") + assert body.get("message_reference", {}).get("message_id") == "msg_123" diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index a2b3e984c..6da0a4537 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -160,6 +160,7 @@ def _handle_send(args): "wecom": Platform.WECOM, "wecom_callback": Platform.WECOM_CALLBACK, "weixin": Platform.WEIXIN, + "qq": Platform.QQ, "email": Platform.EMAIL, "sms": Platform.SMS, } @@ -426,6 +427,8 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, result = await _send_wecom(pconfig.extra, chat_id, chunk) elif platform == Platform.BLUEBUBBLES: result = await _send_bluebubbles(pconfig.extra, chat_id, chunk) + elif platform == Platform.QQ: + result = await _send_qq(pconfig.extra, chat_id, chunk) else: result = {"error": f"Direct sending not yet implemented for {platform.value}"} @@ -968,6 +971,25 @@ async def _send_bluebubbles(extra, chat_id, message): return _error(f"BlueBubbles send failed: {e}") +async def _send_qq(extra, chat_id, message): + """Send via QQ Bot Official API v2 using the adapter's REST endpoint.""" + try: + from gateway.platforms.qq import QQAdapter + except ImportError: + return {"error": "QQ adapter not available."} + + try: + from gateway.config import PlatformConfig + pconfig = PlatformConfig(extra=extra) + adapter = QQAdapter(pconfig) + result = await adapter.send(chat_id, message) + if not result.success: + return _error(f"QQ send failed: {result.error}") + return {"success": True, "platform": "qq", "chat_id": chat_id, "message_id": result.message_id} + except Exception as e: + return _error(f"QQ send failed: {e}") + + async def _send_feishu(pconfig, chat_id, message, media_files=None, thread_id=None): """Send via Feishu/Lark using the adapter's send pipeline.""" try: diff --git a/toolsets.py b/toolsets.py index da7a2d2b2..8657f5bbf 100644 --- a/toolsets.py +++ b/toolsets.py @@ -359,6 +359,12 @@ TOOLSETS = { "includes": [] }, + "hermes-qq": { + "description": "QQ Bot toolset - QQ messaging via Official Bot API v2 (full access)", + "tools": _HERMES_CORE_TOOLS, + "includes": [] + }, + "hermes-wecom": { "description": "WeCom bot toolset - enterprise WeChat messaging (full access)", "tools": _HERMES_CORE_TOOLS, @@ -386,7 +392,7 @@ TOOLSETS = { "hermes-gateway": { "description": "Gateway toolset - union of all messaging platform tools", "tools": [], - "includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-bluebubbles", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom", "hermes-wecom-callback", "hermes-weixin", "hermes-webhook"] + "includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-bluebubbles", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom", "hermes-wecom-callback", "hermes-weixin", "hermes-qq", "hermes-webhook"] } } diff --git a/website/docs/reference/environment-variables.md b/website/docs/reference/environment-variables.md index 907391128..dc2b3c58b 100644 --- a/website/docs/reference/environment-variables.md +++ b/website/docs/reference/environment-variables.md @@ -262,6 +262,20 @@ For cloud sandbox backends, persistence is filesystem-oriented. `TERMINAL_LIFETI | `BLUEBUBBLES_HOME_CHANNEL` | Phone/email for cron/notification delivery | | `BLUEBUBBLES_ALLOWED_USERS` | Comma-separated authorized users | | `BLUEBUBBLES_ALLOW_ALL_USERS` | Allow all users (`true`/`false`) | + +#### QQ Bot + +| Variable | Description | +|----------|-------------| +| `QQ_APP_ID` | QQ Bot App ID (from open.qq.com) | +| `QQ_CLIENT_SECRET` | QQ Bot App Secret | +| `QQ_SANDBOX` | Enable sandbox mode for testing (`true`/`false`) | +| `QQ_ALLOWED_USERS` | Comma-separated QQ user IDs allowed to DM the bot | +| `QQ_GROUP_ALLOWED_USERS` | Comma-separated QQ user IDs allowed in group messages | +| `QQ_ALLOW_ALL_USERS` | Allow all QQ users (`true`/`false`) | +| `QQ_HOME_CHANNEL` | QQ group ID for cron delivery and notifications | +| `QQ_HOME_CHANNEL_NAME` | Display name for the QQ home channel | + | `MATTERMOST_URL` | Mattermost server URL (e.g. `https://mm.example.com`) | | `MATTERMOST_TOKEN` | Bot token or personal access token for Mattermost | | `MATTERMOST_ALLOWED_USERS` | Comma-separated Mattermost user IDs allowed to message the bot | diff --git a/website/docs/user-guide/messaging/index.md b/website/docs/user-guide/messaging/index.md index f4131385e..14e50612f 100644 --- a/website/docs/user-guide/messaging/index.md +++ b/website/docs/user-guide/messaging/index.md @@ -6,7 +6,7 @@ description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, # Messaging Gateway -Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Feishu/Lark, WeCom, Weixin, BlueBubbles (iMessage), or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages. +Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Feishu/Lark, WeCom, Weixin, BlueBubbles (iMessage), QQ, or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages. For the full voice feature set — including CLI microphone mode, spoken replies in messaging, and Discord voice-channel conversations — see [Voice Mode](/docs/user-guide/features/voice-mode) and [Use Voice Mode with Hermes](/docs/guides/use-voice-mode-with-hermes). @@ -30,6 +30,7 @@ For the full voice feature set — including CLI microphone mode, spoken replies | WeCom Callback | — | — | — | — | — | — | — | | Weixin | ✅ | ✅ | ✅ | — | — | ✅ | ✅ | | BlueBubbles | — | ✅ | ✅ | — | ✅ | ✅ | — | +| QQ | ✅ | ✅ | ✅ | — | — | — | — | **Voice** = TTS audio replies and/or voice message transcription. **Images** = send/receive images. **Files** = send/receive file attachments. **Threads** = threaded conversations. **Reactions** = emoji reactions on messages. **Typing** = typing indicator while processing. **Streaming** = progressive message updates via editing. @@ -55,6 +56,7 @@ flowchart TB wcb[WeCom Callback] wx[Weixin] bb[BlueBubbles] + qq[QQ] api["API Server
(OpenAI-compatible)"] wh[Webhooks] end @@ -369,6 +371,7 @@ Each platform has its own toolset: | WeCom Callback | `hermes-wecom-callback` | Full tools including terminal | | Weixin | `hermes-weixin` | Full tools including terminal | | BlueBubbles | `hermes-bluebubbles` | Full tools including terminal | +| QQ | `hermes-qq` | Full tools including terminal | | API Server | `hermes` (default) | Full tools including terminal | | Webhooks | `hermes-webhook` | Full tools including terminal | @@ -390,5 +393,6 @@ Each platform has its own toolset: - [WeCom Callback Setup](wecom-callback.md) - [Weixin Setup (WeChat)](weixin.md) - [BlueBubbles Setup (iMessage)](bluebubbles.md) +- [QQ Bot Setup](qq.md) - [Open WebUI + API Server](open-webui.md) - [Webhooks](webhooks.md) diff --git a/website/docs/user-guide/messaging/qq.md b/website/docs/user-guide/messaging/qq.md new file mode 100644 index 000000000..686fd862e --- /dev/null +++ b/website/docs/user-guide/messaging/qq.md @@ -0,0 +1,122 @@ +# QQ Bot + +Connect Hermes to QQ via the **Official QQ Bot API (v2)** — supporting private (C2C), group @-mentions, guild, and direct messages with voice transcription. + +## Overview + +The QQ Bot adapter uses the [Official QQ Bot API](https://bot.q.qq.com/wiki/develop/api-v2/) to: + +- Receive messages via a persistent **WebSocket** connection to the QQ Gateway +- Send text and markdown replies via the **REST API** +- Download and process images, voice messages, and file attachments +- Transcribe voice messages using Tencent's built-in ASR or a configurable STT provider + +## Prerequisites + +1. **QQ Bot Application** — Register at [q.qq.com](https://q.qq.com): + - Create a new application and note your **App ID** and **App Secret** + - Enable the required intents: C2C messages, Group @-messages, Guild messages + - Configure your bot in sandbox mode for testing, or publish for production + +2. **Dependencies** — The adapter requires `aiohttp` and `httpx`: + ```bash + pip install aiohttp httpx + ``` + +## Configuration + +### Interactive setup + +```bash +hermes setup gateway +``` + +Select **QQ Bot** from the platform list and follow the prompts. + +### Manual configuration + +Set the required environment variables in `~/.hermes/.env`: + +```bash +QQ_APP_ID=your-app-id +QQ_CLIENT_SECRET=your-app-secret +``` + +## Environment Variables + +| Variable | Description | Default | +|---|---|---| +| `QQ_APP_ID` | QQ Bot App ID (required) | — | +| `QQ_CLIENT_SECRET` | QQ Bot App Secret (required) | — | +| `QQ_HOME_CHANNEL` | OpenID for cron/notification delivery | — | +| `QQ_HOME_CHANNEL_NAME` | Display name for home channel | `Home` | +| `QQ_ALLOWED_USERS` | Comma-separated user OpenIDs for DM access | open (all users) | +| `QQ_ALLOW_ALL_USERS` | Set to `true` to allow all DMs | `false` | +| `QQ_MARKDOWN_SUPPORT` | Enable QQ markdown (msg_type 2) | `true` | +| `QQ_STT_API_KEY` | API key for voice-to-text provider | — | +| `QQ_STT_BASE_URL` | Base URL for STT provider | `https://open.bigmodel.cn/api/coding/paas/v4` | +| `QQ_STT_MODEL` | STT model name | `glm-asr` | + +## Advanced Configuration + +For fine-grained control, add platform settings to `~/.hermes/config.yaml`: + +```yaml +platforms: + qq: + enabled: true + extra: + app_id: "your-app-id" + client_secret: "your-secret" + markdown_support: true + dm_policy: "open" # open | allowlist | disabled + allow_from: + - "user_openid_1" + group_policy: "open" # open | allowlist | disabled + group_allow_from: + - "group_openid_1" + stt: + provider: "zai" # zai (GLM-ASR), openai (Whisper), etc. + baseUrl: "https://open.bigmodel.cn/api/coding/paas/v4" + apiKey: "your-stt-key" + model: "glm-asr" +``` + +## Voice Messages (STT) + +Voice transcription works in two stages: + +1. **QQ built-in ASR** (free, always tried first) — QQ provides `asr_refer_text` in voice message attachments, which uses Tencent's own speech recognition +2. **Configured STT provider** (fallback) — If QQ's ASR doesn't return text, the adapter calls an OpenAI-compatible STT API: + + - **Zhipu/GLM (zai)**: Default provider, uses `glm-asr` model + - **OpenAI Whisper**: Set `QQ_STT_BASE_URL` and `QQ_STT_MODEL` + - Any OpenAI-compatible STT endpoint + +## Troubleshooting + +### Bot disconnects immediately (quick disconnect) + +This usually means: +- **Invalid App ID / Secret** — Double-check your credentials at q.qq.com +- **Missing permissions** — Ensure the bot has the required intents enabled +- **Sandbox-only bot** — If the bot is in sandbox mode, it can only receive messages from QQ's sandbox test channel + +### Voice messages not transcribed + +1. Check if QQ's built-in `asr_refer_text` is present in the attachment data +2. If using a custom STT provider, verify `QQ_STT_API_KEY` is set correctly +3. Check gateway logs for STT error messages + +### Messages not delivered + +- Verify the bot's **intents** are enabled at q.qq.com +- Check `QQ_ALLOWED_USERS` if DM access is restricted +- For group messages, ensure the bot is **@mentioned** (group policy may require allowlisting) +- Check `QQ_HOME_CHANNEL` for cron/notification delivery + +### Connection errors + +- Ensure `aiohttp` and `httpx` are installed: `pip install aiohttp httpx` +- Check network connectivity to `api.sgroup.qq.com` and the WebSocket gateway +- Review gateway logs for detailed error messages and reconnect behavior