fix(dingtalk,api): validate session webhook URL origin, cap webhook cache, reject header injection

dingtalk.py: The session_webhook URL from incoming DingTalk messages is POSTed to
without any origin validation (line 290), enabling SSRF attacks via crafted webhook
URLs (e.g. http://169.254.169.254/ to reach cloud metadata).  Add a regex check
that only accepts the official DingTalk API origin (https://api.dingtalk.com/).
Also cap _session_webhooks dict at 500 entries with FIFO eviction to prevent
unbounded memory growth from long-running gateway instances.

api_server.py: The X-Hermes-Session-Id request header is accepted and echoed back
into response headers (lines 675, 697) without sanitization.  A session ID
containing \r\n enables HTTP response splitting / header injection.  Add a check
that rejects session IDs containing control characters (\r, \n, \x00).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
aaronagent 2026-04-10 11:52:01 +08:00 committed by Teknium
parent b577697189
commit 37bb4f807b
2 changed files with 18 additions and 2 deletions

View file

@ -20,6 +20,7 @@ Configuration in config.yaml:
import asyncio
import logging
import os
import re
import time
import uuid
from datetime import datetime, timezone
@ -54,6 +55,8 @@ MAX_MESSAGE_LENGTH = 20000
DEDUP_WINDOW_SECONDS = 300
DEDUP_MAX_SIZE = 1000
RECONNECT_BACKOFF = [2, 5, 10, 30, 60]
_SESSION_WEBHOOKS_MAX = 500
_DINGTALK_WEBHOOK_RE = re.compile(r'^https://api\.dingtalk\.com/')
def check_dingtalk_requirements() -> bool:
@ -195,9 +198,15 @@ class DingTalkAdapter(BasePlatformAdapter):
chat_id = conversation_id or sender_id
chat_type = "group" if is_group else "dm"
# Store session webhook for reply routing
# Store session webhook for reply routing (validate origin to prevent SSRF)
session_webhook = getattr(message, "session_webhook", None) or ""
if session_webhook and chat_id:
if session_webhook and chat_id and _DINGTALK_WEBHOOK_RE.match(session_webhook):
if len(self._session_webhooks) >= _SESSION_WEBHOOKS_MAX:
# Evict oldest entry to cap memory growth
try:
self._session_webhooks.pop(next(iter(self._session_webhooks)))
except StopIteration:
pass
self._session_webhooks[chat_id] = session_webhook
source = self.build_source(