diff --git a/gateway/config.py b/gateway/config.py index c58cd0f73..75cba7336 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -1038,6 +1038,25 @@ def _apply_env_overrides(config: GatewayConfig) -> None: if webhook_secret: config.platforms[Platform.WEBHOOK].extra["secret"] = webhook_secret + # DingTalk + dingtalk_client_id = os.getenv("DINGTALK_CLIENT_ID") + dingtalk_client_secret = os.getenv("DINGTALK_CLIENT_SECRET") + if dingtalk_client_id and dingtalk_client_secret: + if Platform.DINGTALK not in config.platforms: + config.platforms[Platform.DINGTALK] = PlatformConfig() + config.platforms[Platform.DINGTALK].enabled = True + config.platforms[Platform.DINGTALK].extra.update({ + "client_id": dingtalk_client_id, + "client_secret": dingtalk_client_secret, + }) + dingtalk_home = os.getenv("DINGTALK_HOME_CHANNEL") + if dingtalk_home: + config.platforms[Platform.DINGTALK].home_channel = HomeChannel( + platform=Platform.DINGTALK, + chat_id=dingtalk_home, + name=os.getenv("DINGTALK_HOME_CHANNEL_NAME", "Home"), + ) + # Feishu / Lark feishu_app_id = os.getenv("FEISHU_APP_ID") feishu_app_secret = os.getenv("FEISHU_APP_SECRET") diff --git a/hermes_cli/dingtalk_auth.py b/hermes_cli/dingtalk_auth.py new file mode 100644 index 000000000..b97e20358 --- /dev/null +++ b/hermes_cli/dingtalk_auth.py @@ -0,0 +1,292 @@ +""" +DingTalk Device Flow authorization. + +Implements the same 3-step registration flow as dingtalk-openclaw-connector: + 1. POST /app/registration/init → get nonce + 2. POST /app/registration/begin → get device_code + verification_uri_complete + 3. POST /app/registration/poll → poll until SUCCESS → get client_id + client_secret + +The verification_uri_complete is rendered as a QR code in the terminal so the +user can scan it with DingTalk to authorize, yielding AppKey + AppSecret +automatically. +""" + +from __future__ import annotations + +import io +import os +import sys +import time +import logging +from typing import Optional, Tuple + +import requests + +logger = logging.getLogger(__name__) + +# ── Configuration ────────────────────────────────────────────────────────── + +REGISTRATION_BASE_URL = os.environ.get( + "DINGTALK_REGISTRATION_BASE_URL", "https://oapi.dingtalk.com" +).rstrip("/") + +REGISTRATION_SOURCE = os.environ.get("DINGTALK_REGISTRATION_SOURCE", "openClaw") + + +# ── API helpers ──────────────────────────────────────────────────────────── + +class RegistrationError(Exception): + """Raised when a DingTalk registration API call fails.""" + + +def _api_post(path: str, payload: dict) -> dict: + """POST to the registration API and return the parsed JSON body.""" + url = f"{REGISTRATION_BASE_URL}{path}" + try: + resp = requests.post(url, json=payload, timeout=15) + resp.raise_for_status() + data = resp.json() + except requests.RequestException as exc: + raise RegistrationError(f"Network error calling {url}: {exc}") from exc + + errcode = data.get("errcode", -1) + if errcode != 0: + errmsg = data.get("errmsg", "unknown error") + raise RegistrationError(f"API error [{path}]: {errmsg} (errcode={errcode})") + return data + + +# ── Core flow ────────────────────────────────────────────────────────────── + +def begin_registration() -> dict: + """Start a device-flow registration. + + Returns a dict with keys: + device_code, verification_uri_complete, expires_in, interval + """ + # Step 1: init → nonce + init_data = _api_post("/app/registration/init", {"source": REGISTRATION_SOURCE}) + nonce = str(init_data.get("nonce", "")).strip() + if not nonce: + raise RegistrationError("init response missing nonce") + + # Step 2: begin → device_code, verification_uri_complete + begin_data = _api_post("/app/registration/begin", {"nonce": nonce}) + device_code = str(begin_data.get("device_code", "")).strip() + verification_uri_complete = str(begin_data.get("verification_uri_complete", "")).strip() + if not device_code: + raise RegistrationError("begin response missing device_code") + if not verification_uri_complete: + raise RegistrationError("begin response missing verification_uri_complete") + + return { + "device_code": device_code, + "verification_uri_complete": verification_uri_complete, + "expires_in": int(begin_data.get("expires_in", 7200)), + "interval": max(int(begin_data.get("interval", 3)), 2), + } + + +def poll_registration(device_code: str) -> dict: + """Poll the registration status once. + + Returns a dict with keys: status, client_id?, client_secret?, fail_reason? + """ + data = _api_post("/app/registration/poll", {"device_code": device_code}) + status_raw = str(data.get("status", "")).strip().upper() + if status_raw not in ("WAITING", "SUCCESS", "FAIL", "EXPIRED"): + status_raw = "UNKNOWN" + return { + "status": status_raw, + "client_id": str(data.get("client_id", "")).strip() or None, + "client_secret": str(data.get("client_secret", "")).strip() or None, + "fail_reason": str(data.get("fail_reason", "")).strip() or None, + } + + +def wait_for_registration_success( + device_code: str, + interval: int = 3, + expires_in: int = 7200, + on_waiting: Optional[callable] = None, +) -> Tuple[str, str]: + """Block until the registration succeeds or times out. + + Returns (client_id, client_secret). + """ + deadline = time.monotonic() + expires_in + retry_window = 120 # 2 minutes for transient errors + retry_start = 0.0 + + while time.monotonic() < deadline: + time.sleep(interval) + try: + result = poll_registration(device_code) + except RegistrationError: + if retry_start == 0: + retry_start = time.monotonic() + if time.monotonic() - retry_start < retry_window: + continue + raise + + status = result["status"] + if status == "WAITING": + retry_start = 0 + if on_waiting: + on_waiting() + continue + if status == "SUCCESS": + cid = result["client_id"] + csecret = result["client_secret"] + if not cid or not csecret: + raise RegistrationError("authorization succeeded but credentials are missing") + return cid, csecret + # FAIL / EXPIRED / UNKNOWN + if retry_start == 0: + retry_start = time.monotonic() + if time.monotonic() - retry_start < retry_window: + continue + reason = result.get("fail_reason") or status + raise RegistrationError(f"authorization failed: {reason}") + + raise RegistrationError("authorization timed out, please retry") + + +# ── QR code rendering ───────────────────────────────────────────────────── + +def _ensure_qrcode_installed() -> bool: + """Try to import qrcode; if missing, auto-install it via pip/uv.""" + try: + import qrcode # noqa: F401 + return True + except ImportError: + pass + + import subprocess + + # Try uv first (Hermes convention), then pip + for cmd in ( + [sys.executable, "-m", "uv", "pip", "install", "qrcode"], + [sys.executable, "-m", "pip", "install", "-q", "qrcode"], + ): + try: + subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + import qrcode # noqa: F401,F811 + return True + except (subprocess.CalledProcessError, ImportError, FileNotFoundError): + continue + return False + + +def render_qr_to_terminal(url: str) -> bool: + """Render *url* as a compact QR code in the terminal. + + Returns True if the QR code was printed, False if the library is missing. + """ + try: + import qrcode + except ImportError: + return False + + qr = qrcode.QRCode( + version=1, + error_correction=qrcode.constants.ERROR_CORRECT_L, + box_size=1, + border=1, + ) + qr.add_data(url) + qr.make(fit=True) + + # Use half-block characters for compact rendering (2 rows per character) + matrix = qr.get_matrix() + rows = len(matrix) + lines: list[str] = [] + + TOP_HALF = "\u2580" # ▀ + BOTTOM_HALF = "\u2584" # ▄ + FULL_BLOCK = "\u2588" # █ + EMPTY = " " + + for r in range(0, rows, 2): + line_chars: list[str] = [] + for c in range(len(matrix[r])): + top = matrix[r][c] + bottom = matrix[r + 1][c] if r + 1 < rows else False + if top and bottom: + line_chars.append(FULL_BLOCK) + elif top: + line_chars.append(TOP_HALF) + elif bottom: + line_chars.append(BOTTOM_HALF) + else: + line_chars.append(EMPTY) + lines.append(" " + "".join(line_chars)) + + print("\n".join(lines)) + return True + + +# ── High-level entry point for the setup wizard ─────────────────────────── + +def dingtalk_qr_auth() -> Optional[Tuple[str, str]]: + """Run the interactive QR-code device-flow authorization. + + Returns (client_id, client_secret) on success, or None if the user + cancelled or the flow failed. + """ + from hermes_cli.setup import print_info, print_success, print_warning, print_error + + print() + print_info(" Initializing DingTalk device authorization...") + + try: + reg = begin_registration() + except RegistrationError as exc: + print_error(f" Authorization init failed: {exc}") + return None + + url = reg["verification_uri_complete"] + + # Ensure qrcode library is available (auto-install if missing) + if not _ensure_qrcode_installed(): + print_warning(" qrcode library install failed, will show link only.") + + print() + print_info(" Please scan the QR code below with DingTalk to authorize:") + print() + + if not render_qr_to_terminal(url): + print_warning(f" QR code render failed, please open the link below to authorize:") + + print() + print_info(f" Or open this link manually: {url}") + print() + print_info(" Waiting for QR scan authorization... (timeout: 2 hours)") + + dot_count = 0 + + def _on_waiting(): + nonlocal dot_count + dot_count += 1 + if dot_count % 10 == 0: + sys.stdout.write(".") + sys.stdout.flush() + + try: + client_id, client_secret = wait_for_registration_success( + device_code=reg["device_code"], + interval=reg["interval"], + expires_in=reg["expires_in"], + on_waiting=_on_waiting, + ) + except RegistrationError as exc: + print() + print_error(f" Authorization failed: {exc}") + return None + + print() + print_success(" QR scan authorization successful!") + print_success(f" Client ID: {client_id}") + print_success(f" Client Secret: {client_secret[:8]}{'*' * (len(client_secret) - 8)}") + + return client_id, client_secret diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index d010a601d..585bbe446 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -2211,9 +2211,62 @@ def _setup_sms(): def _setup_dingtalk(): - """Configure DingTalk via the standard platform setup.""" + """Configure DingTalk — QR scan (recommended) or manual credential entry.""" + from hermes_cli.setup import ( + prompt_choice, prompt_yes_no, print_info, print_success, print_warning, + ) + dingtalk_platform = next(p for p in _PLATFORMS if p["key"] == "dingtalk") - _setup_standard_platform(dingtalk_platform) + emoji = dingtalk_platform["emoji"] + label = dingtalk_platform["label"] + + print() + print(color(f" ─── {emoji} {label} Setup ───", Colors.CYAN)) + + existing = get_env_value("DINGTALK_CLIENT_ID") + if existing: + print() + print_success(f"{label} is already configured (Client ID: {existing}).") + if not prompt_yes_no(f" Reconfigure {label}?", False): + return + + print() + method = prompt_choice( + " Choose setup method", + [ + "QR Code Scan (Recommended, auto-obtain Client ID and Client Secret)", + "Manual Input (Client ID and Client Secret)", + ], + default=0, + ) + + if method == 0: + # ── QR-code device-flow authorization ── + try: + from hermes_cli.dingtalk_auth import dingtalk_qr_auth + except ImportError as exc: + print_warning(f" QR auth module failed to load ({exc}), falling back to manual input.") + _setup_standard_platform(dingtalk_platform) + return + + result = dingtalk_qr_auth() + if result is None: + print_warning(" QR auth incomplete, falling back to manual input.") + _setup_standard_platform(dingtalk_platform) + return + + client_id, client_secret = result + save_env_value("DINGTALK_CLIENT_ID", client_id) + save_env_value("DINGTALK_CLIENT_SECRET", client_secret) + save_env_value("DINGTALK_ALLOW_ALL_USERS", "true") + print() + print_success(f"{emoji} {label} configured via QR scan!") + else: + # ── Manual entry ── + _setup_standard_platform(dingtalk_platform) + # Also enable allow-all by default for convenience + if get_env_value("DINGTALK_CLIENT_ID"): + save_env_value("DINGTALK_ALLOW_ALL_USERS", "true") def _setup_wecom(): @@ -2749,6 +2802,8 @@ def gateway_setup(): _setup_signal() elif platform["key"] == "weixin": _setup_weixin() + elif platform["key"] == "dingtalk": + _setup_dingtalk() elif platform["key"] == "feishu": _setup_feishu() else: