From cf55c738e79bf1a9ae809d11bcab695e83f4e248 Mon Sep 17 00:00:00 2001 From: WideLee Date: Tue, 21 Apr 2026 20:36:50 +0800 Subject: [PATCH] refactor(qqbot): migrate qr onboard flow to sync + consolidate into onboard.py - Replace async create_bind_task/poll_bind_result with synchronous httpx.Client equivalents, eliminating manual event loop management - Move _render_qr and full qr_register() entry-point into onboard.py, mirroring the Feishu onboarding pattern - Remove _qqbot_render_qr and _qqbot_qr_flow from gateway.py (~90 lines); call site becomes a single qr_register() import - Fix potential segfault: previous code called loop.close() in the EXPIRED branch and again in the finally block (double-close crashed under uvloop) --- gateway/platforms/qqbot/__init__.py | 6 +- gateway/platforms/qqbot/onboard.py | 138 +++++++++++++++++++++++----- hermes_cli/gateway.py | 103 +-------------------- 3 files changed, 121 insertions(+), 126 deletions(-) diff --git a/gateway/platforms/qqbot/__init__.py b/gateway/platforms/qqbot/__init__.py index 7119dd979..130269b5f 100644 --- a/gateway/platforms/qqbot/__init__.py +++ b/gateway/platforms/qqbot/__init__.py @@ -26,9 +26,8 @@ from .adapter import ( # noqa: F401 # -- Onboard (QR-code scan-to-configure) ----------------------------------- from .onboard import ( # noqa: F401 BindStatus, - create_bind_task, - poll_bind_result, build_connect_url, + qr_register, ) from .crypto import decrypt_secret, generate_bind_key # noqa: F401 @@ -44,9 +43,8 @@ __all__ = [ "_ssrf_redirect_guard", # onboard "BindStatus", - "create_bind_task", - "poll_bind_result", "build_connect_url", + "qr_register", # crypto "decrypt_secret", "generate_bind_key", diff --git a/gateway/platforms/qqbot/onboard.py b/gateway/platforms/qqbot/onboard.py index 65750b3f1..b48c39a4f 100644 --- a/gateway/platforms/qqbot/onboard.py +++ b/gateway/platforms/qqbot/onboard.py @@ -1,6 +1,10 @@ """ QQBot scan-to-configure (QR code onboard) module. +Mirrors the Feishu onboarding pattern: synchronous HTTP + a single public +entry-point ``qr_register()`` that handles the full flow (create task → +display QR code → poll → decrypt credentials). + Calls the ``q.qq.com`` ``create_bind_task`` / ``poll_bind_result`` APIs to generate a QR-code URL and poll for scan completion. On success the caller receives the bot's *app_id*, *client_secret* (decrypted locally), and the @@ -12,18 +16,20 @@ Reference: https://bot.q.qq.com/wiki/develop/api-v2/ from __future__ import annotations import logging +import time from enum import IntEnum -from typing import Tuple +from typing import Optional, Tuple from urllib.parse import quote from .constants import ( ONBOARD_API_TIMEOUT, ONBOARD_CREATE_PATH, + ONBOARD_POLL_INTERVAL, ONBOARD_POLL_PATH, PORTAL_HOST, QR_URL_TEMPLATE, ) -from .crypto import generate_bind_key +from .crypto import decrypt_secret, generate_bind_key from .utils import get_api_headers logger = logging.getLogger(__name__) @@ -35,7 +41,7 @@ logger = logging.getLogger(__name__) class BindStatus(IntEnum): - """Status codes returned by ``poll_bind_result``.""" + """Status codes returned by ``_poll_bind_result``.""" NONE = 0 PENDING = 1 @@ -44,18 +50,40 @@ class BindStatus(IntEnum): # --------------------------------------------------------------------------- -# Public API +# QR rendering +# --------------------------------------------------------------------------- + +try: + import qrcode as _qrcode_mod +except (ImportError, TypeError): + _qrcode_mod = None # type: ignore[assignment] + + +def _render_qr(url: str) -> bool: + """Try to render a QR code in the terminal. Returns True if successful.""" + if _qrcode_mod is None: + return False + try: + qr = _qrcode_mod.QRCode( + error_correction=_qrcode_mod.constants.ERROR_CORRECT_M, + border=2, + ) + qr.add_data(url) + qr.make(fit=True) + qr.print_ascii(invert=True) + return True + except Exception: + return False + + +# --------------------------------------------------------------------------- +# Synchronous HTTP helpers (mirrors Feishu _post_registration pattern) # --------------------------------------------------------------------------- -async def create_bind_task( - timeout: float = ONBOARD_API_TIMEOUT, -) -> Tuple[str, str]: +def _create_bind_task(timeout: float = ONBOARD_API_TIMEOUT) -> Tuple[str, str]: """Create a bind task and return *(task_id, aes_key_base64)*. - The AES key is generated locally and sent to the server so it can - encrypt the bot credentials before returning them. - Raises: RuntimeError: If the API returns a non-zero ``retcode``. """ @@ -64,8 +92,8 @@ async def create_bind_task( url = f"https://{PORTAL_HOST}{ONBOARD_CREATE_PATH}" key = generate_bind_key() - async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client: - resp = await client.post(url, json={"key": key}, headers=get_api_headers()) + with httpx.Client(timeout=timeout, follow_redirects=True) as client: + resp = client.post(url, json={"key": key}, headers=get_api_headers()) resp.raise_for_status() data = resp.json() @@ -80,7 +108,7 @@ async def create_bind_task( return task_id, key -async def poll_bind_result( +def _poll_bind_result( task_id: str, timeout: float = ONBOARD_API_TIMEOUT, ) -> Tuple[BindStatus, str, str, str]: @@ -89,12 +117,6 @@ async def poll_bind_result( Returns: A 4-tuple of ``(status, bot_appid, bot_encrypt_secret, user_openid)``. - * ``bot_encrypt_secret`` is AES-256-GCM encrypted — decrypt it with - :func:`~gateway.platforms.qqbot.crypto.decrypt_secret` using the - key from :func:`create_bind_task`. - * ``user_openid`` is the OpenID of the person who scanned the code - (available when ``status == COMPLETED``). - Raises: RuntimeError: If the API returns a non-zero ``retcode``. """ @@ -102,8 +124,8 @@ async def poll_bind_result( url = f"https://{PORTAL_HOST}{ONBOARD_POLL_PATH}" - async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client: - resp = await client.post(url, json={"task_id": task_id}, headers=get_api_headers()) + with httpx.Client(timeout=timeout, follow_redirects=True) as client: + resp = client.post(url, json={"task_id": task_id}, headers=get_api_headers()) resp.raise_for_status() data = resp.json() @@ -122,3 +144,77 @@ async def poll_bind_result( def build_connect_url(task_id: str) -> str: """Build the QR-code target URL for a given *task_id*.""" return QR_URL_TEMPLATE.format(task_id=quote(task_id)) + + +# --------------------------------------------------------------------------- +# Public entry-point +# --------------------------------------------------------------------------- + +_MAX_REFRESHES = 3 + + +def qr_register(timeout_seconds: int = 600) -> Optional[dict]: + """Run the QQBot scan-to-configure QR registration flow. + + Mirrors ``feishu.qr_register()``: handles create → display → poll → + decrypt in one call. Unexpected errors propagate to the caller. + + :returns: + ``{"app_id": ..., "client_secret": ..., "user_openid": ...}`` on + success, or ``None`` on failure / expiry / cancellation. + """ + deadline = time.monotonic() + timeout_seconds + + for refresh_count in range(_MAX_REFRESHES + 1): + # ── Create bind task ── + try: + task_id, aes_key = _create_bind_task() + except Exception as exc: + logger.warning("[QQBot onboard] Failed to create bind task: %s", exc) + return None + + url = build_connect_url(task_id) + + # ── Display QR code + URL ── + print() + if _render_qr(url): + print(f" Scan the QR code above, or open this URL directly:\n {url}") + else: + print(f" Open this URL in QQ on your phone:\n {url}") + print(" Tip: pip install qrcode to display a scannable QR code here") + print() + + # ── Poll loop ── + while time.monotonic() < deadline: + try: + status, app_id, encrypted_secret, user_openid = _poll_bind_result(task_id) + except Exception: + time.sleep(ONBOARD_POLL_INTERVAL) + continue + + if status == BindStatus.COMPLETED: + client_secret = decrypt_secret(encrypted_secret, aes_key) + print() + print(f" QR scan complete! (App ID: {app_id})") + if user_openid: + print(f" Scanner's OpenID: {user_openid}") + return { + "app_id": app_id, + "client_secret": client_secret, + "user_openid": user_openid, + } + + if status == BindStatus.EXPIRED: + if refresh_count >= _MAX_REFRESHES: + logger.warning("[QQBot onboard] QR code expired %d times — giving up", _MAX_REFRESHES) + return None + print(f"\n QR code expired, refreshing... ({refresh_count + 1}/{_MAX_REFRESHES})") + break # next for-loop iteration creates a new task + + time.sleep(ONBOARD_POLL_INTERVAL) + else: + # deadline reached without completing + logger.warning("[QQBot onboard] Poll timed out after %ds", timeout_seconds) + return None + + return None diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index 481566f9d..59bd37d11 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -3132,7 +3132,8 @@ def _setup_qqbot(): if method_idx == 0: # ── QR scan-to-configure ── try: - credentials = _qqbot_qr_flow() + from gateway.platforms.qqbot import qr_register + credentials = qr_register() except KeyboardInterrupt: print() print_warning(" QQ Bot setup cancelled.") @@ -3214,106 +3215,6 @@ def _setup_qqbot(): print_info(f" App ID: {credentials['app_id']}") -def _qqbot_render_qr(url: str) -> bool: - """Try to render a QR code in the terminal. Returns True if successful.""" - try: - import qrcode as _qr - qr = _qr.QRCode(border=1,error_correction=_qr.constants.ERROR_CORRECT_L) - qr.add_data(url) - qr.make(fit=True) - qr.print_ascii(invert=True) - return True - except Exception: - return False - - -def _qqbot_qr_flow(): - """Run the QR-code scan-to-configure flow. - - Returns a dict with app_id, client_secret, user_openid on success, - or None on failure/cancel. - """ - try: - from gateway.platforms.qqbot import ( - create_bind_task, poll_bind_result, build_connect_url, - decrypt_secret, BindStatus, - ) - from gateway.platforms.qqbot.constants import ONBOARD_POLL_INTERVAL - except Exception as exc: - print_error(f" QQBot onboard import failed: {exc}") - return None - - import asyncio - import time - - MAX_REFRESHES = 3 - refresh_count = 0 - - while refresh_count <= MAX_REFRESHES: - loop = asyncio.new_event_loop() - - # ── Create bind task ── - try: - task_id, aes_key = loop.run_until_complete(create_bind_task()) - except Exception as e: - print_warning(f" Failed to create bind task: {e}") - loop.close() - return None - - url = build_connect_url(task_id) - - # ── Display QR code + URL ── - print() - if _qqbot_render_qr(url): - print(f" Scan the QR code above, or open this URL directly:\n {url}") - else: - print(f" Open this URL in QQ on your phone:\n {url}") - print_info(" Tip: pip install qrcode to show a scannable QR code here") - - # ── Poll loop (silent — keep QR visible at bottom) ── - try: - while True: - try: - status, app_id, encrypted_secret, user_openid = loop.run_until_complete( - poll_bind_result(task_id) - ) - except Exception: - time.sleep(ONBOARD_POLL_INTERVAL) - continue - - if status == BindStatus.COMPLETED: - client_secret = decrypt_secret(encrypted_secret, aes_key) - print() - print_success(f" QR scan complete! (App ID: {app_id})") - if user_openid: - print_info(f" Scanner's OpenID: {user_openid}") - return { - "app_id": app_id, - "client_secret": client_secret, - "user_openid": user_openid, - } - - if status == BindStatus.EXPIRED: - refresh_count += 1 - if refresh_count > MAX_REFRESHES: - print() - print_warning(f" QR code expired {MAX_REFRESHES} times — giving up.") - return None - print() - print_warning(f" QR code expired, refreshing... ({refresh_count}/{MAX_REFRESHES})") - loop.close() - break # outer while creates a new task - - time.sleep(ONBOARD_POLL_INTERVAL) - except KeyboardInterrupt: - loop.close() - raise - finally: - loop.close() - - return None - - def _setup_signal(): """Interactive setup for Signal messenger.""" import shutil