diff --git a/.gitignore b/.gitignore index 72f3bd17f7..385ff7cabe 100644 --- a/.gitignore +++ b/.gitignore @@ -69,3 +69,6 @@ mini-swe-agent/ .nix-stamps/ result website/static/api/skills-index.json + +.claude + diff --git a/gateway/config.py b/gateway/config.py index 67ebf73461..0263fe4765 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -491,6 +491,22 @@ def load_gateway_config() -> GatewayConfig: with open(config_yaml_path, encoding="utf-8") as f: yaml_cfg = yaml.safe_load(f) or {} + # Hydrate top-level UPPERCASE_HOME_CHANNEL keys from config.yaml + # into os.environ so `_apply_env_overrides` picks them up on boot. + # Without this, /sethome-written yaml entries get orphaned across + # gateway restarts (yaml survives, process env does not), which + # makes onboarding prompts and home-channel routing flap between + # sessions. Env var (if already set) takes precedence. + for _k, _v in yaml_cfg.items(): + if ( + isinstance(_k, str) + and _k.endswith("_HOME_CHANNEL") + and _k.isupper() + and _v + and not os.getenv(_k) + ): + os.environ[_k] = str(_v) + # Map config.yaml keys → GatewayConfig.from_dict() schema. # Each key overwrites whatever gateway.json may have set. sr = yaml_cfg.get("session_reset") diff --git a/gateway/platforms/dingtalk.py b/gateway/platforms/dingtalk.py index 3037e402b2..0b68505e8d 100644 --- a/gateway/platforms/dingtalk.py +++ b/gateway/platforms/dingtalk.py @@ -30,11 +30,14 @@ import asyncio import json import logging import os +import random import re +import time import traceback import uuid from datetime import datetime, timezone -from typing import Any, Dict, List, Optional, Set +from pathlib import Path +from typing import Any, Dict, List, Optional, Set, Tuple try: import dingtalk_stream @@ -56,6 +59,19 @@ except ImportError: }, ) # type: ignore[assignment] +# Capture pristine ``websockets.connect`` at our import time so dingtalk-stream +# always uses the original even if another module (e.g. the Feishu adapter) +# later monkey-patches the global ``websockets.connect`` with an ``async def`` +# wrapper — that turns the return value into a coroutine and breaks the SDK's +# ``async with websockets.connect(uri)`` at dingtalk_stream/stream.py:74. +try: + import websockets as _pristine_ws_module + + _PRISTINE_WEBSOCKETS_CONNECT = _pristine_ws_module.connect +except ImportError: + _pristine_ws_module = None # type: ignore[assignment] + _PRISTINE_WEBSOCKETS_CONNECT = None # type: ignore[assignment] + try: import httpx @@ -64,6 +80,37 @@ except ImportError: HTTPX_AVAILABLE = False httpx = None # type: ignore[assignment] + +_WS_PROXY_INSTALLED = False + + +def _install_dingtalk_websockets_proxy() -> None: + """Point dingtalk_stream.stream.websockets at a namespace holding the + pristine ``connect`` captured at our import time. + + Idempotent. Safe no-op when dingtalk-stream or websockets is unavailable. + """ + global _WS_PROXY_INSTALLED + if _WS_PROXY_INSTALLED: + return + if _PRISTINE_WEBSOCKETS_CONNECT is None or dingtalk_stream is None: + return + try: + import types + + from dingtalk_stream import stream as _dts_stream + + _dts_stream.websockets = types.SimpleNamespace( + connect=_PRISTINE_WEBSOCKETS_CONNECT, + exceptions=_pristine_ws_module.exceptions, + ) + _WS_PROXY_INSTALLED = True + except Exception: # pragma: no cover - defensive + logger.debug( + "[DingTalk] failed to install websockets proxy on dingtalk_stream.stream", + exc_info=True, + ) + # Card SDK for AI Cards (following QwenPaw pattern) try: from alibabacloud_dingtalk.card_1_0 import ( @@ -103,12 +150,873 @@ RECONNECT_BACKOFF = [2, 5, 10, 30, 60] _SESSION_WEBHOOKS_MAX = 500 _DINGTALK_WEBHOOK_RE = re.compile(r'^https://(?:api|oapi)\.dingtalk\.com/') +# AI Card streaming_update QPS guard. The DingTalk gateway returns HTTP 403 +# "concurrent update" when multiple edit_message calls hit the same card +# within ~500ms. Reference (openclaw-connector reply-dispatcher.ts:103) uses +# 800ms as the per-card minimum interval between non-finalize edits. We +# match that to avoid the same 403 storm. Finalize edits are NEVER +# throttled — dropping them would leave the card stuck in streaming state. +_CARD_EDIT_THROTTLE_MS = 800 +# Error-send cooldown so repeated failures don't spam users +# (reply-dispatcher.ts:108 uses 60s — same pattern, same reason). +_ERROR_COOLDOWN_MS = 60_000 + +# Global token-bucket rate for AI Card streaming_update across ALL chats. +# DingTalk's official cap is ~40 QPS per tenant; reference connector +# (messaging/card.ts:18) uses 20 as a safety margin. Matching that, so +# concurrent sessions never bust the tenant-wide ceiling. +_CARD_API_MAX_QPS = 20 +_CARD_API_QPS_BACKOFF_MS = 2_000 + +# Inbound-message queue TTL. queueKey entries that haven't received a new +# message for this long are eligible for sweep (reference +# core/message-handler.ts:92 uses 5 min). +_INBOUND_QUEUE_TTL_SEC = 300 +# Busy-ACK phrases when inbound queue already has a pending task. Picked +# randomly so repeats don't feel scripted (reference utils/constants.ts +# QUEUE_BUSY_ACK_PHRASES). +_QUEUE_BUSY_ACK_PHRASES = ( + "收到,让我先把前一条处理完 🙏", + "稍等,排队中……", + "收到~手头这条完事就来", + "别急,按顺序处理中", +) + + +class _CardTokenBucket: + """Global async token bucket for DingTalk card streaming_update. + + Mirrors messaging/card.ts:23-95. All streamAICard/edit_message calls + across every chat + account share one bucket so concurrent sessions + don't blow past the tenant-wide QPS limit and trigger 403 storms. + Refills at ``rate`` tokens/second with capacity = rate. On an + upstream 403 limit, callers call ``trigger_backoff`` and subsequent + acquirers wait out the backoff window. + """ + + def __init__(self, rate: float) -> None: + self._rate = float(rate) + self._tokens = float(rate) + self._last_refill = time.monotonic() + self._backoff_until = 0.0 + self._lock = asyncio.Lock() + + async def acquire(self) -> None: + async with self._lock: + now = time.monotonic() + if now < self._backoff_until: + await asyncio.sleep(self._backoff_until - now) + now = time.monotonic() + elapsed = now - self._last_refill + self._tokens = min(self._rate, self._tokens + elapsed * self._rate) + self._last_refill = now + if self._tokens < 1.0: + wait_s = (1.0 - self._tokens) / self._rate + await asyncio.sleep(wait_s) + self._tokens = 0.0 + self._last_refill = time.monotonic() + else: + self._tokens -= 1.0 + + def trigger_backoff(self) -> None: + self._backoff_until = time.monotonic() + _CARD_API_QPS_BACKOFF_MS / 1000.0 + + +# Process-wide card-API rate limiter (shared across adapters). +_CARD_BUCKET = _CardTokenBucket(_CARD_API_MAX_QPS) + +# DingTalk OpenAPI endpoints for proactive (non-session-webhook) messaging. +_DINGTALK_OAUTH_TOKEN_URL = "https://api.dingtalk.com/v1.0/oauth2/accessToken" +_DINGTALK_OAPI_TOKEN_URL = "https://oapi.dingtalk.com/gettoken" # legacy, for /media/upload +_DINGTALK_OAPI_MEDIA_UPLOAD_URL = "https://oapi.dingtalk.com/media/upload" +_DINGTALK_OTO_BATCH_SEND_URL = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend" +_DINGTALK_GROUP_SEND_URL = "https://api.dingtalk.com/v1.0/robot/groupMessages/send" + +# Process-wide access token cache: (client_id) -> (token, expires_at_ts). +# Tokens are valid for ~2h; we refresh with a 5-minute safety margin so parallel +# sends share one round-trip. Two caches because /media/upload needs the +# legacy OAPI token (different endpoint, different token) — matches the +# reference connector's getAccessToken / getOapiAccessToken split. +_DINGTALK_TOKEN_CACHE: Dict[str, tuple[str, float]] = {} +_DINGTALK_OAPI_TOKEN_CACHE: Dict[str, tuple[str, float]] = {} +_DINGTALK_TOKEN_LOCK = asyncio.Lock() +_DINGTALK_OAPI_TOKEN_LOCK = asyncio.Lock() +_DINGTALK_TOKEN_SAFETY_MARGIN = 300.0 # refresh 5 min before expiry + +# Media upload limits. DingTalk /media/upload caps at 20 MB per file. +_DINGTALK_MEDIA_MAX_SIZE = 20 * 1024 * 1024 +# File-extension → (media_type, msg_key) routing for /media/upload. +_IMAGE_EXTS = frozenset({".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"}) +_VIDEO_EXTS = frozenset({".mp4", ".mov", ".avi", ".mkv", ".webm"}) +_VOICE_EXTS = frozenset({".amr", ".mp3", ".wav", ".aac", ".m4a", ".ogg"}) + +# LWCP-encoded sender_id (opaque DingTalk-internal form). Treated as OTO but +# the OpenAPI will reject it as ``staffId.notExisted`` unless the caller has +# resolved it to a real staffId via the contact APIs first. +_LWCP_SENDER_RE = re.compile(r'^\$:LWCP_') +# Real openConversationId (group chat): ``cid...==`` base64-padded form. +_OPEN_CONVERSATION_RE = re.compile(r'^cid[A-Za-z0-9+/_\-]+={0,2}$') + # DingTalk message type → runtime content type DINGTALK_TYPE_MAPPING = { "picture": "image", "voice": "audio", } +# --------------------------------------------------------------------------- +# File content auto-parsing (ported from dingtalk-openclaw-connector +# core/message-handler.ts:700-956) +# --------------------------------------------------------------------------- + +# Extensions that can be parsed as plain text and injected into agent context. +_TEXT_FILE_EXTS = frozenset({ + ".txt", ".md", ".json", ".xml", ".yaml", ".yml", ".csv", ".log", + ".js", ".ts", ".py", ".java", ".c", ".cpp", ".h", ".sh", ".bat", + ".html", ".css", ".sql", ".rb", ".go", ".rs", ".toml", ".ini", ".cfg", +}) +_DOCX_EXTS = frozenset({".docx", ".doc"}) +_PDF_EXTS = frozenset({".pdf"}) +_EXCEL_EXTS = frozenset({".xlsx", ".xls", ".xlsm"}) +# All parseable extensions (text + docx + pdf + excel). +_PARSEABLE_FILE_EXTS = _TEXT_FILE_EXTS | _DOCX_EXTS | _PDF_EXTS | _EXCEL_EXTS + + +def _file_type_label(ext: str) -> str: + """Return a human-readable Chinese label for a file extension.""" + if ext in _TEXT_FILE_EXTS: + return "文本文件" + if ext in _DOCX_EXTS: + return "Word 文档" + if ext in _PDF_EXTS: + return "PDF 文档" + if ext in {".xlsx", ".xls"}: + return "Excel 表格" + if ext in {".pptx", ".ppt"}: + return "PPT 演示文稿" + if ext in {".zip", ".rar", ".7z", ".tar", ".gz"}: + return "压缩包" + if ext in _IMAGE_EXTS: + return "图片" + if ext in _VIDEO_EXTS: + return "视频" + if ext in _VOICE_EXTS: + return "音频" + return "文件" + + +def _parse_text_file(file_path: str) -> Optional[str]: + """Read a plain-text file and return its content.""" + try: + text = Path(file_path).read_text(encoding="utf-8", errors="replace").strip() + return text if text else None + except Exception as exc: + logger.warning("Failed to read text file %s: %s", file_path, exc) + return None + + +def _parse_docx_file(file_path: str) -> Optional[str]: + """Extract raw text from a .docx file using python-docx.""" + try: + import docx # python-docx + except ImportError: + logger.warning( + "python-docx not installed, cannot parse .docx. " + "Install with: pip install python-docx" + ) + return None + try: + doc = docx.Document(file_path) + paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] + text = "\n".join(paragraphs).strip() + return text if text else None + except Exception as exc: + logger.warning("Failed to parse docx %s: %s", file_path, exc) + return None + + +def _parse_pdf_file(file_path: str) -> Optional[str]: + """Extract text from a PDF file. + + Tries pdfplumber first (better table / layout handling), then falls back + to PyPDF2. + """ + # Try pdfplumber + try: + import pdfplumber + with pdfplumber.open(file_path) as pdf: + pages_text = [p.extract_text() or "" for p in pdf.pages] + text = "\n".join(pages_text).strip() + if text: + return text + except ImportError: + pass + except Exception as exc: + logger.warning("pdfplumber failed for %s: %s", file_path, exc) + + # Fallback to PyPDF2 + try: + from PyPDF2 import PdfReader + reader = PdfReader(file_path) + pages_text = [page.extract_text() or "" for page in reader.pages] + text = "\n".join(pages_text).strip() + return text if text else None + except ImportError: + logger.warning( + "Neither pdfplumber nor PyPDF2 installed, cannot parse PDF. " + "Install with: pip install pdfplumber or pip install PyPDF2" + ) + return None + except Exception as exc: + logger.warning("PyPDF2 failed for %s: %s", file_path, exc) + return None + + +def _parse_excel_file(file_path: str) -> Optional[str]: + """Extract text representation from an Excel (.xlsx/.xls/.xlsm) file. + + Tries openpyxl first (modern xlsx), then falls back to xlrd (legacy xls). + Converts each sheet into a Markdown-style table so the LLM can reason + over the data directly. + """ + # -- Try openpyxl (xlsx / xlsm) -- + try: + import openpyxl + wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True) + sheets_text: List[str] = [] + for sheet_name in wb.sheetnames: + ws = wb[sheet_name] + rows: List[List[str]] = [] + for row in ws.iter_rows(values_only=True): + cells = [str(c) if c is not None else "" for c in row] + rows.append(cells) + if not rows: + continue + # Build markdown table + header = "| " + " | ".join(rows[0]) + " |" + sep = "| " + " | ".join(["---"] * len(rows[0])) + " |" + body_lines = [] + for r in rows[1:]: + # Pad or truncate to header length + padded = r + [""] * (len(rows[0]) - len(r)) + body_lines.append("| " + " | ".join(padded[:len(rows[0])]) + " |") + table = "\n".join([header, sep] + body_lines) + sheets_text.append(f"### Sheet: {sheet_name}\n\n{table}") + wb.close() + text = "\n\n".join(sheets_text).strip() + if text: + return text + except ImportError: + pass + except Exception as exc: + logger.warning("openpyxl failed for %s: %s", file_path, exc) + + # -- Fallback: pandas (handles both xlsx and xls) -- + try: + import pandas as pd + xls = pd.ExcelFile(file_path) + sheets_text = [] + for sheet_name in xls.sheet_names: + df = pd.read_excel(xls, sheet_name=sheet_name) + md_table = df.to_markdown(index=False) + if md_table: + sheets_text.append(f"### Sheet: {sheet_name}\n\n{md_table}") + text = "\n\n".join(sheets_text).strip() + return text if text else None + except ImportError: + logger.warning( + "Neither openpyxl nor pandas installed, cannot parse Excel. " + "Install with: pip install openpyxl or pip install pandas" + ) + return None + except Exception as exc: + logger.warning("pandas Excel parse failed for %s: %s", file_path, exc) + return None + + +def _parse_file_content(file_path: str, file_name: str) -> Optional[str]: + """Dispatch to the correct parser based on file extension. + + Returns the extracted text, or None if unparseable / binary. + """ + ext = os.path.splitext(file_name)[1].lower() + if ext in _TEXT_FILE_EXTS: + return _parse_text_file(file_path) + if ext in _DOCX_EXTS: + return _parse_docx_file(file_path) + if ext in _PDF_EXTS: + return _parse_pdf_file(file_path) + if ext in _EXCEL_EXTS: + return _parse_excel_file(file_path) + return None + + +# Persistent inbound-media storage. Files live here so agent tools +# (vision_analyze / transcribe_audio / read_file …) can reopen them after +# ``_on_message`` returns. Cleanup on adapter connect ages out files +# older than 24 h so the directory doesn't grow unboundedly. + +def _inbound_media_dir() -> str: + base = os.environ.get("HERMES_HOME") or os.path.expanduser("~/.hermes") + path = os.path.join(base, "inbound_media") + os.makedirs(path, exist_ok=True) + return path + + +def _cleanup_inbound_media(older_than_seconds: int = 24 * 3600) -> int: + """Delete inbox files older than the cutoff. Returns the count removed.""" + removed = 0 + try: + dir_path = _inbound_media_dir() + now = time.time() + for name in os.listdir(dir_path): + full = os.path.join(dir_path, name) + try: + if os.path.isfile(full) and (now - os.path.getmtime(full)) > older_than_seconds: + os.unlink(full) + removed += 1 + except OSError: + continue + except Exception: + logger.debug("inbound media cleanup failed", exc_info=True) + return removed + + +async def _download_file_to_inbox( + url: str, file_name: str, *, msg_id: str = "", timeout: float = 60.0, +) -> Optional[str]: + """Download *url* into the persistent inbox; agent tools can reopen it. + + ``msg_id`` is hashed into the output filename so concurrent messages + don't clobber each other. + """ + if not HTTPX_AVAILABLE: + return None + try: + ext = os.path.splitext(file_name)[1] or "" + base = re.sub(r'[^\w.-]', '_', os.path.splitext(file_name)[0])[:80] or "file" + slug = re.sub(r'[^\w]', '', msg_id)[:16] if msg_id else uuid.uuid4().hex[:12] + out_path = os.path.join(_inbound_media_dir(), f"{slug}_{base}{ext}") + async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client: + resp = await client.get(url) + resp.raise_for_status() + with open(out_path, "wb") as fh: + fh.write(resp.content) + logger.info( + "Downloaded inbound media %s (%d bytes) -> %s", + file_name, len(resp.content), out_path, + ) + return out_path + except Exception as exc: + logger.warning("Failed to download inbound media %s: %s", file_name, exc) + return None + + +# Extension → MIME type used when we hand a local inbound file off to the +# agent via ``event.media_urls`` / ``media_types``. +_EXT_TO_MIME = { + ".mp3": "audio/mpeg", ".m4a": "audio/mp4", ".wav": "audio/wav", + ".ogg": "audio/ogg", ".amr": "audio/amr", ".aac": "audio/aac", + ".mp4": "video/mp4", ".mov": "video/quicktime", ".webm": "video/webm", + ".avi": "video/x-msvideo", ".mkv": "video/x-matroska", + ".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg", + ".gif": "image/gif", ".webp": "image/webp", ".svg": "image/svg+xml", + ".pdf": "application/pdf", ".md": "text/markdown", ".txt": "text/plain", + ".csv": "text/csv", ".json": "application/json", + ".zip": "application/zip", ".tar": "application/x-tar", ".gz": "application/gzip", + ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", +} + + +def _mime_for_file(file_name: str) -> str: + ext = os.path.splitext(file_name)[1].lower() + return _EXT_TO_MIME.get(ext, "application/octet-stream") + + +async def _dingtalk_fetch_access_token( + client_id: str, client_secret: str +) -> str: + """Return a cached DingTalk accessToken, refreshing if near expiry. + + Uses ``/v1.0/oauth2/accessToken`` directly (no SDK dependency) so the + ``send_message`` tool can call this without opening a Stream WebSocket + that would conflict with the running gateway's exclusive connection. + """ + if not HTTPX_AVAILABLE: + raise RuntimeError("httpx not installed") + + now = datetime.now(tz=timezone.utc).timestamp() + cached = _DINGTALK_TOKEN_CACHE.get(client_id) + if cached and cached[1] - _DINGTALK_TOKEN_SAFETY_MARGIN > now: + return cached[0] + + async with _DINGTALK_TOKEN_LOCK: + cached = _DINGTALK_TOKEN_CACHE.get(client_id) + if cached and cached[1] - _DINGTALK_TOKEN_SAFETY_MARGIN > now: + return cached[0] + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.post( + _DINGTALK_OAUTH_TOKEN_URL, + json={"appKey": client_id, "appSecret": client_secret}, + ) + resp.raise_for_status() + body = resp.json() + token = body.get("accessToken") or "" + expires_in = body.get("expireIn") or body.get("expires_in") or 7200 + if not token: + raise RuntimeError( + f"DingTalk accessToken response missing token: {body}" + ) + _DINGTALK_TOKEN_CACHE[client_id] = ( + token, now + float(expires_in), + ) + return token + + +async def _dingtalk_fetch_oapi_token( + client_id: str, client_secret: str, +) -> Optional[str]: + """Fetch the legacy OAPI accessToken (for /media/upload). + + DingTalk has two parallel token systems: + - ``/v1.0/oauth2/accessToken`` — new, for message OpenAPI + - ``/gettoken`` — legacy OAPI, for /media/upload + Cached separately because they're different endpoints returning + different tokens. Returns None on any failure (non-fatal — + callers should fall back to text-only). + """ + if not HTTPX_AVAILABLE: + return None + now = datetime.now(tz=timezone.utc).timestamp() + cached = _DINGTALK_OAPI_TOKEN_CACHE.get(client_id) + if cached and cached[1] - _DINGTALK_TOKEN_SAFETY_MARGIN > now: + return cached[0] + + async with _DINGTALK_OAPI_TOKEN_LOCK: + cached = _DINGTALK_OAPI_TOKEN_CACHE.get(client_id) + if cached and cached[1] - _DINGTALK_TOKEN_SAFETY_MARGIN > now: + return cached[0] + try: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get( + _DINGTALK_OAPI_TOKEN_URL, + params={"appkey": client_id, "appsecret": client_secret}, + ) + body = resp.json() + if body.get("errcode") != 0 or not body.get("access_token"): + return None + token = str(body["access_token"]) + expires_in = float(body.get("expires_in") or 7200) + _DINGTALK_OAPI_TOKEN_CACHE[client_id] = ( + token, now + expires_in, + ) + return token + except Exception: + return None + + +def _classify_media_kind(path: str) -> str: + """Return one of 'image' / 'voice' / 'video' / 'file' for a local path.""" + ext = os.path.splitext(path)[1].lower() + if ext in _IMAGE_EXTS: + return "image" + if ext in _VOICE_EXTS: + return "voice" + if ext in _VIDEO_EXTS: + return "video" + return "file" + + +async def _dingtalk_upload_media( + *, + oapi_token: str, + file_path: str, + media_kind: str, +) -> Optional[str]: + """Upload a local file to DingTalk /media/upload. + + Returns the raw ``media_id`` on success (preserving the leading ``@`` + when present), or None on failure. + + Caller is responsible for shaping the id per msgKey: + - ``sampleImageMsg.photoURL`` → ``@`` (raw mediaId, NOT the + CDN download URL — the CDN URL renders as a white placeholder in + the DingTalk client; reference messaging.ts:714-719) + - ``sampleFile.mediaId`` → ``@`` + - ``sampleAudio.mediaId`` → ``@`` + - ``sampleVideo.videoMediaId`` → ``@`` + + Matches the reference connector's ``uploadMediaToDingTalk`` + (media/common.ts:65). Large-file chunked upload (>20MB) not yet + ported — callers should pre-split or downscale. + """ + if not HTTPX_AVAILABLE: + return None + if not os.path.exists(file_path): + return None + size = os.path.getsize(file_path) + if size > _DINGTALK_MEDIA_MAX_SIZE: + logger.warning( + "[dingtalk] media file %s is %d bytes, exceeds 20MB limit", + file_path, size, + ) + return None + + ext = os.path.splitext(file_path)[1].lower() + if media_kind == "image": + content_type = "image/png" if ext == ".png" else "image/jpeg" + elif media_kind == "video": + content_type = "video/mp4" if ext == ".mp4" else "video/quicktime" + elif media_kind == "voice": + content_type = "audio/mpeg" if ext == ".mp3" else "audio/amr" + else: + content_type = "application/octet-stream" + + # DingTalk OAPI quirk: videos must be uploaded with type=file + # (reference: media/common.ts:141). + upload_type = "file" if media_kind == "video" else media_kind + + try: + with open(file_path, "rb") as fh: + files = { + "media": ( + os.path.basename(file_path), + fh, + content_type, + ), + } + async with httpx.AsyncClient(timeout=60.0) as client: + resp = await client.post( + _DINGTALK_OAPI_MEDIA_UPLOAD_URL, + params={"access_token": oapi_token, "type": upload_type}, + files=files, + ) + body = resp.json() + if body.get("errcode") != 0: + logger.warning( + "[dingtalk] /media/upload errcode=%s errmsg=%s", + body.get("errcode"), body.get("errmsg"), + ) + return None + media_id = body.get("media_id") or "" + return media_id or None + except Exception as e: + logger.warning("[dingtalk] /media/upload failed: %s", e) + return None + + +def _dingtalk_classify_chat_id(chat_id: str) -> tuple[str, str]: + """Classify a DingTalk chat_id into (bucket, resolved_id). + + Accepts the same target-prefix convention the dingtalk-openclaw-connector + reference uses (``src/services/messaging.ts:sendTextToDingTalk``): + + - ``"group:"`` → group + - ``"user:"`` → oto + - ``"cid...=="`` → group (auto-detect) + - ``"$:LWCP_..."`` → lwcp (fast-fail, needs contact perms) + - anything else → oto (assume staffId) + + Returns ``(bucket, id)`` where ``id`` is the stripped value to put into + the OpenAPI payload. + """ + if not chat_id: + return "oto", "" + if chat_id.startswith("group:"): + return "group", chat_id[6:] + if chat_id.startswith("user:"): + return "oto", chat_id[5:] + if _LWCP_SENDER_RE.match(chat_id): + return "lwcp", chat_id + if _OPEN_CONVERSATION_RE.match(chat_id): + return "group", chat_id + return "oto", chat_id + + +# Smart markdown detection — content contains any of these characters/leading +# markers, assume markdown and upgrade msgKey to sampleMarkdown (reference: +# messaging.ts:820-826). Plain-text prose uses sampleText so DingTalk's +# client renders it without extra padding/heading inference. +_MARKDOWN_LEADING_RE = re.compile(r'^[#*>\-]') +_MARKDOWN_INLINE_RE = re.compile(r'[*_`#\[\]]') + + +def _looks_like_markdown(content: str) -> bool: + if not content: + return False + first_line = content.lstrip().split("\n", 1)[0] + if _MARKDOWN_LEADING_RE.match(first_line): + return True + if _MARKDOWN_INLINE_RE.search(content): + return True + if "\n" in content: # multi-line prose renders better as markdown + return True + return False + + +def _dingtalk_build_msg_param( + content: str, + *, + title: str = "Hermes", + msg_type: Optional[str] = None, +) -> tuple[str, str]: + """Return (msgKey, JSON-encoded msgParam). + + ``msg_type`` overrides auto-detection: + - "text" → sampleText + - "markdown" → sampleMarkdown + - "image" → sampleImageMsg (content = mediaId or photoURL) + - "file" → sampleFile (content = mediaId) + - "voice" → sampleAudio (content = mediaId; duration=0 per reference) + - "video" → sampleVideo (content = mediaId) + If None, chooses sampleMarkdown vs sampleText via + ``_looks_like_markdown``. Matches the reference's ``buildMsgPayload`` + (messaging.ts:150+). + """ + truncated = content[:MAX_MESSAGE_LENGTH] + + if msg_type == "image": + # sampleImageMsg.photoURL accepts the raw media_id with the leading + # ``@`` preserved — DingTalk's server-side resolves this to the + # tenant-internal image store. Reference (messaging.ts:714-719 + + # messaging.ts:188-192): `photoURL: uploadResult.mediaId` with the + # inline comment "使用原始 mediaId(带 @)". + # + # Earlier iterations passed `https://down.dingtalk.com/media/` + # here. The API accepted it, but DingTalk clients rendered a blank + # (white) image because the CDN URL requires a tenant-scoped auth + # handshake that the client doesn't perform during `sampleImageMsg` + # rendering. Only keep the URL passthrough for already-public http(s) + # URLs the caller provides directly. + if truncated.startswith(("http://", "https://")): + photo_url = truncated + else: + photo_url = truncated if truncated.startswith("@") else f"@{truncated}" + return "sampleImageMsg", json.dumps( + {"photoURL": photo_url}, ensure_ascii=False, + ) + if msg_type == "file": + # sampleFile / sampleAudio / sampleVideo take the raw media_id with + # leading ``@`` preserved (reference: media.ts:680+ & :876). + media_id = truncated if truncated.startswith("@") else f"@{truncated}" + return "sampleFile", json.dumps( + {"mediaId": media_id, "fileName": title, "fileType": "file"}, + ensure_ascii=False, + ) + if msg_type == "voice": + media_id = truncated if truncated.startswith("@") else f"@{truncated}" + return "sampleAudio", json.dumps( + {"mediaId": media_id, "duration": "0"}, ensure_ascii=False, + ) + if msg_type == "video": + media_id = truncated if truncated.startswith("@") else f"@{truncated}" + return "sampleVideo", json.dumps( + {"videoMediaId": media_id, "videoType": "mp4", + "picMediaId": "", "duration": "0"}, + ensure_ascii=False, + ) + if msg_type == "text": + return "sampleText", json.dumps( + {"content": truncated}, ensure_ascii=False, + ) + if msg_type == "markdown" or _looks_like_markdown(truncated): + return "sampleMarkdown", json.dumps( + {"title": title, "text": truncated}, ensure_ascii=False, + ) + return "sampleText", json.dumps( + {"content": truncated}, ensure_ascii=False, + ) + + +async def _dingtalk_post_one( + *, + token: str, + robot_code: str, + bucket: str, + target_id: str, + msg_key: str, + msg_param: str, +) -> Dict[str, Any]: + """POST a single already-built message to the correct Robot OpenAPI endpoint.""" + if bucket == "group": + url = _DINGTALK_GROUP_SEND_URL + payload = { + "robotCode": robot_code, + "openConversationId": target_id, + "msgKey": msg_key, + "msgParam": msg_param, + } + else: + url = _DINGTALK_OTO_BATCH_SEND_URL + payload = { + "robotCode": robot_code, + "userIds": [target_id], + "msgKey": msg_key, + "msgParam": msg_param, + } + + headers = { + "x-acs-dingtalk-access-token": token, + "Content-Type": "application/json", + } + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post(url, json=payload, headers=headers) + except Exception as e: + return {"error": f"DingTalk OpenAPI request failed: {e}"} + + if resp.status_code >= 300: + return { + "error": f"DingTalk OpenAPI HTTP {resp.status_code}: {resp.text[:300]}" + } + + try: + body = resp.json() + except Exception: + body = {} + # Robot OpenAPI returns ``processQueryKey`` on success (reference: + # openclaw-connector messaging.ts:254,331,999). Fallback to requestId. + request_id = body.get("processQueryKey") or body.get("requestId") or "" + return {"success": True, "request_id": request_id} + + +async def dingtalk_send_proactive( + *, + client_id: str, + client_secret: str, + robot_code: str, + chat_id: str, + content: str, + media_files: Optional[List[tuple[str, bool]]] = None, + title: str = "Hermes", +) -> Dict[str, Any]: + """Send a message via DingTalk Robot OpenAPI without Stream Mode. + + Proactive-send counterpart to ``DingTalkAdapter.send()``'s + session_webhook path. Authenticates with AppKey/Secret, routes by + ``chat_id`` shape (group / oto / lwcp), and optionally uploads+sends + media attachments as additional messages. + + ``media_files`` is a list of ``(local_path, is_voice_hint)`` tuples. + Each file gets uploaded to /media/upload, then sent as a dedicated + message (sampleImageMsg / sampleAudio / sampleVideo / sampleFile). + Text is sent first (if non-empty), then each media in order. Mirrors + Feishu's approach of emitting separate messages per attachment. + + ``title`` is used for sampleMarkdown headings when content auto-upgrades. + + Returns ``{"success": True, "request_id": ..., "route": ...}`` on success + of the primary (text or first) message. ``media_results`` lists each + media attempt with its outcome. + """ + if not HTTPX_AVAILABLE: + return {"error": "httpx not installed"} + if not client_id or not client_secret: + return {"error": "DingTalk proactive send requires client_id + client_secret"} + if not robot_code: + return {"error": "DingTalk proactive send requires robot_code (defaults to client_id)"} + if not chat_id: + return {"error": "DingTalk proactive send requires chat_id"} + + # Classify FIRST so LWCP fast-fails with zero network roundtrip. + bucket, target_id = _dingtalk_classify_chat_id(chat_id) + if bucket == "lwcp": + return { + "error": ( + "DingTalk chat_id is LWCP-encoded sender_id (%s…): cannot be routed " + "directly by Robot OpenAPI. Grant the app 'qyapi_get_department_list' " + "+ 'qyapi_get_department_member' permissions at open-dev.dingtalk.com " + "and resolve to plain staffId before calling proactive send." + ) % target_id[:16] + } + + try: + token = await _dingtalk_fetch_access_token(client_id, client_secret) + except Exception as e: + return {"error": f"DingTalk accessToken fetch failed: {e}"} + + results: List[Dict[str, Any]] = [] + primary_request_id = "" + + # 1. Text first (if non-empty). Smart markdown detection applied here. + if content and content.strip(): + msg_key, msg_param = _dingtalk_build_msg_param(content, title=title) + text_result = await _dingtalk_post_one( + token=token, robot_code=robot_code, bucket=bucket, + target_id=target_id, msg_key=msg_key, msg_param=msg_param, + ) + if text_result.get("error"): + return text_result + primary_request_id = text_result.get("request_id", "") + + # 2. Media (one message per file). Non-fatal per file — we collect + # per-file results and let the caller decide what to surface. + if media_files: + oapi_token = await _dingtalk_fetch_oapi_token(client_id, client_secret) + if not oapi_token: + results.append({ + "error": ( + "Could not obtain OAPI token for /media/upload — verify the " + "app has ServerAPI permissions (qyapi_media) and retry." + ), + }) + # Keep the text result if we had one. + if primary_request_id: + return { + "success": True, + "request_id": primary_request_id, + "chat_id": target_id, + "route": bucket, + "media_results": results, + } + return results[-1] + + for raw_path, is_voice_hint in media_files: + media_kind = ( + "voice" if is_voice_hint and _classify_media_kind(raw_path) in ("voice", "file") + else _classify_media_kind(raw_path) + ) + media_id = await _dingtalk_upload_media( + oapi_token=oapi_token, + file_path=raw_path, + media_kind=media_kind, + ) + if not media_id: + results.append({ + "error": f"upload failed for {os.path.basename(raw_path)}", + "path": raw_path, + }) + continue + m_msg_key, m_msg_param = _dingtalk_build_msg_param( + media_id, + msg_type=media_kind, + title=os.path.basename(raw_path), + ) + m_result = await _dingtalk_post_one( + token=token, robot_code=robot_code, bucket=bucket, + target_id=target_id, msg_key=m_msg_key, msg_param=m_msg_param, + ) + m_result["kind"] = media_kind + m_result["path"] = raw_path + results.append(m_result) + if not primary_request_id and m_result.get("success"): + primary_request_id = m_result.get("request_id", "") + + if not primary_request_id and not content.strip(): + # Nothing delivered (empty text, all media failed). + return { + "error": "Nothing delivered: empty text and all media uploads failed", + "media_results": results, + } + + return { + "success": True, + "request_id": primary_request_id, + "chat_id": target_id, + "route": bucket, + **({"media_results": results} if results else {}), + } + def check_dingtalk_requirements() -> bool: """Check if DingTalk dependencies are available and configured.""" @@ -202,14 +1110,43 @@ class DingTalkAdapter(BasePlatformAdapter): # auto-close them as siblings — otherwise tool-progress cards get # stuck in streaming state forever. self._streaming_cards: Dict[str, Dict[str, str]] = {} + # Per-card last-edit timestamp (ms) — used to throttle non-finalize + # edit_message() calls so DingTalk's concurrent-update 403 doesn't + # fire. Finalize edits bypass the check. Cleared on finalize. + self._card_last_edit_ms: Dict[str, int] = {} + # Per-chat error-send cooldown (chat_id -> last error send ts ms) + # so a burst of failures doesn't spam the user. + self._error_last_sent_ms: Dict[str, int] = {} # Track fire-and-forget emoji/reaction coroutines so Python's GC # doesn't drop them mid-flight, and we can cancel them on disconnect. self._bg_tasks: Set[asyncio.Task] = set() + # Per-session inbound message queue (promise-chain pattern ported + # from the reference connector, core/message-handler.ts:86-105). + # queueKey is the chat_id (conversation_id or sender_id). Each + # new inbound message chains a ``.then()`` onto the previous + # task so same-session messages are processed in arrival order, + # avoiding parallel agent runs that race on the same context. + self._session_queues: Dict[str, asyncio.Task] = {} + self._session_last_activity: Dict[str, float] = {} + # Periodic sweep of stale queue entries (>5 min idle) so long- + # idle chats don't keep references to old Tasks forever. + self._session_queue_sweeper: Optional[asyncio.Task] = None + # -- Connection lifecycle ----------------------------------------------- async def connect(self) -> bool: """Connect to DingTalk via Stream Mode.""" + # Age out stale inbound-media downloads so the directory doesn't grow + # unboundedly. Safe every connect — O(files), threshold (24 h) is + # well above any reasonable agent session. + removed = _cleanup_inbound_media() + if removed: + logger.info( + "[%s] Cleaned up %d stale inbound-media file(s)", + self.name, removed, + ) + if not DINGTALK_STREAM_AVAILABLE: logger.warning( "[%s] dingtalk-stream not installed. Run: pip install 'dingtalk-stream>=0.20'", @@ -230,6 +1167,8 @@ class DingTalkAdapter(BasePlatformAdapter): try: self._http_client = httpx.AsyncClient(timeout=30.0) + _install_dingtalk_websockets_proxy() + credential = dingtalk_stream.Credential( self._client_id, self._client_secret ) @@ -263,6 +1202,11 @@ class DingTalkAdapter(BasePlatformAdapter): ) self._stream_task = asyncio.create_task(self._run_stream()) + # Start the periodic sweeper for stale inbound-queue entries. + # Mirrors core/message-handler.ts:105 (setInterval 60s). + self._session_queue_sweeper = asyncio.create_task( + self._sweep_session_queues() + ) self._mark_connected() logger.info("[%s] Connected via Stream Mode", self.name) return True @@ -323,6 +1267,27 @@ class DingTalkAdapter(BasePlatformAdapter): logger.debug("[%s] stream task did not exit cleanly during disconnect", self.name) self._stream_task = None + # Stop the session-queue sweeper. + if self._session_queue_sweeper: + self._session_queue_sweeper.cancel() + try: + await self._session_queue_sweeper + except (asyncio.CancelledError, Exception): + pass + self._session_queue_sweeper = None + + # Cancel any still-pending inbound-queue tasks so disconnect + # doesn't hang waiting on half-handled messages. + if self._session_queues: + for task in list(self._session_queues.values()): + if not task.done(): + task.cancel() + await asyncio.gather( + *self._session_queues.values(), return_exceptions=True, + ) + self._session_queues.clear() + self._session_last_activity.clear() + # Cancel any in-flight background tasks (emoji reactions, etc.) if self._bg_tasks: for task in list(self._bg_tasks): @@ -338,6 +1303,8 @@ class DingTalkAdapter(BasePlatformAdapter): self._session_webhooks.clear() self._message_contexts.clear() self._streaming_cards.clear() + self._card_last_edit_ms.clear() + self._error_last_sent_ms.clear() self._done_emoji_fired.clear() self._dedup.clear() logger.info("[%s] Disconnected", self.name) @@ -460,6 +1427,117 @@ class DingTalkAdapter(BasePlatformAdapter): self._bg_tasks.add(task) task.add_done_callback(self._bg_tasks.discard) + # -- Inbound serialization queue --------------------------------------- + + def _inbound_queue_key(self, chatbot_msg: "ChatbotMessage") -> str: + """Build the queueKey for same-session inbound serialization. + + Mirrors core/message-handler.ts:1601-1649 but simplified: hermes + has one agent per adapter, so we key on chat_id alone + (conversation_id for groups, sender_id for DMs). Parallel + different-chat messages still fan out — the queue only serializes + same-chat arrivals. + """ + conv_id = getattr(chatbot_msg, "conversation_id", "") or "" + sender_id = getattr(chatbot_msg, "sender_id", "") or "" + return conv_id or sender_id + + async def _send_busy_ack(self, chatbot_msg: "ChatbotMessage") -> None: + """Send a text ACK when a new message lands on a busy queue. + + Reference: core/message-handler.ts:1662-1689 creates an AI Card + with ACK text and hands it to the dispatcher for later reuse. + hermes does NOT thread a pre-created card into ``send()`` (that + refactor is out of scope for this port), so we fall back to a + plain text webhook reply — users still see immediate feedback + that the bot received their message, just as a separate bubble + instead of an in-place card update. + """ + if not self._http_client: + return + webhook = getattr(chatbot_msg, "session_webhook", "") or "" + if not webhook or not _DINGTALK_WEBHOOK_RE.match(webhook): + return + phrase = random.choice(_QUEUE_BUSY_ACK_PHRASES) + try: + await self._http_client.post( + webhook, + json={"msgtype": "text", "text": {"content": phrase}}, + timeout=5.0, + ) + except Exception as e: + logger.debug("[%s] busy ACK send failed: %s", self.name, e) + + async def _enqueue_inbound(self, chatbot_msg: "ChatbotMessage") -> None: + """Chain inbound message processing onto the per-chat queue tail. + + Mirrors core/message-handler.ts:1651-1719. Same-chat messages are + processed strictly in order; different chats run in parallel. + When a new message lands while the queue is already working on a + previous one, a busy-ACK is sent fire-and-forget so the user sees + immediate feedback that we noticed their message. + """ + queue_key = self._inbound_queue_key(chatbot_msg) + if not queue_key: + await self._on_message(chatbot_msg) + return + + self._session_last_activity[queue_key] = time.monotonic() + prev_task = self._session_queues.get(queue_key) + is_busy = prev_task is not None and not prev_task.done() + + if is_busy: + self._spawn_bg(self._send_busy_ack(chatbot_msg)) + + async def _chained() -> None: + if prev_task is not None: + try: + await prev_task + except Exception: + # Prior task's failure is its own problem; don't + # block the whole queue. + pass + await self._on_message(chatbot_msg) + + task = asyncio.create_task(_chained()) + self._session_queues[queue_key] = task + + def _cleanup(t: asyncio.Task) -> None: + # Only clear the map if we're still the tail — a newer + # enqueue may have already replaced us. + if self._session_queues.get(queue_key) is t: + self._session_queues.pop(queue_key, None) + + task.add_done_callback(_cleanup) + + async def _sweep_session_queues(self) -> None: + """Periodic sweep of stale queueKey entries. + + Runs every 60s. Entries idle for more than ``_INBOUND_QUEUE_TTL_SEC`` + are dropped from the activity map; entries whose Task already + completed are also dropped from ``_session_queues`` (defensive — + done_callback normally handles this, but an entry may survive if + the callback raced with sweep). Mirrors + core/message-handler.ts:94-105. + """ + try: + while self._running: + await asyncio.sleep(60) + now = time.monotonic() + stale = [ + k for k, ts in self._session_last_activity.items() + if now - ts > _INBOUND_QUEUE_TTL_SEC + ] + for k in stale: + self._session_last_activity.pop(k, None) + task = self._session_queues.get(k) + if task is not None and task.done(): + self._session_queues.pop(k, None) + except asyncio.CancelledError: + return + except Exception as e: + logger.debug("[%s] session-queue sweep error: %s", self.name, e) + # -- AI Card lifecycle helpers ------------------------------------------ async def _close_streaming_siblings(self, chat_id: str) -> None: @@ -595,6 +1673,70 @@ class DingTalkAdapter(BasePlatformAdapter): # Determine message type and build media list msg_type, media_urls, media_types = self._extract_media(message) + logger.info( + "[%s] Pre-file-parse state: msg_type_str=%s text=%r media_urls=%d extensions_keys=%s", + self.name, + getattr(message, "message_type", "?"), + (text[:60] if text else ""), + len(media_urls), + list(getattr(message, "extensions", {}).keys())[:10], + ) + + # ------------------------------------------------------------------ + # Download remote image URLs to local files. + # + # DingTalk's signed OSS URLs are not universally accessible — the + # vision_analyze_tool fails when it tries to download them directly + # because the OSS signature is bound to specific request context. + # Mirrors connector's downloadImageToFile / downloadMediaByCode + # approach: download in the gateway process (same IP/context as the + # DingTalk SDK) and pass local file paths to the agent. + # ------------------------------------------------------------------ + if media_urls: + try: + media_urls = await self._download_images_to_local( + media_urls, media_types, message, + ) + except Exception: + logger.warning( + "[%s] Image download to local failed (non-fatal), keeping URLs", + self.name, exc_info=True, + ) + + # ------------------------------------------------------------------ + # File content auto-parsing: download text-type files (.md, .txt, + # .json, .docx, .pdf, etc.) and inject their content into ``text`` + # so the LLM can see the content immediately without needing tools. + # Mirrors dingtalk-openclaw-connector core/message-handler.ts:1226-1328. + # ------------------------------------------------------------------ + try: + file_parts, file_attachments = await self._extract_and_parse_file_attachments(message) + if file_parts: + file_text = "\n\n".join(file_parts) + text = f"{text}\n\n{file_text}" if text else file_text + logger.info( + "[%s] Injected %d file content block(s) into message text", + self.name, len(file_parts), + ) + # Expose each downloaded media file to the agent as a ``file://`` + # URI so tools (transcribe_audio / vision_analyze / read_file …) + # can open them. ``_extract_media`` + ``_download_images_to_local`` + # already populated ``media_urls`` for image/richText items; this + # appends the audio/video/file items we just downloaded. + for local_path, mime in file_attachments: + media_urls.append(f"file://{local_path}") + media_types.append(mime) + if file_attachments: + logger.info( + "[%s] Attached %d local file(s) to event media_urls", + self.name, len(file_attachments), + ) + except Exception: + logger.warning( + "[%s] File content extraction failed (non-fatal), continuing", + self.name, exc_info=True, + ) + if not text and not media_urls: logger.debug("[%s] Empty message, skipping", self.name) return @@ -639,6 +1781,106 @@ class DingTalkAdapter(BasePlatformAdapter): ) await self.handle_message(event) + @staticmethod + def _extract_quoted_msg_text(container: Any, max_depth: int = 3) -> Optional[str]: + """Extract a ``[引用] `` string from a DingTalk reply container. + + When a user long-presses → quotes → replies in the DingTalk client, + the inbound payload carries ``isReplyMsg=True`` + ``repliedMsg={...}`` + alongside the user's new text. The DingTalk Robot OpenAPI does not + surface this through a typed SDK field, so the reference connector + (dingtalk-openclaw-connector, ``core/message-handler.ts:163-240``) + walks the raw dict. We mirror that logic here to preserve the + quoted-context signal for the agent. + + ``max_depth`` bounds nested-quote recursion (matches openclaw's 3). + Returns ``None`` when no quote is present or the body is empty. + """ + if max_depth <= 0 or not container: + return None + if not container.get("isReplyMsg"): + return None + + replied = container.get("repliedMsg") + if not replied: + return None + + msg_type = replied.get("msgType") or "text" + + raw_content = replied.get("content") + if isinstance(raw_content, dict): + content_obj = raw_content + elif isinstance(raw_content, str): + try: + parsed = json.loads(raw_content) + content_obj = parsed if isinstance(parsed, dict) else {} + except (ValueError, TypeError): + content_obj = {} + else: + content_obj = {} + + body_text = "" + if msg_type == "text": + body_text = (content_obj.get("text") or replied.get("text") or "").strip() + if content_obj.get("isReplyMsg"): + nested = DingTalkAdapter._extract_quoted_msg_text(content_obj, max_depth - 1) + if nested: + body_text = f"{body_text}\n{nested}" if body_text else nested + elif msg_type == "richText": + rich_list = content_obj.get("richText") or [] + parts = [ + item.get("text", "") + for item in rich_list + if isinstance(item, dict) + and item.get("text") + and item.get("msgType") != "skill" + and not item.get("skillData") + ] + body_text = "".join(parts) + elif msg_type == "picture": + body_text = "[图片]" + elif msg_type == "video": + body_text = "[视频]" + elif msg_type == "audio": + body_text = content_obj.get("recognition") or "[语音消息]" + elif msg_type == "file": + file_name = content_obj.get("fileName") or "unknown" + body_text = f"[文件: {file_name}]" + elif msg_type == "markdown": + body_text = (content_obj.get("text") or "").strip() or "[markdown消息]" + elif msg_type == "interactiveCard": + # Forward-compatible extraction. Today DingTalk does NOT populate + # `content` for quoted interactiveCards — only 4 metadata fields + # (msgId/senderId/msgType/createdAt) are delivered. We are + # pushing the DingTalk IM team to populate `content.text` / + # `content.markdown` / `content.title` on the server side; once + # they ship, hermes picks it up automatically without code change. + card_text = "" + for key in ("text", "markdown", "title", "summary"): + candidate = content_obj.get(key) + if isinstance(candidate, str) and candidate.strip(): + card_text = candidate.strip() + break + if card_text: + body_text = card_text + else: + card_url = ( + content_obj.get("biz_custom_action_url") + or replied.get("biz_custom_action_url") + or "" + ) + body_text = ( + f"收到交互式卡片链接:{card_url}" + if card_url + else "[interactiveCard消息]" + ) + else: + body_text = f"[{msg_type}消息]" + + if not body_text: + return None + return f"[引用] {body_text}" + @staticmethod def _extract_text(message: "ChatbotMessage") -> str: """Extract plain text from a DingTalk chatbot message. @@ -650,17 +1892,40 @@ class DingTalkAdapter(BasePlatformAdapter): back to ``str(text)`` without extracting ``.content`` first. * rich text moved from ``message.rich_text`` (list) to ``message.rich_text_content.rich_text_list`` (list of dicts). + + When the user quoted an earlier message (``isReplyMsg=True``), the + quoted body is appended as ``[引用] `` so the agent sees the + conversational context, matching dingtalk-openclaw-connector. """ text = getattr(message, "text", None) or "" - # Handle TextContent object (SDK style) + # Handle TextContent object (SDK style). TextContent.from_dict routes + # unknown JSON fields (isReplyMsg, repliedMsg) into ``.extensions``. + quote_container: Optional[Dict[str, Any]] = None if hasattr(text, "content"): content = (text.content or "").strip() + extensions = getattr(text, "extensions", None) + if isinstance(extensions, dict) and extensions.get("isReplyMsg"): + quote_container = extensions elif isinstance(text, dict): content = text.get("content", "").strip() + if text.get("isReplyMsg"): + quote_container = text else: content = str(text).strip() + if quote_container: + try: + logger.info( + "dingtalk quote payload: %s", + json.dumps(quote_container, ensure_ascii=False, default=str), + ) + except Exception: + logger.info("dingtalk quote payload (repr): %r", quote_container) + quoted = DingTalkAdapter._extract_quoted_msg_text(quote_container, max_depth=3) + if quoted: + content = f"{content}\n{quoted}" if content else quoted + if not content: rich_text = getattr(message, "rich_text_content", None) or getattr( message, "rich_text", None @@ -684,6 +1949,35 @@ class DingTalkAdapter(BasePlatformAdapter): # (alice@example.com), SSH URLs (git@github.com), and literal # references the user wrote ("what does @openai think"). Let the # LLM see the raw text — it handles "@bot hello" cleanly. + + # ----------------------------------------------------------------- + # For message types that the SDK does NOT parse into typed attrs + # (audio / video / file), extract a reasonable text placeholder from + # extensions['content']. Matches connector's extractMessageContent. + # ----------------------------------------------------------------- + if not content: + msg_type_str = getattr(message, "message_type", "") or "" + ext_content = (getattr(message, "extensions", {}) or {}).get("content", None) + if isinstance(ext_content, str): + try: + ext_content = json.loads(ext_content) + except (ValueError, TypeError): + ext_content = None + if isinstance(ext_content, dict): + if msg_type_str == "audio": + content = ( + ext_content.get("recognition") + or ext_content.get("recognition_text") + or "[语音消息]" + ) + elif msg_type_str == "video": + content = "[视频]" + elif msg_type_str == "file": + fname = ext_content.get("fileName", "文件") + content = f"[文件: {fname}]" + elif msg_type_str == "markdown": + content = ext_content.get("text", "").strip() or "[markdown消息]" + return content def _extract_media(self, message: "ChatbotMessage"): @@ -746,6 +2040,317 @@ class DingTalkAdapter(BasePlatformAdapter): return msg_type, media_urls, media_types + async def _download_images_to_local( + self, + media_urls: List[str], + media_types: List[str], + message: "ChatbotMessage", + ) -> List[str]: + """Download remote image URLs to local temp files. + + DingTalk's ``_resolve_media_codes`` replaces download codes with signed + OSS URLs. These signed URLs are **not universally accessible** — the + OSS signature may be bound to specific request headers, IP ranges, or + Referer, so the downstream ``vision_analyze_tool`` (which runs in a + different network context) consistently fails with "Invalid image + source" or HTTP 403. + + The reference connector (dingtalk-openclaw-connector) avoids this by + downloading images to local files first and passing ``file://`` paths. + We mirror that approach here: for each image in *media_urls*, download + via httpx (which runs in the same process as the DingTalk SDK and + therefore shares the same network/IP context) and replace the URL with + the local temp-file path. + + Non-image entries and entries that fail to download are left unchanged. + """ + result = list(media_urls) # shallow copy + robot_code = getattr(message, "robot_code", None) or self._client_id + token: Optional[str] = None # lazy-fetched + + for i, url in enumerate(media_urls): + mtype = media_types[i] if i < len(media_types) else "" + if mtype != "image": + continue # only process images + + try: + download_url = url + # If URL is still a download code (not http), resolve it first + if not url.startswith("http"): + if token is None: + token = await self._get_access_token() + download_url = await self._resolve_single_download_url( + url, robot_code, token, + ) + if not download_url: + logger.warning( + "[%s] Failed to resolve image download code, keeping original", + self.name, + ) + continue + + # Download to persistent inbox (kept for agent tool re-access + # long after _on_message returns). + ext = ".png" # safe default for images + # Try to detect from URL path (before query params) + url_path = download_url.split("?")[0] + for img_ext in (".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"): + if url_path.lower().endswith(img_ext): + ext = img_ext + break + + msg_id = getattr(message, "message_id", "") or "" + local_path = await _download_file_to_inbox( + download_url, f"image{ext}", msg_id=msg_id, timeout=30.0, + ) + if local_path: + result[i] = local_path + logger.info( + "[%s] Downloaded image to local: %s -> %s", + self.name, url[:60], local_path, + ) + else: + logger.warning( + "[%s] Image download failed, keeping original URL: %s", + self.name, url[:80], + ) + except Exception as exc: + logger.warning( + "[%s] Image download error (non-fatal): %s", self.name, exc, + ) + + return result + + async def _extract_and_parse_file_attachments( + self, message: "ChatbotMessage", + ) -> Tuple[List[str], List[Tuple[str, str]]]: + """Download rich-media attachments and describe them for the agent. + + Handles: + * msgtype='file' → download to inbox + parse if small text + * msgtype='audio' → download + surface DingTalk STT recognition + * msgtype='video' → download + * richText items → download each (non-picture) item + + DingTalk's SDK only parses ``text``/``picture``/``richText``; the raw + ``content`` dict for ``file``/``audio``/``video`` lives in + ``message.extensions['content']``. + + Returns ``(parts, attachments)``: + * ``parts`` — text blocks injected into the user message so the + LLM knows what was sent and where to find it. + * ``attachments`` — ``[(local_path, mime_type), ...]`` the caller + merges into ``event.media_urls`` / ``media_types`` so tools + (``transcribe_audio``, ``vision_analyze``, ``read_file`` …) can + open the file directly. + + Files live in ``~/.hermes/inbound_media/``; the cleanup sweep on + adapter connect ages out anything older than 24 h. + """ + parts: List[str] = [] + attachments: List[Tuple[str, str]] = [] + # (download_code_or_url, file_name, extra_text_hint) + items: List[Tuple[str, str, str]] = [] + + msg_type_str = getattr(message, "message_type", "") or "" + extensions = getattr(message, "extensions", {}) or {} + msg_id = getattr(message, "message_id", "") or "" + + raw_content: Optional[Dict[str, Any]] = None + if msg_type_str in ("file", "audio", "video"): + _raw = extensions.get("content", None) + if isinstance(_raw, str): + try: + _raw = json.loads(_raw) + except (ValueError, TypeError): + _raw = None + if isinstance(_raw, dict): + raw_content = _raw + + # 1) file msgtype + if msg_type_str == "file" and raw_content: + dl = raw_content.get("downloadCode", "") + fn = raw_content.get("fileName", "") + if dl and fn: + items.append((dl, fn, "")) + logger.info("[%s] Found file attachment: %s", self.name, fn) + + # 2) audio msgtype — carry DingTalk's STT text as the extra hint + if msg_type_str == "audio" and raw_content: + dl = raw_content.get("downloadCode", "") + fn = raw_content.get("fileName", "") or "audio.amr" + recog = ( + raw_content.get("recognition") + or raw_content.get("recognition_text") + or "" + ) + if dl: + items.append((dl, fn, recog)) + elif recog: + parts.append( + f"🎤 **音频**: {fn}\n" + f"📝 语音识别结果:\n{recog}" + ) + + # 3) video msgtype + if msg_type_str == "video" and raw_content: + dl = raw_content.get("downloadCode", "") + fn = raw_content.get("fileName", "") or "video.mp4" + if dl: + items.append((dl, fn, "")) + + # 4) richText non-picture items (pictures handled upstream by + # ``_resolve_media_codes`` + ``_extract_media`` + + # ``_download_images_to_local``). + rich_text = getattr(message, "rich_text_content", None) + if rich_text: + rich_list = getattr(rich_text, "rich_text_list", []) or [] + for item in rich_list: + if not isinstance(item, dict): + continue + dl = item.get("downloadCode") or item.get("download_code") or "" + fn = item.get("fileName") or item.get("file_name") or "" + itype = item.get("type", "") + if dl and fn and itype not in ("picture",): + items.append((dl, fn, "")) + + # 5) Future-proofing: SDK typed ``file_content`` attribute + if msg_type_str == "file" and not items: + fc = getattr(message, "file_content", None) + if fc: + dl = getattr(fc, "download_code", None) or "" + fn = getattr(fc, "file_name", None) or "" + if dl and fn: + items.append((dl, fn, "")) + + if not items: + return parts, attachments + + # Download every collected item into the persistent inbox and + # describe it for the LLM. + token = await self._get_access_token() + robot_code = getattr(message, "robot_code", None) or self._client_id + + for dl_code, fname, extra in items: + download_url = dl_code + if not dl_code.startswith("http"): + download_url = await self._resolve_single_download_url( + dl_code, robot_code, token, + ) + if not download_url: + parts.append(f"⚠️ 文件获取失败: {fname}") + continue + + local_path = await _download_file_to_inbox( + download_url, fname, msg_id=msg_id, + ) + if not local_path: + parts.append(f"⚠️ 文件下载失败: {fname}") + continue + + mime = _mime_for_file(fname) + attachments.append((local_path, mime)) + + ext = os.path.splitext(fname)[1].lower() + label = _file_type_label(ext) + + # Audio: STT hint + local path so the agent can re-transcribe. + if extra: + parts.append( + f"🎤 **{label}**: {fname}\n" + f"📝 语音识别结果:\n{extra}\n" + f"📁 本地路径: {local_path}" + ) + continue + + # Text-parseable: inline if small, preview-only if large. + if ext in _PARSEABLE_FILE_EXTS: + try: + content = await asyncio.to_thread( + _parse_file_content, local_path, fname, + ) + except Exception: + content = None + logger.warning( + "[%s] parse_file_content raised for %s", + self.name, fname, exc_info=True, + ) + + if content: + size = len(content) + logger.info( + "[%s] Parsed file %s: %d chars", self.name, fname, size, + ) + if size <= 8000: + parts.append( + f"📄 **{label}**: {fname}({size} 字符)\n" + f"📁 本地路径: {local_path}\n" + f"📋 内容:\n{content}" + ) + else: + preview = content[:2000] + parts.append( + f"📄 **{label}**: {fname}({size} 字符,已截断)\n" + f"📁 本地路径: {local_path}\n" + f"📋 前 2000 字预览:\n```\n{preview}\n```\n" + f"(完整内容请用 read_file 工具读取本地路径)" + ) + continue + # parse failed — fall through to path-only + + # Non-parseable / parse-failed: surface local path with tool hints. + parts.append( + f"📎 **{label}**: {fname}\n" + f"📁 本地路径: {local_path}\n" + f"(可用相应工具读取:" + f"transcribe_audio / vision_analyze / read_file)" + ) + + return parts, attachments + + async def _resolve_single_download_url( + self, code: str, robot_code: str, token: Optional[str], + ) -> Optional[str]: + """Resolve a single downloadCode to a downloadUrl. + + Wraps the Robot SDK call without mutating any message attribute. + """ + if not token or not self._robot_sdk: + # Fallback: try direct HTTP with httpx + if HTTPX_AVAILABLE and token: + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/messageFiles/download", + json={"downloadCode": code, "robotCode": robot_code}, + headers={ + "x-acs-dingtalk-access-token": token, + "Content-Type": "application/json", + }, + ) + body = resp.json() + return body.get("downloadUrl") + except Exception as exc: + logger.warning("[%s] httpx fallback download failed: %s", self.name, exc) + return None + try: + request = dingtalk_robot_models.RobotMessageFileDownloadRequest( + download_code=code, + robot_code=robot_code, + ) + headers = dingtalk_robot_models.RobotMessageFileDownloadHeaders( + x_acs_dingtalk_access_token=token, + ) + runtime = tea_util_models.RuntimeOptions() + response = await self._robot_sdk.robot_message_file_download_with_options_async( + request, headers, runtime, + ) + body = response.body if response else None + return getattr(body, "download_url", None) if body else None + except Exception as exc: + logger.warning("[%s] Failed to resolve download code: %s", self.name, exc) + return None + # -- Outbound messaging ------------------------------------------------- async def send( @@ -755,7 +2360,18 @@ class DingTalkAdapter(BasePlatformAdapter): reply_to: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: - """Send a markdown reply via DingTalk session webhook.""" + """Send a markdown reply via DingTalk session webhook. + + ``reply_to`` is accepted for cross-platform BasePlatformAdapter + signature parity, but DingTalk Robot OpenAPI and session_webhook + payloads have no "reply to message X" field. The reference + connector (dingtalk-openclaw-connector) also plumbs ``replyToId`` + through its signatures but never puts it in an outbound body — + see messaging.ts:567-569. We use it only as the "final reply" + vs "intermediate tool progress" routing signal for AI Card + lifecycle (see ``is_final_reply`` below). Threading is likewise + unsupported (see channel.ts:97 ``threads: false``). + """ metadata = metadata or {} logger.debug( "[%s] send() chat_id=%s card_enabled=%s", @@ -768,16 +2384,32 @@ class DingTalkAdapter(BasePlatformAdapter): session_webhook = metadata.get("session_webhook") if not session_webhook: webhook_info = self._get_valid_webhook(chat_id) - if not webhook_info: - logger.warning( - "[%s] No valid session_webhook for chat_id=%s", + if webhook_info: + session_webhook, _ = webhook_info + else: + # No reply webhook for this chat — this is a proactive send + # (no preceding inbound message in cache). Fall back to + # Robot OpenAPI using AppKey/Secret, same pattern Feishu uses + # for proactive ``im.v1.message.create`` calls. + logger.debug( + "[%s] No session_webhook for chat_id=%s; routing via Robot OpenAPI", self.name, chat_id, ) - return SendResult( - success=False, - error="No valid session_webhook available. Reply must follow an incoming message.", + result = await dingtalk_send_proactive( + client_id=self._client_id, + client_secret=self._client_secret, + robot_code=self._robot_code, + chat_id=chat_id, + content=content, + ) + if result.get("success"): + return SendResult( + success=True, + message_id=result.get("request_id") or uuid.uuid4().hex[:12], + ) + return SendResult( + success=False, error=result.get("error", "unknown"), ) - session_webhook, _ = webhook_info if not self._http_client: return SendResult(success=False, error="HTTP client not initialized") @@ -831,6 +2463,18 @@ class DingTalkAdapter(BasePlatformAdapter): "markdown": {"title": "Hermes", "text": normalized}, } + # Optional @mention: metadata may carry ``at_user_ids`` (list of + # staff_ids) or ``at_all`` (bool) which DingTalk's custom-robot + # webhook accepts under the ``at`` key. Mirrors reference + # messaging/send.ts:54-59. + at_user_ids = metadata.get("at_user_ids") if metadata else None + at_all = bool(metadata.get("at_all")) if metadata else False + if at_user_ids or at_all: + payload["at"] = { + "atUserIds": list(at_user_ids) if at_user_ids else [], + "isAtAll": at_all, + } + try: resp = await self._http_client.post( session_webhook, json=payload, timeout=15.0 @@ -1020,6 +2664,25 @@ class DingTalkAdapter(BasePlatformAdapter): """ if not message_id: return SendResult(success=False, error="message_id required") + + # Throttle non-finalize edits per out_track_id. DingTalk's + # streaming_update endpoint 403s if two edits to the same card + # land within ~500ms. Finalize edits bypass — dropping them + # would leave the card stuck in streaming state forever. + # Optimistic update: stamp lastUpdate BEFORE the network call so + # concurrent edits don't both pass the window check during + # the await (mirrors reply-dispatcher.ts:607 pattern). + now_ms = int(datetime.now(tz=timezone.utc).timestamp() * 1000) + if not finalize: + last_ms = self._card_last_edit_ms.get(message_id, 0) + if now_ms - last_ms < _CARD_EDIT_THROTTLE_MS: + logger.debug( + "[%s] edit_message throttled (%dms since last) for %s", + self.name, now_ms - last_ms, message_id, + ) + return SendResult(success=True, message_id=message_id) + self._card_last_edit_ms[message_id] = now_ms + token = await self._get_access_token() if not token: return SendResult(success=False, error="No access token") @@ -1035,6 +2698,8 @@ class DingTalkAdapter(BasePlatformAdapter): self._streaming_cards.get(chat_id, {}).pop(message_id, None) if not self._streaming_cards.get(chat_id): self._streaming_cards.pop(chat_id, None) + # Card is closed; throttle ts is no longer useful. + self._card_last_edit_ms.pop(message_id, None) logger.debug( "[%s] AI Card finalized (edit): %s", self.name, message_id, @@ -1057,7 +2722,15 @@ class DingTalkAdapter(BasePlatformAdapter): content: str, finalize: bool = False, ) -> None: - """Stream content to an existing AI Card.""" + """Stream content to an existing AI Card. + + Per-card 800ms throttle happens at the ``edit_message`` layer; this + function additionally goes through the **global** token bucket so + that many parallel chats can't collectively overrun the tenant-wide + DingTalk card-API QPS cap (~40/s). On an upstream limit response, + ``trigger_backoff`` pauses every acquirer for 2s before spending + the next token — same pattern as messaging/card.ts:93. + """ stream_request = dingtalk_card_models.StreamingUpdateRequest( out_track_id=out_track_id, guid=str(uuid.uuid4()), @@ -1073,9 +2746,24 @@ class DingTalkAdapter(BasePlatformAdapter): ) runtime = tea_util_models.RuntimeOptions() - await self._card_sdk.streaming_update_with_options_async( - stream_request, stream_headers, runtime - ) + await _CARD_BUCKET.acquire() + try: + await self._card_sdk.streaming_update_with_options_async( + stream_request, stream_headers, runtime + ) + except Exception as e: + # Detect DingTalk's 403 "QpsLimit" style responses. The Tea SDK + # raises with the error code embedded in the message — string + # match is intentional (reference messaging/card.ts:107-125 + # does the same). + err_msg = str(e) + if "QpsLimit" in err_msg or "403" in err_msg or "qps" in err_msg.lower(): + logger.warning( + "[%s] Card QPS limit hit, backing off %dms: %s", + self.name, _CARD_API_QPS_BACKOFF_MS, err_msg[:160], + ) + _CARD_BUCKET.trigger_backoff() + raise async def _get_access_token(self) -> Optional[str]: """Get access token using SDK's cached token.""" @@ -1353,9 +3041,14 @@ class _IncomingHandler( return AckMessage.STATUS_OK, "OK" async def _safe_on_message(self, chatbot_msg: "ChatbotMessage") -> None: - """Wrapper that catches exceptions from _on_message.""" + """Wrapper that catches exceptions from _on_message. + + Dispatches through ``_enqueue_inbound`` so same-chat messages are + serialized (with a busy-ACK on the tail), mirroring the reference + connector's per-session queue at core/message-handler.ts:1597-1720. + """ try: - await self._adapter._on_message(chatbot_msg) + await self._adapter._enqueue_inbound(chatbot_msg) except Exception: logger.exception( "[%s] Error processing incoming message", self._adapter.name diff --git a/gateway/run.py b/gateway/run.py index a024649cbd..736fea3df2 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4411,12 +4411,23 @@ class GatewayRunner: if not history and source.platform and source.platform != Platform.LOCAL and source.platform != Platform.WEBHOOK: platform_name = source.platform.value env_key = f"{platform_name.upper()}_HOME_CHANNEL" + # Platform display-name overrides for mixed-case brands where + # naive .title() reads awkwardly ("Dingtalk" → "DingTalk"). + _display_overrides = { + "dingtalk": "DingTalk", + "feishu": "Feishu", + "wecom": "WeCom", + "qqbot": "QQ", + "bluebubbles": "BlueBubbles", + "homeassistant": "Home Assistant", + } + display_name = _display_overrides.get(platform_name, platform_name.title()) if not os.getenv(env_key): adapter = self.adapters.get(source.platform) if adapter: await adapter.send( source.chat_id, - f"📬 No home channel is set for {platform_name.title()}. " + f"📬 No home channel is set for {display_name}. " f"A home channel is where Hermes delivers cron job results " f"and cross-platform messages.\n\n" f"Type /sethome to make this chat your home channel, " @@ -5939,7 +5950,21 @@ class GatewayRunner: platform_name = source.platform.value if source.platform else "unknown" chat_id = source.chat_id chat_name = source.chat_name or chat_id - + + # DingTalk DM quirk: source.chat_id holds the DM conversation_id + # (cid... form), which the Robot OpenAPI /groupMessages/send + # endpoint rejects as `resource.not.found`. Proactive DM sends + # require a real staffId routed through /oToMessages/batchSend. + # Persist as ``user:`` so _dingtalk_classify_chat_id picks + # the oto bucket when cron/cross-platform deliveries fire. + if ( + source.platform + and source.platform.value == "dingtalk" + and source.chat_type == "dm" + and source.user_id_alt + ): + chat_id = f"user:{source.user_id_alt}" + env_key = f"{platform_name.upper()}_HOME_CHANNEL" # Save to config.yaml diff --git a/hermes_cli/dingtalk_auth.py b/hermes_cli/dingtalk_auth.py index e1034c53da..6f9c692b07 100644 --- a/hermes_cli/dingtalk_auth.py +++ b/hermes_cli/dingtalk_auth.py @@ -30,7 +30,7 @@ REGISTRATION_BASE_URL = os.environ.get( "DINGTALK_REGISTRATION_BASE_URL", "https://oapi.dingtalk.com" ).rstrip("/") -REGISTRATION_SOURCE = os.environ.get("DINGTALK_REGISTRATION_SOURCE", "openClaw") +REGISTRATION_SOURCE = os.environ.get("DINGTALK_REGISTRATION_SOURCE", "DING_DWS_CLAW") # ── API helpers ──────────────────────────────────────────────────────────── diff --git a/pyproject.toml b/pyproject.toml index b4dc6ed346..bded799041 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,7 +76,15 @@ termux = [ "hermes-agent[honcho]", "hermes-agent[acp]", ] -dingtalk = ["dingtalk-stream>=0.20,<1", "alibabacloud-dingtalk>=2.0.0", "qrcode>=7.0,<8"] +dingtalk = [ + "dingtalk-stream>=0.20,<1", + "alibabacloud-dingtalk>=2.0.0", + "qrcode>=7.0,<8", + # File content parsing for rich-media messages (docx/pdf/xlsx) + "python-docx>=1.0,<2", + "pdfplumber>=0.10,<1", + "openpyxl>=3.1,<4", +] feishu = ["lark-oapi>=1.5.3,<2", "qrcode>=7.0,<8"] web = ["fastapi>=0.104.0,<1", "uvicorn[standard]>=0.24.0,<1"] rl = [ diff --git a/scripts/gateway_guard.sh b/scripts/gateway_guard.sh new file mode 100755 index 0000000000..6cb3a72b38 --- /dev/null +++ b/scripts/gateway_guard.sh @@ -0,0 +1,107 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +LOG_DIR="${ROOT_DIR}/logs" +PID_FILE="${LOG_DIR}/gateway-guard.pid" +RUN_LOG="${LOG_DIR}/gateway-guard.log" + +mkdir -p "${LOG_DIR}" + +usage() { + cat <<'EOF' +Usage: scripts/gateway_guard.sh + +start Start gateway in background with auto-restart loop +stop Stop background guard process +restart Restart guard process +status Show whether guard is running +logs Follow logs +EOF +} + +is_running() { + if [[ -f "${PID_FILE}" ]]; then + local pid + pid="$(cat "${PID_FILE}")" + if [[ -n "${pid}" ]] && kill -0 "${pid}" 2>/dev/null; then + return 0 + fi + fi + return 1 +} + +start_guard() { + if is_running; then + echo "gateway guard is already running (pid: $(cat "${PID_FILE}"))" + exit 0 + fi + + ( + cd "${ROOT_DIR}" + while true; do + echo "===== $(date '+%F %T') gateway start =====" >> "${RUN_LOG}" + if command -v caffeinate >/dev/null 2>&1; then + caffeinate -dimsu venv/bin/python -m hermes_cli.main gateway run --replace -v >> "${RUN_LOG}" 2>&1 + else + venv/bin/python -m hermes_cli.main gateway run --replace -v >> "${RUN_LOG}" 2>&1 + fi + code=$? + echo "===== $(date '+%F %T') gateway exited code=${code}, restart in 5s =====" >> "${RUN_LOG}" + sleep 5 + done + ) & + + echo $! > "${PID_FILE}" + echo "gateway guard started (pid: $(cat "${PID_FILE}"))" + echo "log file: ${RUN_LOG}" +} + +stop_guard() { + if ! is_running; then + echo "gateway guard is not running" + rm -f "${PID_FILE}" + exit 0 + fi + + local pid + pid="$(cat "${PID_FILE}")" + kill "${pid}" 2>/dev/null || true + sleep 1 + if kill -0 "${pid}" 2>/dev/null; then + kill -9 "${pid}" 2>/dev/null || true + fi + rm -f "${PID_FILE}" + echo "gateway guard stopped" +} + +status_guard() { + if is_running; then + echo "gateway guard is running (pid: $(cat "${PID_FILE}"))" + else + echo "gateway guard is not running" + fi +} + +logs_guard() { + touch "${RUN_LOG}" + tail -f "${RUN_LOG}" +} + +main() { + if [[ $# -lt 1 ]]; then + usage + exit 1 + fi + + case "${1}" in + start) start_guard ;; + stop) stop_guard ;; + restart) stop_guard; start_guard ;; + status) status_guard ;; + logs) logs_guard ;; + *) usage; exit 1 ;; + esac +} + +main "$@" diff --git a/tests/gateway/test_dingtalk.py b/tests/gateway/test_dingtalk.py index 6795f81ca9..ad69e823ac 100644 --- a/tests/gateway/test_dingtalk.py +++ b/tests/gateway/test_dingtalk.py @@ -186,7 +186,9 @@ class TestSend: result = await adapter.send("chat-123", "Hello!") assert result.success is False - assert "session_webhook" in result.error + # No session webhook AND no client_id/secret → proactive-send path trips the + # credentials guard first (adapter drops back to OAPI when webhook missing). + assert "client_id" in result.error and "client_secret" in result.error @pytest.mark.asyncio async def test_send_uses_cached_webhook(self): @@ -421,6 +423,180 @@ class TestExtractText: msg.rich_text = None assert DingTalkAdapter._extract_text(msg) == "" + def test_quoted_text_from_dict_payload(self): + from gateway.platforms.dingtalk import DingTalkAdapter + msg = MagicMock() + msg.text = { + "content": "follow-up question", + "isReplyMsg": True, + "repliedMsg": { + "msgType": "text", + "content": {"text": "original statement"}, + }, + } + msg.rich_text_content = None + msg.rich_text = None + assert DingTalkAdapter._extract_text(msg) == ( + "follow-up question\n[引用] original statement" + ) + + def test_quoted_text_from_textcontent_extensions(self): + from gateway.platforms.dingtalk import DingTalkAdapter + text_obj = SimpleNamespace( + content="follow-up", + extensions={ + "isReplyMsg": True, + "repliedMsg": { + "msgType": "text", + "content": {"text": "earlier msg"}, + }, + }, + ) + msg = MagicMock() + msg.text = text_obj + msg.rich_text_content = None + msg.rich_text = None + assert DingTalkAdapter._extract_text(msg) == "follow-up\n[引用] earlier msg" + + def test_quoted_picture_renders_placeholder(self): + from gateway.platforms.dingtalk import DingTalkAdapter + msg = MagicMock() + msg.text = { + "content": "what is this?", + "isReplyMsg": True, + "repliedMsg": {"msgType": "picture", "content": {}}, + } + msg.rich_text_content = None + msg.rich_text = None + assert DingTalkAdapter._extract_text(msg) == "what is this?\n[引用] [图片]" + + def test_quoted_file_shows_name(self): + from gateway.platforms.dingtalk import DingTalkAdapter + msg = MagicMock() + msg.text = { + "content": "read this", + "isReplyMsg": True, + "repliedMsg": { + "msgType": "file", + "content": {"fileName": "report.pdf"}, + }, + } + msg.rich_text_content = None + msg.rich_text = None + assert DingTalkAdapter._extract_text(msg) == "read this\n[引用] [文件: report.pdf]" + + def test_quoted_content_as_json_string(self): + from gateway.platforms.dingtalk import DingTalkAdapter + msg = MagicMock() + msg.text = { + "content": "hmm", + "isReplyMsg": True, + "repliedMsg": { + "msgType": "text", + "content": json.dumps({"text": "stringified body"}), + }, + } + msg.rich_text_content = None + msg.rich_text = None + assert DingTalkAdapter._extract_text(msg) == "hmm\n[引用] stringified body" + + def test_nested_quote_recurses(self): + from gateway.platforms.dingtalk import DingTalkAdapter + msg = MagicMock() + msg.text = { + "content": "L0", + "isReplyMsg": True, + "repliedMsg": { + "msgType": "text", + "content": { + "text": "L1", + "isReplyMsg": True, + "repliedMsg": { + "msgType": "text", + "content": {"text": "L2"}, + }, + }, + }, + } + msg.rich_text_content = None + msg.rich_text = None + assert DingTalkAdapter._extract_text(msg) == "L0\n[引用] L1\n[引用] L2" + + def test_no_quote_leaves_content_unchanged(self): + from gateway.platforms.dingtalk import DingTalkAdapter + msg = MagicMock() + msg.text = {"content": "plain", "isReplyMsg": False} + msg.rich_text_content = None + msg.rich_text = None + assert DingTalkAdapter._extract_text(msg) == "plain" + + def test_interactive_card_forward_compat_text(self): + """When DingTalk IM team populates content.text server-side, hermes + surfaces it automatically without code change.""" + from gateway.platforms.dingtalk import DingTalkAdapter + msg = MagicMock() + msg.text = { + "content": "总结一下", + "isReplyMsg": True, + "repliedMsg": { + "msgType": "interactiveCard", + "content": {"text": "今日销售额: 1.2万 · 订单数: 38"}, + }, + } + msg.rich_text_content = None + msg.rich_text = None + assert ( + DingTalkAdapter._extract_text(msg) + == "总结一下\n[引用] 今日销售额: 1.2万 · 订单数: 38" + ) + + def test_interactive_card_forward_compat_markdown(self): + from gateway.platforms.dingtalk import DingTalkAdapter + msg = MagicMock() + msg.text = { + "content": "继续", + "isReplyMsg": True, + "repliedMsg": { + "msgType": "interactiveCard", + "content": {"markdown": "## 卡片标题\n卡片正文"}, + }, + } + msg.rich_text_content = None + msg.rich_text = None + result = DingTalkAdapter._extract_text(msg) + assert result == "继续\n[引用] ## 卡片标题\n卡片正文" + + def test_interactive_card_forward_compat_title_fallback(self): + from gateway.platforms.dingtalk import DingTalkAdapter + msg = MagicMock() + msg.text = { + "content": "?", + "isReplyMsg": True, + "repliedMsg": { + "msgType": "interactiveCard", + "content": {"title": "审批通知"}, + }, + } + msg.rich_text_content = None + msg.rich_text = None + assert DingTalkAdapter._extract_text(msg) == "?\n[引用] 审批通知" + + def test_interactive_card_no_content_falls_back_to_placeholder(self): + """Current DingTalk behavior: content is not populated → placeholder.""" + from gateway.platforms.dingtalk import DingTalkAdapter + msg = MagicMock() + msg.text = { + "content": "这个是啥", + "isReplyMsg": True, + "repliedMsg": {"msgType": "interactiveCard"}, + } + msg.rich_text_content = None + msg.rich_text = None + assert ( + DingTalkAdapter._extract_text(msg) + == "这个是啥\n[引用] [interactiveCard消息]" + ) + # --------------------------------------------------------------------------- # Group gating — require_mention + allowed_users (parity with other platforms) diff --git a/tests/hermes_cli/test_dingtalk_auth.py b/tests/hermes_cli/test_dingtalk_auth.py index 592cd3175e..4cc9535cdd 100644 --- a/tests/hermes_cli/test_dingtalk_auth.py +++ b/tests/hermes_cli/test_dingtalk_auth.py @@ -214,4 +214,4 @@ class TestConfigOverrides: import importlib import hermes_cli.dingtalk_auth as mod importlib.reload(mod) - assert mod.REGISTRATION_SOURCE == "openClaw" + assert mod.REGISTRATION_SOURCE == "DING_DWS_CLAW" diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 19da4f55af..edf9c68b13 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -108,7 +108,19 @@ SEND_MESSAGE_SCHEMA = { "(not just a bare platform name), call send_message(action='list') FIRST to see " "available targets, then send to the correct one.\n" "If the user just says a platform name like 'send to telegram', send directly " - "to the home channel without listing first." + "to the home channel without listing first.\n\n" + "MEDIA ATTACHMENTS (images / videos / audio / voice):\n" + "This tool CAN send media files — do not tell the user it is text-only.\n" + "Embed local files in the `message` using either form:\n" + " • `MEDIA:/absolute/path/to/file.png` tag anywhere in the message, OR\n" + " • a bare absolute path like `/Users/me/pic.jpg` (auto-detected).\n" + "Supported extensions: .png .jpg .jpeg .gif .webp (image);\n" + ".mp4 .mov .avi .mkv .webm (video); .mp3 .wav .m4a .ogg .opus (audio).\n" + "Add `[[audio_as_voice]]` to deliver audio as a voice message where " + "the platform supports it (DingTalk/Telegram/Feishu/WhatsApp/Signal).\n" + "Multiple attachments: include multiple MEDIA tags or paths.\n" + "Works on: DingTalk, Feishu, Telegram, Discord, Slack, WhatsApp, Signal, " + "WeCom, Weixin, BlueBubbles, Matrix, QQBot, Email." ), "parameters": { "type": "object", @@ -124,7 +136,14 @@ SEND_MESSAGE_SCHEMA = { }, "message": { "type": "string", - "description": "The message text to send" + "description": ( + "The message body. Plain text, markdown, or text with " + "embedded media. To attach a local file, include either " + "`MEDIA:/absolute/path/to/file.ext` OR a bare absolute " + "path (e.g. `/Users/me/photo.jpg`) anywhere in the string. " + "The file is uploaded and sent as a native attachment " + "(image/video/audio) by the platform adapter." + ) } }, "required": [] @@ -246,7 +265,16 @@ def _handle_send(args): from gateway.platforms.base import BasePlatformAdapter + # Two-stage media extraction (mirrors what inbound-reply rendering does): + # 1. `MEDIA:` tags + `[[audio_as_voice]]` directive + # 2. bare absolute paths (e.g. "/Users/me/pic.jpg") auto-detected + # outside of fenced code blocks. + # Stage 2 matters for the common case where the agent just pastes the + # file path it already knows (what the user asked about in this flow). media_files, cleaned_message = BasePlatformAdapter.extract_media(message) + bare_paths, cleaned_message = BasePlatformAdapter.extract_local_files(cleaned_message) + for _p in bare_paths: + media_files.append((_p, False)) mirror_text = cleaned_message.strip() or _describe_media_for_mirror(media_files) used_home_channel = False @@ -512,6 +540,35 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, last_result = result return last_result + # --- Feishu: use native adapter for text + media --- + if platform == Platform.FEISHU: + last_result = None + for i, chunk in enumerate(chunks): + is_last = (i == len(chunks) - 1) + result = await _send_feishu( + pconfig, chat_id, chunk, + media_files=media_files if is_last else [], + thread_id=thread_id, + ) + if isinstance(result, dict) and result.get("error"): + return result + last_result = result + return last_result + + # --- DingTalk: use native adapter for text + media --- + if platform == Platform.DINGTALK: + last_result = None + for i, chunk in enumerate(chunks): + is_last = (i == len(chunks) - 1) + result = await _send_dingtalk( + pconfig.extra, chat_id, chunk, + media_files=media_files if is_last else [], + ) + if isinstance(result, dict) and result.get("error"): + return result + last_result = result + return last_result + # --- Signal: native attachment support via JSON-RPC attachments param --- if platform == Platform.SIGNAL and media_files: last_result = None @@ -528,11 +585,11 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, last_result = result return last_result - # --- Non-media platforms --- + # --- Remaining platforms (no native media support) --- if media_files and not message.strip(): return { "error": ( - f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, and signal; " + f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, signal, feishu, and dingtalk; " f"target {platform.value} had only media attachments" ) } @@ -540,7 +597,7 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, if media_files: warning = ( f"MEDIA attachments were omitted for {platform.value}; " - "native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, and signal" + "native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, signal, feishu, and dingtalk" ) last_result = None @@ -561,10 +618,6 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, result = await _send_matrix(pconfig.token, pconfig.extra, chat_id, chunk) elif platform == Platform.HOMEASSISTANT: result = await _send_homeassistant(pconfig.token, pconfig.extra, chat_id, chunk) - elif platform == Platform.DINGTALK: - result = await _send_dingtalk(pconfig.extra, chat_id, chunk) - elif platform == Platform.FEISHU: - result = await _send_feishu(pconfig, chat_id, chunk, thread_id=thread_id) elif platform == Platform.WECOM: result = await _send_wecom(pconfig.extra, chat_id, chunk) elif platform == Platform.BLUEBUBBLES: @@ -1282,23 +1335,82 @@ async def _send_homeassistant(token, extra, chat_id, message): return _error(f"Home Assistant send failed: {e}") -async def _send_dingtalk(extra, chat_id, message): - """Send via DingTalk robot webhook. +async def _send_dingtalk(extra, chat_id, message, media_files=None): + """Send via DingTalk Robot OpenAPI (proactive), matching the Feishu path. - Note: The gateway's DingTalk adapter uses per-session webhook URLs from - incoming messages (dingtalk-stream SDK). For cross-platform send_message - delivery we use a static robot webhook URL instead, which must be - configured via ``DINGTALK_WEBHOOK_URL`` env var or ``webhook_url`` in the - platform's extra config. + Routes on ``chat_id`` shape: + - ``cidXXXX==`` (openConversationId) → ``/v1.0/robot/groupMessages/send`` + - plain staffId → ``/v1.0/robot/oToMessages/batchSend`` + - LWCP-encoded sender_id → fails fast with permission hint + (requires qyapi_get_department_* to resolve; see + project_dingtalk_feishu_send_gap memory for background) + + Falls back to the legacy ``DINGTALK_WEBHOOK_URL`` static robot webhook + ONLY when AppKey/Secret are not configured — preserves the single-group + deployment mode for users without OpenAPI access. + + ``media_files`` is a list of ``(local_path, is_voice_hint)`` tuples. + Each file is uploaded to DingTalk's /media/upload (OAPI legacy token) + and sent as a dedicated message with the matching msgKey + (sampleImageMsg / sampleAudio / sampleVideo / sampleFile). Requires + the app to have media/messaging permissions; otherwise media are + skipped and surfaced in ``warnings``. """ try: import httpx except ImportError: return {"error": "httpx not installed"} + + client_id = extra.get("client_id") or os.getenv("DINGTALK_CLIENT_ID", "") + client_secret = extra.get("client_secret") or os.getenv("DINGTALK_CLIENT_SECRET", "") + robot_code = extra.get("robot_code") or client_id + + if client_id and client_secret: + try: + from gateway.platforms.dingtalk import dingtalk_send_proactive + except ImportError: + return {"error": "DingTalk adapter not available"} + try: + result = await dingtalk_send_proactive( + client_id=client_id, + client_secret=client_secret, + robot_code=robot_code, + chat_id=chat_id, + content=message, + media_files=media_files, + ) + except Exception as e: + return _error(f"DingTalk send failed: {e}") + if result.get("error") and not result.get("success"): + return _error(result["error"]) + payload = { + "success": True, + "platform": "dingtalk", + "chat_id": chat_id, + "message_id": result.get("request_id", ""), + "route": result.get("route", ""), + } + media_results = result.get("media_results") or [] + failed_media = [r for r in media_results if r.get("error")] + if failed_media: + payload["warnings"] = [ + f"Media delivery failure: {m.get('error')}" for m in failed_media + ] + if media_results: + payload["media_results"] = media_results + return payload + + # Legacy single-group custom robot webhook fallback. + webhook_url = extra.get("webhook_url") or os.getenv("DINGTALK_WEBHOOK_URL", "") + if not webhook_url: + return { + "error": ( + "DingTalk not configured. Set DINGTALK_CLIENT_ID + " + "DINGTALK_CLIENT_SECRET (Robot OpenAPI, routed by chat_id) or " + "DINGTALK_WEBHOOK_URL (legacy single-group custom robot)." + ) + } try: - webhook_url = extra.get("webhook_url") or os.getenv("DINGTALK_WEBHOOK_URL", "") - if not webhook_url: - return {"error": "DingTalk not configured. Set DINGTALK_WEBHOOK_URL env var or webhook_url in dingtalk platform extra config."} async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.post( webhook_url, @@ -1308,7 +1420,15 @@ async def _send_dingtalk(extra, chat_id, message): data = resp.json() if data.get("errcode", 0) != 0: return _error(f"DingTalk API error: {data.get('errmsg', 'unknown')}") - return {"success": True, "platform": "dingtalk", "chat_id": chat_id} + payload = { + "success": True, + "platform": "dingtalk", + "chat_id": chat_id, + "route": "webhook-legacy", + } + if warnings: + payload["warnings"] = warnings + return payload except Exception as e: return _error(f"DingTalk send failed: {e}")