feat(dingtalk): proactive messaging via Robot OpenAPI + send_message_tool media

- Add DingTalk access-token management (_dingtalk_fetch_access_token,
  _dingtalk_fetch_oapi_token) with process-wide cache, 5 min safety
  margin, and asyncio.Lock for concurrent safety.
- Add _dingtalk_upload_media() — upload local files to /media/upload
  (legacy OAPI token) with mime auto-detection and 20 MB guard.
- Add _dingtalk_classify_chat_id() — route on chat_id shape:
  'cidXXX==' → group /groupMessages/send, plain staffId or
  'user:<staffId>' → 1:1 /oToMessages/batchSend.
- Add dingtalk_send_proactive() orchestrator — text + media delivery.
- In send(), fall back to dingtalk_send_proactive() when no session
  webhook is cached (proactive/cron/cross-platform delivery).
- Fix /sethome DM chat_id: persist 'user:<staffId>' instead of DM
  conversation_id so proactive DM delivery works (gateway/run.py).
- Enhance send_message_tool.py: MEDIA:<path> tag extraction, native
  DingTalk/Feishu routing with media_files support.
- Update test assertions for proactive-send error path.
This commit is contained in:
meng93 2026-04-23 13:14:03 +08:00
parent 563ed0e61f
commit 8221966ad6
10 changed files with 2190 additions and 42 deletions

3
.gitignore vendored
View file

@ -69,3 +69,6 @@ mini-swe-agent/
.nix-stamps/
result
website/static/api/skills-index.json
.claude

View file

@ -491,6 +491,22 @@ def load_gateway_config() -> GatewayConfig:
with open(config_yaml_path, encoding="utf-8") as f:
yaml_cfg = yaml.safe_load(f) or {}
# Hydrate top-level UPPERCASE_HOME_CHANNEL keys from config.yaml
# into os.environ so `_apply_env_overrides` picks them up on boot.
# Without this, /sethome-written yaml entries get orphaned across
# gateway restarts (yaml survives, process env does not), which
# makes onboarding prompts and home-channel routing flap between
# sessions. Env var (if already set) takes precedence.
for _k, _v in yaml_cfg.items():
if (
isinstance(_k, str)
and _k.endswith("_HOME_CHANNEL")
and _k.isupper()
and _v
and not os.getenv(_k)
):
os.environ[_k] = str(_v)
# Map config.yaml keys → GatewayConfig.from_dict() schema.
# Each key overwrites whatever gateway.json may have set.
sr = yaml_cfg.get("session_reset")

File diff suppressed because it is too large Load diff

View file

