diff --git a/gateway/platforms/qqbot/__init__.py b/gateway/platforms/qqbot/__init__.py index 7a01288cfc..d755ec48df 100644 --- a/gateway/platforms/qqbot/__init__.py +++ b/gateway/platforms/qqbot/__init__.py @@ -41,6 +41,20 @@ from .chunked_upload import ( # noqa: F401 UploadFileTooLargeError, ) +# -- Inline keyboards ------------------------------------------------------ +from .keyboards import ( # noqa: F401 + ApprovalRequest, + ApprovalSender, + InlineKeyboard, + InteractionEvent, + build_approval_keyboard, + build_approval_text, + build_update_prompt_keyboard, + parse_approval_button_data, + parse_interaction_event, + parse_update_prompt_button_data, +) + __all__ = [ # adapter "QQAdapter", @@ -63,4 +77,15 @@ __all__ = [ "ChunkedUploader", "UploadDailyLimitExceededError", "UploadFileTooLargeError", + # keyboards + "ApprovalRequest", + "ApprovalSender", + "InlineKeyboard", + "InteractionEvent", + "build_approval_keyboard", + "build_approval_text", + "build_update_prompt_keyboard", + "parse_approval_button_data", + "parse_interaction_event", + "parse_update_prompt_button_data", ] diff --git a/gateway/platforms/qqbot/adapter.py b/gateway/platforms/qqbot/adapter.py index f0e89aabe7..046758c796 100644 --- a/gateway/platforms/qqbot/adapter.py +++ b/gateway/platforms/qqbot/adapter.py @@ -41,7 +41,7 @@ import time import uuid from datetime import datetime, timezone from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple from urllib.parse import urlparse try: @@ -124,6 +124,17 @@ from gateway.platforms.qqbot.chunked_upload import ( UploadDailyLimitExceededError, UploadFileTooLargeError, ) +from gateway.platforms.qqbot.keyboards import ( + ApprovalRequest, + ApprovalSender, + InlineKeyboard, + InteractionEvent, + build_approval_keyboard, + build_update_prompt_keyboard, + parse_approval_button_data, + parse_interaction_event, + parse_update_prompt_button_data, +) def check_qq_requirements() -> bool: @@ -213,6 +224,14 @@ class QQAdapter(BasePlatformAdapter): # Upload cache: content_hash -> {file_info, file_uuid, expires_at} self._upload_cache: Dict[str, Dict[str, Any]] = {} + # Inline-keyboard interaction routing. The callback (if set) is invoked + # for every INTERACTION_CREATE event after the adapter has already + # ACKed it. Callers (gateway wiring for approvals / update prompts) + # register via set_interaction_callback(). + self._interaction_callback: Optional[ + Callable[[InteractionEvent], Awaitable[None]] + ] = None + # ------------------------------------------------------------------ # Properties # ------------------------------------------------------------------ @@ -764,6 +783,8 @@ class QQAdapter(BasePlatformAdapter): "GUILD_AT_MESSAGE_CREATE", ): asyncio.create_task(self._on_message(t, d)) + elif t == "INTERACTION_CREATE": + self._create_task(self._on_interaction(d)) else: logger.debug("[%s] Unhandled dispatch: %s", self._log_tag, t) return @@ -837,6 +858,111 @@ class QQAdapter(BasePlatformAdapter): elif event_type == "DIRECT_MESSAGE_CREATE": await self._handle_dm_message(d, msg_id, content, author, timestamp) + # ------------------------------------------------------------------ + # Inline-keyboard interactions (INTERACTION_CREATE) + # ------------------------------------------------------------------ + + def set_interaction_callback( + self, + callback: Optional[Callable[[InteractionEvent], Awaitable[None]]], + ) -> None: + """Register (or clear) the interaction callback. + + Invoked once per ``INTERACTION_CREATE`` event *after* the adapter has + ACKed the interaction. The callback is responsible for routing the + button click to the right subsystem (approval resolver, update-prompt + resolver, etc.) based on the ``button_data`` payload. + """ + self._interaction_callback = callback + + async def _on_interaction(self, d: Any) -> None: + """Handle an ``INTERACTION_CREATE`` event. + + Responsibilities: + + 1. Parse the raw payload into an :class:`InteractionEvent`. + 2. ACK the interaction (``PUT /interactions/{id}``) so the client + stops showing a loading indicator on the button. + 3. Dispatch to the registered interaction callback, if any. + """ + if not isinstance(d, dict): + return + try: + event = parse_interaction_event(d) + except Exception as exc: + logger.warning( + "[%s] Failed to parse INTERACTION_CREATE: %s", self._log_tag, exc + ) + return + + if not event.id: + logger.warning( + "[%s] INTERACTION_CREATE missing id, skipping ACK", self._log_tag + ) + return + + # ACK the interaction promptly — per the QQ docs the client will show + # an error icon on the button if we don't respond quickly. + try: + await self._acknowledge_interaction(event.id) + except Exception as exc: + logger.warning( + "[%s] Failed to ACK interaction %s: %s", + self._log_tag, event.id, exc, + ) + + logger.info( + "[%s] Interaction: scene=%s button_data=%r operator=%s", + self._log_tag, event.scene, event.button_data, event.operator_openid, + ) + + callback = self._interaction_callback + if callback is None: + logger.debug( + "[%s] No interaction callback registered; dropping button " + "click %r", + self._log_tag, event.button_data, + ) + return + try: + await callback(event) + except Exception as exc: + logger.error( + "[%s] Interaction callback raised: %s", + self._log_tag, exc, exc_info=True, + ) + + async def _acknowledge_interaction( + self, + interaction_id: str, + code: int = 0, + ) -> None: + """ACK a button interaction via ``PUT /interactions/{id}``. + + :param interaction_id: The ``id`` field from the + ``INTERACTION_CREATE`` event. + :param code: Response code (``0`` = success). + """ + if not self._http_client: + raise RuntimeError("HTTP client not initialized — not connected?") + token = await self._ensure_token() + headers = { + "Authorization": f"QQBot {token}", + "Content-Type": "application/json", + "User-Agent": build_user_agent(), + } + resp = await self._http_client.put( + f"{API_BASE}/interactions/{interaction_id}", + headers=headers, + json={"code": code}, + timeout=DEFAULT_API_TIMEOUT, + ) + if resp.status_code >= 400: + raise RuntimeError( + f"Interaction ACK failed [{resp.status_code}]: " + f"{resp.text[:200]}" + ) + async def _handle_c2c_message( self, d: Dict[str, Any], @@ -1997,26 +2123,44 @@ class QQAdapter(BasePlatformAdapter): return SendResult(success=False, error=error_msg, retryable=retryable) async def _send_c2c_text( - self, openid: str, content: str, reply_to: Optional[str] = None + self, + openid: str, + content: str, + reply_to: Optional[str] = None, + keyboard: Optional[InlineKeyboard] = None, ) -> SendResult: - """Send text to a C2C user via REST API.""" + """Send text to a C2C user via REST API. + + :param keyboard: Optional inline keyboard attached to the message. + """ self._next_msg_seq(reply_to or openid) body = self._build_text_body(content, reply_to) if reply_to: body["msg_id"] = reply_to + if keyboard is not None: + body["keyboard"] = keyboard.to_dict() data = await self._api_request("POST", f"/v2/users/{openid}/messages", body) msg_id = str(data.get("id", uuid.uuid4().hex[:12])) return SendResult(success=True, message_id=msg_id, raw_response=data) async def _send_group_text( - self, group_openid: str, content: str, reply_to: Optional[str] = None + self, + group_openid: str, + content: str, + reply_to: Optional[str] = None, + keyboard: Optional[InlineKeyboard] = None, ) -> SendResult: - """Send text to a group via REST API.""" + """Send text to a group via REST API. + + :param keyboard: Optional inline keyboard attached to the message. + """ self._next_msg_seq(reply_to or group_openid) body = self._build_text_body(content, reply_to) if reply_to: body["msg_id"] = reply_to + if keyboard is not None: + body["keyboard"] = keyboard.to_dict() data = await self._api_request( "POST", f"/v2/groups/{group_openid}/messages", body @@ -2036,6 +2180,100 @@ class QQAdapter(BasePlatformAdapter): msg_id = str(data.get("id", uuid.uuid4().hex[:12])) return SendResult(success=True, message_id=msg_id, raw_response=data) + # ------------------------------------------------------------------ + # Inline-keyboard outbound helpers (approval / update-prompt flows) + # ------------------------------------------------------------------ + + async def send_with_keyboard( + self, + chat_id: str, + content: str, + keyboard: InlineKeyboard, + reply_to: Optional[str] = None, + ) -> SendResult: + """Send a single text message with an inline keyboard attached. + + Unlike :meth:`send`, this does NOT split long content into chunks — + a keyboard message has exactly one interactive surface, and splitting + would orphan the buttons from the first chunk. Callers should keep + approval/update-prompt bodies short. + + Guild (channel) chats don't support inline keyboards; returns a + non-retryable failure for those. + """ + if not self.is_connected: + if not await self._wait_for_reconnection(): + return SendResult( + success=False, error="Not connected", retryable=True + ) + + chat_type = self._guess_chat_type(chat_id) + formatted = self.format_message(content) + truncated = formatted[: self.MAX_MESSAGE_LENGTH] + try: + if chat_type == "c2c": + return await self._send_c2c_text( + chat_id, truncated, reply_to, keyboard=keyboard, + ) + if chat_type == "group": + return await self._send_group_text( + chat_id, truncated, reply_to, keyboard=keyboard, + ) + return SendResult( + success=False, + error=( + f"Inline keyboards not supported for chat_type " + f"{chat_type!r}" + ), + retryable=False, + ) + except Exception as exc: + logger.error( + "[%s] send_with_keyboard failed: %s", self._log_tag, exc + ) + return SendResult(success=False, error=str(exc)) + + async def send_approval_request( + self, + chat_id: str, + req: ApprovalRequest, + reply_to: Optional[str] = None, + ) -> SendResult: + """Send a 3-button approval request (``allow-once / allow-always / deny``). + + The rendered text comes from :func:`build_approval_text`; callers can + override by passing a custom :class:`ApprovalRequest`. + + Users click the button → ``INTERACTION_CREATE`` fires → the adapter's + registered :meth:`set_interaction_callback` handler decodes + ``button_data`` via :func:`parse_approval_button_data`. + """ + from gateway.platforms.qqbot.keyboards import build_approval_text + return await self.send_with_keyboard( + chat_id, + build_approval_text(req), + build_approval_keyboard(req.session_key), + reply_to=reply_to, + ) + + async def send_update_prompt( + self, + chat_id: str, + content: str, + reply_to: Optional[str] = None, + ) -> SendResult: + """Send a Yes/No update-confirmation prompt with inline buttons. + + Button clicks surface as ``INTERACTION_CREATE`` with + ``button_data = 'update_prompt:y'`` or ``'update_prompt:n'``. + """ + return await self.send_with_keyboard( + chat_id, + content, + build_update_prompt_keyboard(), + reply_to=reply_to, + ) + def _build_text_body( self, content: str, reply_to: Optional[str] = None ) -> Dict[str, Any]: diff --git a/gateway/platforms/qqbot/keyboards.py b/gateway/platforms/qqbot/keyboards.py new file mode 100644 index 0000000000..19fd36e370 --- /dev/null +++ b/gateway/platforms/qqbot/keyboards.py @@ -0,0 +1,473 @@ +"""QQ Bot inline keyboards + approval / update-prompt senders. + +QQ Bot v2 supports attaching inline keyboards to outbound messages. When a +user clicks a button, the platform dispatches an ``INTERACTION_CREATE`` +gateway event containing the button's ``data`` payload. The bot must ACK the +interaction promptly via ``PUT /interactions/{id}`` or the user sees an +error indicator on the button. + +This module provides: + +- :class:`InlineKeyboard` + button dataclasses — serialized into the + ``keyboard`` field of the outbound message body. +- :func:`build_approval_keyboard` — 3-button ✅ once / ⭐ always / ❌ deny + keyboard for tool-approval flows. +- :func:`build_update_prompt_keyboard` — Yes/No keyboard for update confirms. +- :func:`parse_approval_button_data` / :func:`parse_update_prompt_button_data` + — decode the ``button_data`` payload from ``INTERACTION_CREATE``. +- :class:`ApprovalRequest` + :class:`ApprovalSender` — high-level helper that + builds an approval message with keyboard and posts it to a c2c / group chat. + +``button_data`` formats:: + + approve:: # decision = allow-once|allow-always|deny + update_prompt: # answer = y|n + +Ported from WideLee's qqbot-agent-sdk v1.2.2 (``approval.py`` + ``dto.py`` +keyboard types). Authorship preserved via Co-authored-by. +""" + +from __future__ import annotations + +import logging +import re +from dataclasses import dataclass, field +from typing import Any, Awaitable, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# ── button_data prefixes + patterns ────────────────────────────────── + +APPROVAL_BUTTON_PREFIX = "approve:" +UPDATE_PROMPT_PREFIX = "update_prompt:" + +# Pattern: approve:: +# session_key may itself contain colons (e.g. agent:main:qqbot:c2c:OPENID), +# so the session_key group is greedy but trails the decision. +_APPROVAL_DATA_RE = re.compile( + r"^approve:(.+):(allow-once|allow-always|deny)$" +) + +# Pattern: update_prompt:y | update_prompt:n +_UPDATE_PROMPT_RE = re.compile(r"^update_prompt:(y|n)$") + + +# ── Keyboard dataclasses ───────────────────────────────────────────── + +@dataclass +class KeyboardButtonPermission: + """Button permission metadata. ``type=2`` means all users can click.""" + type: int = 2 + + def to_dict(self) -> Dict[str, Any]: + return {"type": self.type} + + +@dataclass +class KeyboardButtonAction: + """What happens when the button is clicked. + + :param type: ``1`` (Callback — triggers ``INTERACTION_CREATE``) or + ``2`` (Link — opens a URL). + :param data: Payload delivered in ``data.resolved.button_data`` when + ``type=1``. + :param permission: :class:`KeyboardButtonPermission`. + :param click_limit: Max clicks per user (``1`` = single-use). + """ + type: int + data: str + permission: KeyboardButtonPermission = field( + default_factory=KeyboardButtonPermission + ) + click_limit: int = 1 + + def to_dict(self) -> Dict[str, Any]: + return { + "type": self.type, + "data": self.data, + "permission": self.permission.to_dict(), + "click_limit": self.click_limit, + } + + +@dataclass +class KeyboardButtonRenderData: + """Visual rendering of a button. + + :param label: Pre-click label. + :param visited_label: Post-click label (button stays greyed in place). + :param style: ``0`` = grey, ``1`` = blue. + """ + label: str + visited_label: str + style: int = 1 + + def to_dict(self) -> Dict[str, Any]: + return { + "label": self.label, + "visited_label": self.visited_label, + "style": self.style, + } + + +@dataclass +class KeyboardButton: + """One button in a keyboard. + + :param group_id: Buttons sharing a ``group_id`` are mutually exclusive — + clicking one greys the rest. + """ + id: str + render_data: KeyboardButtonRenderData + action: KeyboardButtonAction + group_id: str = "default" + + def to_dict(self) -> Dict[str, Any]: + return { + "id": self.id, + "render_data": self.render_data.to_dict(), + "action": self.action.to_dict(), + "group_id": self.group_id, + } + + +@dataclass +class KeyboardRow: + buttons: List[KeyboardButton] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + return {"buttons": [b.to_dict() for b in self.buttons]} + + +@dataclass +class KeyboardContent: + rows: List[KeyboardRow] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + return {"rows": [r.to_dict() for r in self.rows]} + + +@dataclass +class InlineKeyboard: + """Top-level keyboard payload — goes into ``MessageToCreate.keyboard``.""" + content: KeyboardContent = field(default_factory=KeyboardContent) + + def to_dict(self) -> Dict[str, Any]: + return {"content": self.content.to_dict()} + + +# ── INTERACTION_CREATE parsing ─────────────────────────────────────── + +def parse_approval_button_data(button_data: str) -> Optional[tuple[str, str]]: + """Parse approval ``button_data`` into ``(session_key, decision)``. + + :param button_data: Raw ``data.resolved.button_data`` from + ``INTERACTION_CREATE``. + :returns: ``(session_key, decision)`` or ``None`` if not an approval button. + """ + m = _APPROVAL_DATA_RE.match(button_data or "") + if not m: + return None + return m.group(1), m.group(2) + + +def parse_update_prompt_button_data(button_data: str) -> Optional[str]: + """Parse update-prompt ``button_data`` into ``'y'`` or ``'n'``.""" + m = _UPDATE_PROMPT_RE.match(button_data or "") + if not m: + return None + return m.group(1) + + +# ── Keyboard builders ──────────────────────────────────────────────── + +def _make_callback_button( + btn_id: str, + label: str, + visited_label: str, + data: str, + style: int, + group_id: str, +) -> KeyboardButton: + return KeyboardButton( + id=btn_id, + render_data=KeyboardButtonRenderData( + label=label, + visited_label=visited_label, + style=style, + ), + action=KeyboardButtonAction(type=1, data=data), + group_id=group_id, + ) + + +def build_approval_keyboard(session_key: str) -> InlineKeyboard: + """Build the 3-button approval keyboard. + + Layout: ``[✅ 允许一次] [⭐ 始终允许] [❌ 拒绝]`` — all three share + ``group_id='approval'`` so clicking one greys out the rest. + + :param session_key: Embedded into ``button_data`` so the decision + routes back to the right pending approval. + """ + return InlineKeyboard( + content=KeyboardContent( + rows=[ + KeyboardRow(buttons=[ + _make_callback_button( + btn_id="allow", + label="✅ 允许一次", + visited_label="已允许", + data=f"{APPROVAL_BUTTON_PREFIX}{session_key}:allow-once", + style=1, + group_id="approval", + ), + _make_callback_button( + btn_id="always", + label="⭐ 始终允许", + visited_label="已始终允许", + data=f"{APPROVAL_BUTTON_PREFIX}{session_key}:allow-always", + style=1, + group_id="approval", + ), + _make_callback_button( + btn_id="deny", + label="❌ 拒绝", + visited_label="已拒绝", + data=f"{APPROVAL_BUTTON_PREFIX}{session_key}:deny", + style=0, + group_id="approval", + ), + ]), + ] + ) + ) + + +def build_update_prompt_keyboard() -> InlineKeyboard: + """Build a Yes/No keyboard for update confirmation prompts.""" + return InlineKeyboard( + content=KeyboardContent( + rows=[ + KeyboardRow(buttons=[ + _make_callback_button( + btn_id="yes", + label="✓ 确认", + visited_label="已确认", + data=f"{UPDATE_PROMPT_PREFIX}y", + style=1, + group_id="update_prompt", + ), + _make_callback_button( + btn_id="no", + label="✗ 取消", + visited_label="已取消", + data=f"{UPDATE_PROMPT_PREFIX}n", + style=0, + group_id="update_prompt", + ), + ]), + ] + ) + ) + + +# ── ApprovalRequest + text builder ─────────────────────────────────── + +@dataclass +class ApprovalRequest: + """Structured approval-request display data. + + :param session_key: Routes the decision back to the waiting caller. + :param title: Short title at the top. + :param description: Optional longer description. + :param command_preview: Command text (exec approvals). + :param cwd: Working directory (exec approvals). + :param tool_name: Tool name (plugin approvals). + :param severity: ``'critical' | 'info' | ''``. + :param timeout_sec: Seconds until the approval expires. + """ + session_key: str + title: str + description: str = "" + command_preview: str = "" + cwd: str = "" + tool_name: str = "" + severity: str = "" + timeout_sec: int = 120 + + +def build_approval_text(req: ApprovalRequest) -> str: + """Render an :class:`ApprovalRequest` into the message body (markdown).""" + if req.command_preview or req.cwd: + return _build_exec_text(req) + return _build_plugin_text(req) + + +def _build_exec_text(req: ApprovalRequest) -> str: + lines: List[str] = ["🔐 **命令执行审批**", ""] + if req.command_preview: + preview = req.command_preview[:300] + lines.append(f"```\n{preview}\n```") + if req.cwd: + lines.append(f"📁 目录: {req.cwd}") + if req.title and req.title != req.command_preview: + lines.append(f"📋 {req.title}") + if req.description: + lines.append(f"📝 {req.description}") + lines.append("") + lines.append(f"⏱️ 超时: {req.timeout_sec} 秒") + return "\n".join(lines) + + +def _build_plugin_text(req: ApprovalRequest) -> str: + icon = ( + "🔴" if req.severity == "critical" + else "🔵" if req.severity == "info" + else "🟡" + ) + lines: List[str] = [f"{icon} **审批请求**", ""] + lines.append(f"📋 {req.title}") + if req.description: + lines.append(f"📝 {req.description}") + if req.tool_name: + lines.append(f"🔧 工具: {req.tool_name}") + lines.append("") + lines.append(f"⏱️ 超时: {req.timeout_sec} 秒") + return "\n".join(lines) + + +# ── ApprovalSender ─────────────────────────────────────────────────── + +PostMessageFn = Callable[..., Awaitable[Dict[str, Any]]] +"""Signature of an async POST to ``/v2/{users|groups}/{id}/messages``. + +Implementations accept a body dict and return the raw API response. +""" + + +class ApprovalSender: + """Send an approval-request message with an inline keyboard. + + Decoupled from the adapter via callables so it can be unit-tested in + isolation. Pass the adapter's ``_send_message_with_keyboard`` helper + (or any equivalent) as ``post_message``. + """ + + def __init__( + self, + post_c2c: PostMessageFn, + post_group: PostMessageFn, + log_tag: str = "QQBot", + ) -> None: + self._post_c2c = post_c2c + self._post_group = post_group + self._log_tag = log_tag + + async def send( + self, + chat_type: str, + chat_id: str, + req: ApprovalRequest, + msg_id: Optional[str] = None, + ) -> bool: + """Send an approval message to *chat_id*. + + :param chat_type: ``'c2c'`` or ``'group'``. + :param chat_id: User openid or group openid. + :param req: :class:`ApprovalRequest`. + :param msg_id: Reply-to message id (required for passive messages). + :returns: ``True`` on success, ``False`` on failure. + """ + text = build_approval_text(req) + keyboard = build_approval_keyboard(req.session_key) + + logger.info( + "[%s] Sending approval request to %s:%s (session=%.20s…)", + self._log_tag, chat_type, chat_id, req.session_key, + ) + + try: + if chat_type == "c2c": + await self._post_c2c(chat_id, text, msg_id, keyboard) + elif chat_type == "group": + await self._post_group(chat_id, text, msg_id, keyboard) + else: + logger.warning( + "[%s] Approval: unsupported chat_type %r", + self._log_tag, chat_type, + ) + return False + logger.info( + "[%s] Approval message sent to %s:%s", + self._log_tag, chat_type, chat_id, + ) + return True + except Exception as exc: + logger.error( + "[%s] Failed to send approval message to %s:%s: %s", + self._log_tag, chat_type, chat_id, exc, + ) + return False + + +# ── INTERACTION_CREATE event shape ─────────────────────────────────── + +@dataclass +class InteractionEvent: + """Parsed ``INTERACTION_CREATE`` event payload. + + See https://bot.q.qq.com/wiki/develop/api-v2/dev-prepare/interface-framework/event-emit.html + """ + id: str = "" + """Interaction event id — required for the ``PUT /interactions/{id}`` ACK.""" + + type: int = 0 + """Event type code (``11`` = message button).""" + + chat_type: int = 0 + """``0`` = guild, ``1`` = group, ``2`` = c2c.""" + + scene: str = "" + """``'guild'`` | ``'group'`` | ``'c2c'`` — human-readable scene.""" + + group_openid: str = "" + group_member_openid: str = "" + user_openid: str = "" + channel_id: str = "" + guild_id: str = "" + + button_data: str = "" + button_id: str = "" + resolver_user_id: str = "" + + @property + def operator_openid(self) -> str: + """Best available operator openid (group → member; c2c → user).""" + return ( + self.group_member_openid + or self.user_openid + or self.resolver_user_id + ) + + +def parse_interaction_event(raw: Dict[str, Any]) -> InteractionEvent: + """Parse a raw ``INTERACTION_CREATE`` dispatch payload (``d``).""" + data_raw = raw.get("data") or {} + resolved = data_raw.get("resolved") or {} + scene_code = int(raw.get("chat_type", 0) or 0) + scene = {0: "guild", 1: "group", 2: "c2c"}.get(scene_code, "") + return InteractionEvent( + id=str(raw.get("id", "")), + type=int(data_raw.get("type", 0) or 0), + chat_type=scene_code, + scene=scene, + group_openid=str(raw.get("group_openid", "")), + group_member_openid=str(raw.get("group_member_openid", "")), + user_openid=str(raw.get("user_openid", "")), + channel_id=str(raw.get("channel_id", "")), + guild_id=str(raw.get("guild_id", "")), + button_data=str(resolved.get("button_data", "")), + button_id=str(resolved.get("button_id", "")), + resolver_user_id=str(resolved.get("user_id", "")), + ) diff --git a/tests/gateway/test_qqbot.py b/tests/gateway/test_qqbot.py index 358cb97c53..5ecc28dd4c 100644 --- a/tests/gateway/test_qqbot.py +++ b/tests/gateway/test_qqbot.py @@ -975,3 +975,329 @@ class TestChunkedUploaderFlow: ) assert result["file_info"] == "F" assert put_attempts["n"] == 2 + + +# --------------------------------------------------------------------------- +# Inline keyboards — approval + update-prompt flows +# --------------------------------------------------------------------------- + +class TestApprovalButtonData: + def test_parse_allow_once(self): + from gateway.platforms.qqbot.keyboards import parse_approval_button_data + result = parse_approval_button_data("approve:agent:main:qqbot:c2c:UID:allow-once") + assert result == ("agent:main:qqbot:c2c:UID", "allow-once") + + def test_parse_allow_always(self): + from gateway.platforms.qqbot.keyboards import parse_approval_button_data + assert parse_approval_button_data("approve:sess:allow-always") == ("sess", "allow-always") + + def test_parse_deny(self): + from gateway.platforms.qqbot.keyboards import parse_approval_button_data + assert parse_approval_button_data("approve:sess:deny") == ("sess", "deny") + + def test_parse_invalid_prefix_returns_none(self): + from gateway.platforms.qqbot.keyboards import parse_approval_button_data + assert parse_approval_button_data("update_prompt:y") is None + + def test_parse_unknown_decision_returns_none(self): + from gateway.platforms.qqbot.keyboards import parse_approval_button_data + assert parse_approval_button_data("approve:sess:maybe") is None + + def test_parse_empty_returns_none(self): + from gateway.platforms.qqbot.keyboards import parse_approval_button_data + assert parse_approval_button_data("") is None + assert parse_approval_button_data(None) is None # type: ignore[arg-type] + + +class TestUpdatePromptButtonData: + def test_parse_yes(self): + from gateway.platforms.qqbot.keyboards import parse_update_prompt_button_data + assert parse_update_prompt_button_data("update_prompt:y") == "y" + + def test_parse_no(self): + from gateway.platforms.qqbot.keyboards import parse_update_prompt_button_data + assert parse_update_prompt_button_data("update_prompt:n") == "n" + + def test_parse_unknown_returns_none(self): + from gateway.platforms.qqbot.keyboards import parse_update_prompt_button_data + assert parse_update_prompt_button_data("update_prompt:maybe") is None + + def test_parse_wrong_prefix(self): + from gateway.platforms.qqbot.keyboards import parse_update_prompt_button_data + assert parse_update_prompt_button_data("approve:sess:deny") is None + + +class TestBuildApprovalKeyboard: + def test_three_buttons_in_single_row(self): + from gateway.platforms.qqbot.keyboards import build_approval_keyboard + kb = build_approval_keyboard("session-1") + assert len(kb.content.rows) == 1 + assert len(kb.content.rows[0].buttons) == 3 + + def test_button_data_embeds_session_key(self): + from gateway.platforms.qqbot.keyboards import build_approval_keyboard + kb = build_approval_keyboard("agent:main:qqbot:c2c:UID") + datas = [b.action.data for b in kb.content.rows[0].buttons] + assert datas[0] == "approve:agent:main:qqbot:c2c:UID:allow-once" + assert datas[1] == "approve:agent:main:qqbot:c2c:UID:allow-always" + assert datas[2] == "approve:agent:main:qqbot:c2c:UID:deny" + + def test_buttons_share_group_id_for_mutual_exclusion(self): + from gateway.platforms.qqbot.keyboards import build_approval_keyboard + kb = build_approval_keyboard("s") + group_ids = {b.group_id for b in kb.content.rows[0].buttons} + assert group_ids == {"approval"} + + def test_to_dict_has_expected_shape(self): + from gateway.platforms.qqbot.keyboards import build_approval_keyboard + kb = build_approval_keyboard("s") + d = kb.to_dict() + assert "content" in d + assert "rows" in d["content"] + assert len(d["content"]["rows"]) == 1 + btn0 = d["content"]["rows"][0]["buttons"][0] + assert btn0["id"] == "allow" + assert btn0["action"]["type"] == 1 + assert btn0["action"]["data"].startswith("approve:s:") + assert btn0["render_data"]["label"] + assert btn0["render_data"]["visited_label"] + + def test_round_trip_parse_matches_build(self): + """Every button built by build_approval_keyboard is parseable.""" + from gateway.platforms.qqbot.keyboards import ( + build_approval_keyboard, parse_approval_button_data, + ) + session_key = "agent:main:qqbot:c2c:UID123" + kb = build_approval_keyboard(session_key) + for btn in kb.content.rows[0].buttons: + parsed = parse_approval_button_data(btn.action.data) + assert parsed is not None + assert parsed[0] == session_key + assert parsed[1] in ("allow-once", "allow-always", "deny") + + +class TestBuildUpdatePromptKeyboard: + def test_two_buttons(self): + from gateway.platforms.qqbot.keyboards import build_update_prompt_keyboard + kb = build_update_prompt_keyboard() + assert len(kb.content.rows[0].buttons) == 2 + + def test_button_data_shape(self): + from gateway.platforms.qqbot.keyboards import build_update_prompt_keyboard + kb = build_update_prompt_keyboard() + datas = [b.action.data for b in kb.content.rows[0].buttons] + assert datas == ["update_prompt:y", "update_prompt:n"] + + +class TestBuildApprovalText: + def test_exec_approval_includes_command_preview(self): + from gateway.platforms.qqbot.keyboards import ( + ApprovalRequest, build_approval_text, + ) + req = ApprovalRequest( + session_key="s", + title="t", + command_preview="rm -rf /tmp/demo", + cwd="/home/user", + timeout_sec=60, + ) + text = build_approval_text(req) + assert "命令执行审批" in text + assert "rm -rf /tmp/demo" in text + assert "/home/user" in text + assert "60" in text + + def test_plugin_approval_uses_severity_icon(self): + from gateway.platforms.qqbot.keyboards import ( + ApprovalRequest, build_approval_text, + ) + crit = ApprovalRequest( + session_key="s", title="dangerous op", + severity="critical", tool_name="shell", timeout_sec=30, + ) + assert "🔴" in build_approval_text(crit) + + info = ApprovalRequest( + session_key="s", title="read-only", severity="info", tool_name="q", + ) + assert "🔵" in build_approval_text(info) + + default = ApprovalRequest(session_key="s", title="t", tool_name="x") + assert "🟡" in build_approval_text(default) + + def test_truncates_long_commands(self): + from gateway.platforms.qqbot.keyboards import ( + ApprovalRequest, build_approval_text, + ) + long = "x" * 1000 + req = ApprovalRequest( + session_key="s", title="t", command_preview=long, cwd="/x", + ) + text = build_approval_text(req) + # Preview is truncated to 300 chars; 1000 "x"s would still push the + # body past 300, but the inline preview specifically must be capped. + preview_line = [ + line for line in text.split("\n") if line.startswith("```") + ] + # 2 backtick fences; the content line in between is separate. + xs_in_preview = sum(line.count("x") for line in text.split("\n") if line and "```" not in line) + assert xs_in_preview <= 301 # 300 xs + one-off tolerance + + +class TestInteractionEventParsing: + def test_parse_c2c_interaction(self): + from gateway.platforms.qqbot.keyboards import parse_interaction_event + raw = { + "id": "interaction-42", + "chat_type": 2, + "user_openid": "user-1", + "data": { + "type": 11, + "resolved": { + "button_data": "approve:sess:allow-once", + "button_id": "allow", + }, + }, + } + ev = parse_interaction_event(raw) + assert ev.id == "interaction-42" + assert ev.scene == "c2c" + assert ev.chat_type == 2 + assert ev.user_openid == "user-1" + assert ev.button_data == "approve:sess:allow-once" + assert ev.button_id == "allow" + assert ev.operator_openid == "user-1" + + def test_parse_group_interaction(self): + from gateway.platforms.qqbot.keyboards import parse_interaction_event + raw = { + "id": "i-1", + "chat_type": 1, + "group_openid": "grp-1", + "group_member_openid": "mem-1", + "data": { + "type": 11, + "resolved": { + "button_data": "update_prompt:y", + "button_id": "yes", + }, + }, + } + ev = parse_interaction_event(raw) + assert ev.scene == "group" + assert ev.group_openid == "grp-1" + assert ev.group_member_openid == "mem-1" + assert ev.operator_openid == "mem-1" # member openid preferred in group + + def test_parse_missing_data_gracefully(self): + from gateway.platforms.qqbot.keyboards import parse_interaction_event + ev = parse_interaction_event({"id": "i", "chat_type": 0}) + assert ev.id == "i" + assert ev.scene == "guild" + assert ev.button_data == "" + assert ev.button_id == "" + assert ev.type == 0 + + +class TestAdapterInteractionDispatch: + """End-to-end verification of _on_interaction including ACK + callback.""" + + def _make_adapter(self): + from gateway.platforms.qqbot.adapter import QQAdapter + return QQAdapter(_make_config(app_id="a", client_secret="b")) + + @pytest.mark.asyncio + async def test_callback_invoked_with_parsed_event(self): + adapter = self._make_adapter() + + # Stub ACK so we don't require a live http_client. + ack_calls = [] + + async def fake_ack(interaction_id, code=0): + ack_calls.append((interaction_id, code)) + + adapter._acknowledge_interaction = fake_ack # type: ignore[assignment] + + received = [] + + async def cb(event): + received.append(event) + + adapter.set_interaction_callback(cb) + await adapter._on_interaction({ + "id": "i-1", + "chat_type": 2, + "user_openid": "user-1", + "data": { + "type": 11, + "resolved": {"button_data": "approve:s:deny", "button_id": "deny"}, + }, + }) + + assert len(ack_calls) == 1 + assert ack_calls[0][0] == "i-1" + assert len(received) == 1 + assert received[0].button_data == "approve:s:deny" + assert received[0].scene == "c2c" + + @pytest.mark.asyncio + async def test_missing_id_skips_ack(self): + adapter = self._make_adapter() + + ack_calls = [] + + async def fake_ack(interaction_id, code=0): + ack_calls.append(interaction_id) + + adapter._acknowledge_interaction = fake_ack # type: ignore[assignment] + + callback_calls = [] + + async def cb(event): + callback_calls.append(event) + + adapter.set_interaction_callback(cb) + await adapter._on_interaction({ + "chat_type": 2, # no id + "data": {"resolved": {"button_data": "approve:s:deny"}}, + }) + + assert ack_calls == [] + assert callback_calls == [] + + @pytest.mark.asyncio + async def test_callback_exception_does_not_propagate(self): + adapter = self._make_adapter() + + async def fake_ack(interaction_id, code=0): + pass + + adapter._acknowledge_interaction = fake_ack # type: ignore[assignment] + + async def bad_cb(event): + raise RuntimeError("boom") + + adapter.set_interaction_callback(bad_cb) + # Should NOT raise. + await adapter._on_interaction({ + "id": "i-2", + "chat_type": 2, + "user_openid": "u", + "data": {"resolved": {"button_data": "approve:s:deny"}}, + }) + + @pytest.mark.asyncio + async def test_no_callback_is_harmless(self): + adapter = self._make_adapter() + + async def fake_ack(interaction_id, code=0): + pass + + adapter._acknowledge_interaction = fake_ack # type: ignore[assignment] + # No callback set — default None. + await adapter._on_interaction({ + "id": "i-3", + "chat_type": 2, + "user_openid": "u", + "data": {"resolved": {"button_data": "approve:s:deny"}}, + })