@ -4411,12 +4411,23 @@ class GatewayRunner:
if not history and source.platform and source.platform != Platform.LOCAL and source.platform != Platform.WEBHOOK:
platform_name = source.platform.value
env_key = f"{platform_name.upper()}_HOME_CHANNEL"
# Platform display-name overrides for mixed-case brands where
# naive .title() reads awkwardly ("Dingtalk" → "DingTalk").
_display_overrides = {
"dingtalk": "DingTalk",
"feishu": "Feishu",
"wecom": "WeCom",
"qqbot": "QQ",
"bluebubbles": "BlueBubbles",
"homeassistant": "Home Assistant",
}
display_name = _display_overrides.get(platform_name, platform_name.title())
if not os.getenv(env_key):
adapter = self.adapters.get(source.platform)
if adapter:
await adapter.send(
source.chat_id,
f"📬 No home channel is set for {platform_name.title()}. "
f"📬 No home channel is set for {display_name}. "
f"A home channel is where Hermes delivers cron job results "
f"and cross-platform messages.\n\n"
f"Type /sethome to make this chat your home channel, "
@ -5939,7 +5950,21 @@ class GatewayRunner:
platform_name = source.platform.value if source.platform else "unknown"
chat_id = source.chat_id
chat_name = source.chat_name or chat_id
# DingTalk DM quirk: source.chat_id holds the DM conversation_id
# (cid... form), which the Robot OpenAPI /groupMessages/send
# endpoint rejects as `resource.not.found`. Proactive DM sends
# require a real staffId routed through /oToMessages/batchSend.
# Persist as ``user:<staffId>`` so _dingtalk_classify_chat_id picks
# the oto bucket when cron/cross-platform deliveries fire.
if (
source.platform
and source.platform.value == "dingtalk"
and source.chat_type == "dm"
and source.user_id_alt
):
chat_id = f"user:{source.user_id_alt}"
env_key = f"{platform_name.upper()}_HOME_CHANNEL"
# Save to config.yaml

View file

@ -30,7 +30,7 @@ REGISTRATION_BASE_URL = os.environ.get(
"DINGTALK_REGISTRATION_BASE_URL", "https://oapi.dingtalk.com"
).rstrip("/")
REGISTRATION_SOURCE = os.environ.get("DINGTALK_REGISTRATION_SOURCE", "openClaw")
REGISTRATION_SOURCE = os.environ.get("DINGTALK_REGISTRATION_SOURCE", "DING_DWS_CLAW")
# ── API helpers ────────────────────────────────────────────────────────────

View file

@ -76,7 +76,15 @@ termux = [
"hermes-agent[honcho]",
"hermes-agent[acp]",
]
dingtalk = ["dingtalk-stream>=0.20,<1", "alibabacloud-dingtalk>=2.0.0", "qrcode>=7.0,<8"]
dingtalk = [
"dingtalk-stream>=0.20,<1",
"alibabacloud-dingtalk>=2.0.0",
"qrcode>=7.0,<8",
# File content parsing for rich-media messages (docx/pdf/xlsx)
"python-docx>=1.0,<2",
"pdfplumber>=0.10,<1",
"openpyxl>=3.1,<4",
]
feishu = ["lark-oapi>=1.5.3,<2", "qrcode>=7.0,<8"]
web = ["fastapi>=0.104.0,<1", "uvicorn[standard]>=0.24.0,<1"]
rl = [

107
scripts/gateway_guard.sh Executable file
View file

@ -0,0 +1,107 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
LOG_DIR="${ROOT_DIR}/logs"
PID_FILE="${LOG_DIR}/gateway-guard.pid"
RUN_LOG="${LOG_DIR}/gateway-guard.log"
mkdir -p "${LOG_DIR}"
usage() {
cat <<'EOF'
Usage: scripts/gateway_guard.sh <start|stop|restart|status|logs>
start Start gateway in background with auto-restart loop
stop Stop background guard process
restart Restart guard process
status Show whether guard is running
logs Follow logs
EOF
}
is_running() {
if [[ -f "${PID_FILE}" ]]; then
local pid
pid="$(cat "${PID_FILE}")"
if [[ -n "${pid}" ]] && kill -0 "${pid}" 2>/dev/null; then
return 0
fi
fi
return 1
}
start_guard() {
if is_running; then
echo "gateway guard is already running (pid: $(cat "${PID_FILE}"))"
exit 0
fi
(
cd "${ROOT_DIR}"
while true; do
echo "===== $(date '+%F %T') gateway start =====" >> "${RUN_LOG}"
if command -v caffeinate >/dev/null 2>&1; then
caffeinate -dimsu venv/bin/python -m hermes_cli.main gateway run --replace -v >> "${RUN_LOG}" 2>&1
else
venv/bin/python -m hermes_cli.main gateway run --replace -v >> "${RUN_LOG}" 2>&1
fi
code=$?
echo "===== $(date '+%F %T') gateway exited code=${code}, restart in 5s =====" >> "${RUN_LOG}"
sleep 5
done
) &
echo $! > "${PID_FILE}"
echo "gateway guard started (pid: $(cat "${PID_FILE}"))"
echo "log file: ${RUN_LOG}"
}
stop_guard() {
if ! is_running; then
echo "gateway guard is not running"
rm -f "${PID_FILE}"
exit 0
fi
local pid
pid="$(cat "${PID_FILE}")"
kill "${pid}" 2>/dev/null || true
sleep 1
if kill -0 "${pid}" 2>/dev/null; then
kill -9 "${pid}" 2>/dev/null || true
fi
rm -f "${PID_FILE}"
echo "gateway guard stopped"
}
status_guard() {
if is_running; then
echo "gateway guard is running (pid: $(cat "${PID_FILE}"))"
else
echo "gateway guard is not running"
fi
}
logs_guard() {
touch "${RUN_LOG}"
tail -f "${RUN_LOG}"
}
main() {
if [[ $# -lt 1 ]]; then
usage
exit 1
fi
case "${1}" in
start) start_guard ;;
stop) stop_guard ;;
restart) stop_guard; start_guard ;;
status) status_guard ;;
logs) logs_guard ;;
*) usage; exit 1 ;;
esac
}
main "$@"

View file

@ -186,7 +186,9 @@ class TestSend:
result = await adapter.send("chat-123", "Hello!")
assert result.success is False
assert "session_webhook" in result.error
# No session webhook AND no client_id/secret → proactive-send path trips the
# credentials guard first (adapter drops back to OAPI when webhook missing).
assert "client_id" in result.error and "client_secret" in result.error
@pytest.mark.asyncio
async def test_send_uses_cached_webhook(self):
@ -421,6 +423,180 @@ class TestExtractText:
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == ""
def test_quoted_text_from_dict_payload(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {
"content": "follow-up question",
"isReplyMsg": True,
"repliedMsg": {
"msgType": "text",
"content": {"text": "original statement"},
},
}
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == (
"follow-up question\n[引用] original statement"
)
def test_quoted_text_from_textcontent_extensions(self):
from gateway.platforms.dingtalk import DingTalkAdapter
text_obj = SimpleNamespace(
content="follow-up",
extensions={
"isReplyMsg": True,
"repliedMsg": {
"msgType": "text",
"content": {"text": "earlier msg"},
},
},
)
msg = MagicMock()
msg.text = text_obj
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == "follow-up\n[引用] earlier msg"
def test_quoted_picture_renders_placeholder(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {
"content": "what is this?",
"isReplyMsg": True,
"repliedMsg": {"msgType": "picture", "content": {}},
}
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == "what is this?\n[引用] [图片]"
def test_quoted_file_shows_name(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {
"content": "read this",
"isReplyMsg": True,
"repliedMsg": {
"msgType": "file",
"content": {"fileName": "report.pdf"},
},
}
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == "read this\n[引用] [文件: report.pdf]"
def test_quoted_content_as_json_string(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {
"content": "hmm",
"isReplyMsg": True,
"repliedMsg": {
"msgType": "text",
"content": json.dumps({"text": "stringified body"}),
},
}
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == "hmm\n[引用] stringified body"
def test_nested_quote_recurses(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {
"content": "L0",
"isReplyMsg": True,
"repliedMsg": {
"msgType": "text",
"content": {
"text": "L1",
"isReplyMsg": True,
"repliedMsg": {
"msgType": "text",
"content": {"text": "L2"},
},
},
},
}
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == "L0\n[引用] L1\n[引用] L2"
def test_no_quote_leaves_content_unchanged(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {"content": "plain", "isReplyMsg": False}
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == "plain"
def test_interactive_card_forward_compat_text(self):
"""When DingTalk IM team populates content.text server-side, hermes
surfaces it automatically without code change."""
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {
"content": "总结一下",
"isReplyMsg": True,
"repliedMsg": {
"msgType": "interactiveCard",
"content": {"text": "今日销售额: 1.2万 · 订单数: 38"},
},
}
msg.rich_text_content = None
msg.rich_text = None
assert (
DingTalkAdapter._extract_text(msg)
== "总结一下\n[引用] 今日销售额: 1.2万 · 订单数: 38"
)
def test_interactive_card_forward_compat_markdown(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {
"content": "继续",
"isReplyMsg": True,
"repliedMsg": {
"msgType": "interactiveCard",
"content": {"markdown": "## 卡片标题\n卡片正文"},
},
}
msg.rich_text_content = None
msg.rich_text = None
result = DingTalkAdapter._extract_text(msg)
assert result == "继续\n[引用] ## 卡片标题\n卡片正文"
def test_interactive_card_forward_compat_title_fallback(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {
"content": "?",
"isReplyMsg": True,
"repliedMsg": {
"msgType": "interactiveCard",
"content": {"title": "审批通知"},
},
}
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == "?\n[引用] 审批通知"
def test_interactive_card_no_content_falls_back_to_placeholder(self):
"""Current DingTalk behavior: content is not populated → placeholder."""
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {
"content": "这个是啥",
"isReplyMsg": True,
"repliedMsg": {"msgType": "interactiveCard"},
}
msg.rich_text_content = None
msg.rich_text = None
assert (
DingTalkAdapter._extract_text(msg)
== "这个是啥\n[引用] [interactiveCard消息]"
)
# ---------------------------------------------------------------------------
# Group gating — require_mention + allowed_users (parity with other platforms)

View file

@ -214,4 +214,4 @@ class TestConfigOverrides:
import importlib
import hermes_cli.dingtalk_auth as mod
importlib.reload(mod)
assert mod.REGISTRATION_SOURCE == "openClaw"
assert mod.REGISTRATION_SOURCE == "DING_DWS_CLAW"

View file

@ -108,7 +108,19 @@ SEND_MESSAGE_SCHEMA = {
"(not just a bare platform name), call send_message(action='list') FIRST to see "
"available targets, then send to the correct one.\n"
"If the user just says a platform name like 'send to telegram', send directly "
"to the home channel without listing first."
"to the home channel without listing first.\n\n"
"MEDIA ATTACHMENTS (images / videos / audio / voice):\n"
"This tool CAN send media files — do not tell the user it is text-only.\n"
"Embed local files in the `message` using either form:\n"
" • `MEDIA:/absolute/path/to/file.png` tag anywhere in the message, OR\n"
" • a bare absolute path like `/Users/me/pic.jpg` (auto-detected).\n"
"Supported extensions: .png .jpg .jpeg .gif .webp (image);\n"
".mp4 .mov .avi .mkv .webm (video); .mp3 .wav .m4a .ogg .opus (audio).\n"
"Add `[[audio_as_voice]]` to deliver audio as a voice message where "
"the platform supports it (DingTalk/Telegram/Feishu/WhatsApp/Signal).\n"
"Multiple attachments: include multiple MEDIA tags or paths.\n"
"Works on: DingTalk, Feishu, Telegram, Discord, Slack, WhatsApp, Signal, "
"WeCom, Weixin, BlueBubbles, Matrix, QQBot, Email."
),
"parameters": {
"type": "object",
@ -124,7 +136,14 @@ SEND_MESSAGE_SCHEMA = {
},
"message": {
"type": "string",
"description": "The message text to send"
"description": (
"The message body. Plain text, markdown, or text with "
"embedded media. To attach a local file, include either "
"`MEDIA:/absolute/path/to/file.ext` OR a bare absolute "
"path (e.g. `/Users/me/photo.jpg`) anywhere in the string. "
"The file is uploaded and sent as a native attachment "
"(image/video/audio) by the platform adapter."
)
}
},
"required": []
@ -246,7 +265,16 @@ def _handle_send(args):
from gateway.platforms.base import BasePlatformAdapter
# Two-stage media extraction (mirrors what inbound-reply rendering does):
# 1. `MEDIA:<path>` tags + `[[audio_as_voice]]` directive
# 2. bare absolute paths (e.g. "/Users/me/pic.jpg") auto-detected
# outside of fenced code blocks.
# Stage 2 matters for the common case where the agent just pastes the
# file path it already knows (what the user asked about in this flow).
media_files, cleaned_message = BasePlatformAdapter.extract_media(message)
bare_paths, cleaned_message = BasePlatformAdapter.extract_local_files(cleaned_message)
for _p in bare_paths:
media_files.append((_p, False))
mirror_text = cleaned_message.strip() or _describe_media_for_mirror(media_files)
used_home_channel = False
@ -512,6 +540,35 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
last_result = result
return last_result
# --- Feishu: use native adapter for text + media ---
if platform == Platform.FEISHU:
last_result = None
for i, chunk in enumerate(chunks):
is_last = (i == len(chunks) - 1)
result = await _send_feishu(
pconfig, chat_id, chunk,
media_files=media_files if is_last else [],
thread_id=thread_id,
)
if isinstance(result, dict) and result.get("error"):
return result
last_result = result
return last_result
# --- DingTalk: use native adapter for text + media ---
if platform == Platform.DINGTALK:
last_result = None
for i, chunk in enumerate(chunks):
is_last = (i == len(chunks) - 1)
result = await _send_dingtalk(
pconfig.extra, chat_id, chunk,
media_files=media_files if is_last else [],
)
if isinstance(result, dict) and result.get("error"):
return result
last_result = result
return last_result
# --- Signal: native attachment support via JSON-RPC attachments param ---
if platform == Platform.SIGNAL and media_files:
last_result = None
@ -528,11 +585,11 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
last_result = result
return last_result
# --- Non-media platforms ---
# --- Remaining platforms (no native media support) ---
if media_files and not message.strip():
return {
"error": (
f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, and signal; "
f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, signal, feishu, and dingtalk; "
f"target {platform.value} had only media attachments"
)
}
@ -540,7 +597,7 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
if media_files:
warning = (
f"MEDIA attachments were omitted for {platform.value}; "
"native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, and signal"
"native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, signal, feishu, and dingtalk"
)
last_result = None
@ -561,10 +618,6 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
result = await _send_matrix(pconfig.token, pconfig.extra, chat_id, chunk)
elif platform == Platform.HOMEASSISTANT:
result = await _send_homeassistant(pconfig.token, pconfig.extra, chat_id, chunk)
elif platform == Platform.DINGTALK:
result = await _send_dingtalk(pconfig.extra, chat_id, chunk)
elif platform == Platform.FEISHU:
result = await _send_feishu(pconfig, chat_id, chunk, thread_id=thread_id)
elif platform == Platform.WECOM:
result = await _send_wecom(pconfig.extra, chat_id, chunk)
elif platform == Platform.BLUEBUBBLES:
@ -1282,23 +1335,82 @@ async def _send_homeassistant(token, extra, chat_id, message):
return _error(f"Home Assistant send failed: {e}")
async def _send_dingtalk(extra, chat_id, message):
"""Send via DingTalk robot webhook.
async def _send_dingtalk(extra, chat_id, message, media_files=None):
"""Send via DingTalk Robot OpenAPI (proactive), matching the Feishu path.
Note: The gateway's DingTalk adapter uses per-session webhook URLs from
incoming messages (dingtalk-stream SDK). For cross-platform send_message
delivery we use a static robot webhook URL instead, which must be
configured via ``DINGTALK_WEBHOOK_URL`` env var or ``webhook_url`` in the
platform's extra config.
Routes on ``chat_id`` shape:
- ``cidXXXX==`` (openConversationId) ``/v1.0/robot/groupMessages/send``
- plain staffId ``/v1.0/robot/oToMessages/batchSend``
- LWCP-encoded sender_id fails fast with permission hint
(requires qyapi_get_department_* to resolve; see
project_dingtalk_feishu_send_gap memory for background)
Falls back to the legacy ``DINGTALK_WEBHOOK_URL`` static robot webhook
ONLY when AppKey/Secret are not configured preserves the single-group
deployment mode for users without OpenAPI access.
``media_files`` is a list of ``(local_path, is_voice_hint)`` tuples.
Each file is uploaded to DingTalk's /media/upload (OAPI legacy token)
and sent as a dedicated message with the matching msgKey
(sampleImageMsg / sampleAudio / sampleVideo / sampleFile). Requires
the app to have media/messaging permissions; otherwise media are
skipped and surfaced in ``warnings``.
"""
try:
import httpx
except ImportError:
return {"error": "httpx not installed"}
client_id = extra.get("client_id") or os.getenv("DINGTALK_CLIENT_ID", "")
client_secret = extra.get("client_secret") or os.getenv("DINGTALK_CLIENT_SECRET", "")
robot_code = extra.get("robot_code") or client_id
if client_id and client_secret:
try:
from gateway.platforms.dingtalk import dingtalk_send_proactive
except ImportError:
return {"error": "DingTalk adapter not available"}
try:
result = await dingtalk_send_proactive(
client_id=client_id,
client_secret=client_secret,
robot_code=robot_code,
chat_id=chat_id,
content=message,
media_files=media_files,
)
except Exception as e:
return _error(f"DingTalk send failed: {e}")
if result.get("error") and not result.get("success"):
return _error(result["error"])
payload = {
"success": True,
"platform": "dingtalk",
"chat_id": chat_id,
"message_id": result.get("request_id", ""),
"route": result.get("route", ""),
}
media_results = result.get("media_results") or []
failed_media = [r for r in media_results if r.get("error")]
if failed_media:
payload["warnings"] = [
f"Media delivery failure: {m.get('error')}" for m in failed_media
]
if media_results:
payload["media_results"] = media_results
return payload
# Legacy single-group custom robot webhook fallback.
webhook_url = extra.get("webhook_url") or os.getenv("DINGTALK_WEBHOOK_URL", "")
if not webhook_url:
return {
"error": (
"DingTalk not configured. Set DINGTALK_CLIENT_ID + "
"DINGTALK_CLIENT_SECRET (Robot OpenAPI, routed by chat_id) or "
"DINGTALK_WEBHOOK_URL (legacy single-group custom robot)."
)
}
try:
webhook_url = extra.get("webhook_url") or os.getenv("DINGTALK_WEBHOOK_URL", "")
if not webhook_url:
return {"error": "DingTalk not configured. Set DINGTALK_WEBHOOK_URL env var or webhook_url in dingtalk platform extra config."}
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
webhook_url,
@ -1308,7 +1420,15 @@ async def _send_dingtalk(extra, chat_id, message):
data = resp.json()
if data.get("errcode", 0) != 0:
return _error(f"DingTalk API error: {data.get('errmsg', 'unknown')}")
return {"success": True, "platform": "dingtalk", "chat_id": chat_id}
payload = {
"success": True,
"platform": "dingtalk",
"chat_id": chat_id,
"route": "webhook-legacy",
}
if warnings:
payload["warnings"] = warnings
return payload
except Exception as e:
return _error(f"DingTalk send failed: {e}")