hermes-agent/tests/gateway/test_dingtalk.py
Teknium aca81ac7bb test(dingtalk): cover require_mention + allowed_users gating
Adds 16 regression tests for the gating logic introduced in the
salvaged commit:

  * TestAllowedUsersGate — empty/wildcard/case-insensitive matching,
    staff_id vs sender_id, env var CSV population
  * TestMentionPatterns — compilation, case-insensitivity, invalid
    regex is skipped-not-raised, JSON env var, newline fallback
  * TestShouldProcessMessage — DM always accepted, group gating via
    require_mention / is_in_at_list / wake-word pattern / free_response_chats

Also adds yule975 to scripts/release.py AUTHOR_MAP (release CI blocks
unmapped emails).
2026-04-17 04:21:49 -07:00

577 lines
22 KiB
Python

"""Tests for DingTalk platform adapter."""
import asyncio
import json
from datetime import datetime, timezone
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch, PropertyMock
import pytest
from gateway.config import Platform, PlatformConfig
# ---------------------------------------------------------------------------
# Requirements check
# ---------------------------------------------------------------------------
class TestDingTalkRequirements:
def test_returns_false_when_sdk_missing(self, monkeypatch):
with patch.dict("sys.modules", {"dingtalk_stream": None}):
monkeypatch.setattr(
"gateway.platforms.dingtalk.DINGTALK_STREAM_AVAILABLE", False
)
from gateway.platforms.dingtalk import check_dingtalk_requirements
assert check_dingtalk_requirements() is False
def test_returns_false_when_env_vars_missing(self, monkeypatch):
monkeypatch.setattr(
"gateway.platforms.dingtalk.DINGTALK_STREAM_AVAILABLE", True
)
monkeypatch.setattr("gateway.platforms.dingtalk.HTTPX_AVAILABLE", True)
monkeypatch.delenv("DINGTALK_CLIENT_ID", raising=False)
monkeypatch.delenv("DINGTALK_CLIENT_SECRET", raising=False)
from gateway.platforms.dingtalk import check_dingtalk_requirements
assert check_dingtalk_requirements() is False
def test_returns_true_when_all_available(self, monkeypatch):
monkeypatch.setattr(
"gateway.platforms.dingtalk.DINGTALK_STREAM_AVAILABLE", True
)
monkeypatch.setattr("gateway.platforms.dingtalk.HTTPX_AVAILABLE", True)
monkeypatch.setenv("DINGTALK_CLIENT_ID", "test-id")
monkeypatch.setenv("DINGTALK_CLIENT_SECRET", "test-secret")
from gateway.platforms.dingtalk import check_dingtalk_requirements
assert check_dingtalk_requirements() is True
# ---------------------------------------------------------------------------
# Adapter construction
# ---------------------------------------------------------------------------
class TestDingTalkAdapterInit:
def test_reads_config_from_extra(self):
from gateway.platforms.dingtalk import DingTalkAdapter
config = PlatformConfig(
enabled=True,
extra={"client_id": "cfg-id", "client_secret": "cfg-secret"},
)
adapter = DingTalkAdapter(config)
assert adapter._client_id == "cfg-id"
assert adapter._client_secret == "cfg-secret"
assert adapter.name == "Dingtalk" # base class uses .title()
def test_falls_back_to_env_vars(self, monkeypatch):
monkeypatch.setenv("DINGTALK_CLIENT_ID", "env-id")
monkeypatch.setenv("DINGTALK_CLIENT_SECRET", "env-secret")
from gateway.platforms.dingtalk import DingTalkAdapter
config = PlatformConfig(enabled=True)
adapter = DingTalkAdapter(config)
assert adapter._client_id == "env-id"
assert adapter._client_secret == "env-secret"
# ---------------------------------------------------------------------------
# Message text extraction
# ---------------------------------------------------------------------------
class TestExtractText:
def test_extracts_dict_text(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {"content": " hello world "}
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == "hello world"
def test_extracts_string_text(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = "plain text"
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == "plain text"
def test_falls_back_to_rich_text(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = ""
msg.rich_text = [{"text": "part1"}, {"text": "part2"}, {"image": "url"}]
assert DingTalkAdapter._extract_text(msg) == "part1 part2"
def test_returns_empty_for_no_content(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = ""
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == ""
# ---------------------------------------------------------------------------
# Deduplication
# ---------------------------------------------------------------------------
class TestDeduplication:
def test_first_message_not_duplicate(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
assert adapter._dedup.is_duplicate("msg-1") is False
def test_second_same_message_is_duplicate(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
adapter._dedup.is_duplicate("msg-1")
assert adapter._dedup.is_duplicate("msg-1") is True
def test_different_messages_not_duplicate(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
adapter._dedup.is_duplicate("msg-1")
assert adapter._dedup.is_duplicate("msg-2") is False
def test_cache_cleanup_on_overflow(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
max_size = adapter._dedup._max_size
# Fill beyond max
for i in range(max_size + 10):
adapter._dedup.is_duplicate(f"msg-{i}")
# Cache should have been pruned
assert len(adapter._dedup._seen) <= max_size + 10
# ---------------------------------------------------------------------------
# Send
# ---------------------------------------------------------------------------
class TestSend:
@pytest.mark.asyncio
async def test_send_posts_to_webhook(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = "OK"
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_response)
adapter._http_client = mock_client
result = await adapter.send(
"chat-123", "Hello!",
metadata={"session_webhook": "https://dingtalk.example/webhook"}
)
assert result.success is True
mock_client.post.assert_called_once()
call_args = mock_client.post.call_args
assert call_args[0][0] == "https://dingtalk.example/webhook"
payload = call_args[1]["json"]
assert payload["msgtype"] == "markdown"
assert payload["markdown"]["title"] == "Hermes"
assert payload["markdown"]["text"] == "Hello!"
@pytest.mark.asyncio
async def test_send_fails_without_webhook(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
adapter._http_client = AsyncMock()
result = await adapter.send("chat-123", "Hello!")
assert result.success is False
assert "session_webhook" in result.error
@pytest.mark.asyncio
async def test_send_uses_cached_webhook(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
mock_response = MagicMock()
mock_response.status_code = 200
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_response)
adapter._http_client = mock_client
adapter._session_webhooks["chat-123"] = "https://cached.example/webhook"
result = await adapter.send("chat-123", "Hello!")
assert result.success is True
assert mock_client.post.call_args[0][0] == "https://cached.example/webhook"
@pytest.mark.asyncio
async def test_send_handles_http_error(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.text = "Bad Request"
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_response)
adapter._http_client = mock_client
result = await adapter.send(
"chat-123", "Hello!",
metadata={"session_webhook": "https://example/webhook"}
)
assert result.success is False
assert "400" in result.error
# ---------------------------------------------------------------------------
# Connect / disconnect
# ---------------------------------------------------------------------------
class TestConnect:
@pytest.mark.asyncio
async def test_disconnect_closes_session_websocket(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
websocket = AsyncMock()
blocker = asyncio.Event()
async def _run_forever():
try:
await blocker.wait()
except asyncio.CancelledError:
return
adapter._stream_client = SimpleNamespace(websocket=websocket)
adapter._stream_task = asyncio.create_task(_run_forever())
adapter._running = True
await adapter.disconnect()
websocket.close.assert_awaited_once()
assert adapter._stream_task is None
@pytest.mark.asyncio
async def test_connect_fails_without_sdk(self, monkeypatch):
monkeypatch.setattr(
"gateway.platforms.dingtalk.DINGTALK_STREAM_AVAILABLE", False
)
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
result = await adapter.connect()
assert result is False
@pytest.mark.asyncio
async def test_connect_fails_without_credentials(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
adapter._client_id = ""
adapter._client_secret = ""
result = await adapter.connect()
assert result is False
@pytest.mark.asyncio
async def test_disconnect_cleans_up(self):
from gateway.platforms.dingtalk import DingTalkAdapter
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
adapter._session_webhooks["a"] = "http://x"
adapter._dedup._seen["b"] = 1.0
adapter._http_client = AsyncMock()
adapter._stream_task = None
await adapter.disconnect()
assert len(adapter._session_webhooks) == 0
assert len(adapter._dedup._seen) == 0
assert adapter._http_client is None
# ---------------------------------------------------------------------------
# Platform enum
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# SDK compatibility regression tests (dingtalk-stream >= 0.20 / 0.24)
# ---------------------------------------------------------------------------
class TestWebhookDomainAllowlist:
"""Guard the webhook origin allowlist against regression.
The SDK started returning reply webhooks on ``oapi.dingtalk.com`` in
addition to ``api.dingtalk.com``. Both must be accepted, and hostile
lookalikes must still be rejected (SSRF defence-in-depth).
"""
def test_api_domain_accepted(self):
from gateway.platforms.dingtalk import _DINGTALK_WEBHOOK_RE
assert _DINGTALK_WEBHOOK_RE.match(
"https://api.dingtalk.com/robot/send?access_token=x"
)
def test_oapi_domain_accepted(self):
from gateway.platforms.dingtalk import _DINGTALK_WEBHOOK_RE
assert _DINGTALK_WEBHOOK_RE.match(
"https://oapi.dingtalk.com/robot/send?access_token=x"
)
def test_http_rejected(self):
from gateway.platforms.dingtalk import _DINGTALK_WEBHOOK_RE
assert not _DINGTALK_WEBHOOK_RE.match("http://api.dingtalk.com/robot/send")
def test_suffix_attack_rejected(self):
from gateway.platforms.dingtalk import _DINGTALK_WEBHOOK_RE
assert not _DINGTALK_WEBHOOK_RE.match(
"https://api.dingtalk.com.evil.example/"
)
def test_unsanctioned_subdomain_rejected(self):
from gateway.platforms.dingtalk import _DINGTALK_WEBHOOK_RE
# Only api.* and oapi.* are allowed — e.g. eapi.dingtalk.com must not slip through
assert not _DINGTALK_WEBHOOK_RE.match("https://eapi.dingtalk.com/robot/send")
class TestHandlerProcessIsAsync:
"""dingtalk-stream >= 0.20 requires ``process`` to be a coroutine."""
def test_process_is_coroutine_function(self):
from gateway.platforms.dingtalk import _IncomingHandler
assert asyncio.iscoroutinefunction(_IncomingHandler.process)
class TestExtractText:
"""_extract_text must handle both legacy and current SDK payload shapes.
Before SDK 0.20 ``message.text`` was a ``dict`` with a ``content`` key.
From 0.20 onward it is a ``TextContent`` dataclass whose ``__str__``
returns ``"TextContent(content=...)"`` — falling back to ``str(text)``
leaks that repr into the agent's input.
"""
def test_text_as_dict_legacy(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = {"content": "hello world"}
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == "hello world"
def test_text_as_textcontent_object(self):
"""SDK >= 0.20 shape: object with ``.content`` attribute."""
from gateway.platforms.dingtalk import DingTalkAdapter
class FakeTextContent:
content = "hello from new sdk"
def __str__(self): # mimic real SDK repr
return f"TextContent(content={self.content})"
msg = MagicMock()
msg.text = FakeTextContent()
msg.rich_text_content = None
msg.rich_text = None
result = DingTalkAdapter._extract_text(msg)
assert result == "hello from new sdk"
assert "TextContent(" not in result
def test_text_content_attr_with_empty_string(self):
from gateway.platforms.dingtalk import DingTalkAdapter
class FakeTextContent:
content = ""
msg = MagicMock()
msg.text = FakeTextContent()
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == ""
def test_rich_text_content_new_shape(self):
"""SDK >= 0.20 exposes rich text as ``message.rich_text_content.rich_text_list``."""
from gateway.platforms.dingtalk import DingTalkAdapter
class FakeRichText:
rich_text_list = [{"text": "hello "}, {"text": "world"}]
msg = MagicMock()
msg.text = None
msg.rich_text_content = FakeRichText()
msg.rich_text = None
result = DingTalkAdapter._extract_text(msg)
assert "hello" in result and "world" in result
def test_rich_text_legacy_shape(self):
"""Legacy ``message.rich_text`` list remains supported."""
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = None
msg.rich_text_content = None
msg.rich_text = [{"text": "legacy "}, {"text": "rich"}]
result = DingTalkAdapter._extract_text(msg)
assert "legacy" in result and "rich" in result
def test_empty_message(self):
from gateway.platforms.dingtalk import DingTalkAdapter
msg = MagicMock()
msg.text = None
msg.rich_text_content = None
msg.rich_text = None
assert DingTalkAdapter._extract_text(msg) == ""
# ---------------------------------------------------------------------------
# Group gating — require_mention + allowed_users (parity with other platforms)
# ---------------------------------------------------------------------------
def _make_gating_adapter(monkeypatch, *, extra=None, env=None):
"""Build a DingTalkAdapter with only the gating fields populated.
Clears every DINGTALK_* gating env var before applying the caller's
overrides so individual tests stay isolated.
"""
for key in (
"DINGTALK_REQUIRE_MENTION",
"DINGTALK_MENTION_PATTERNS",
"DINGTALK_FREE_RESPONSE_CHATS",
"DINGTALK_ALLOWED_USERS",
):
monkeypatch.delenv(key, raising=False)
for key, value in (env or {}).items():
monkeypatch.setenv(key, value)
from gateway.platforms.dingtalk import DingTalkAdapter
return DingTalkAdapter(PlatformConfig(enabled=True, extra=extra or {}))
class TestAllowedUsersGate:
def test_empty_allowlist_allows_everyone(self, monkeypatch):
adapter = _make_gating_adapter(monkeypatch)
assert adapter._is_user_allowed("anyone", "any-staff") is True
def test_wildcard_allowlist_allows_everyone(self, monkeypatch):
adapter = _make_gating_adapter(monkeypatch, extra={"allowed_users": ["*"]})
assert adapter._is_user_allowed("anyone", "any-staff") is True
def test_matches_sender_id_case_insensitive(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch, extra={"allowed_users": ["SenderABC"]}
)
assert adapter._is_user_allowed("senderabc", "") is True
def test_matches_staff_id(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch, extra={"allowed_users": ["staff_1234"]}
)
assert adapter._is_user_allowed("", "staff_1234") is True
def test_rejects_unknown_user(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch, extra={"allowed_users": ["staff_1234"]}
)
assert adapter._is_user_allowed("other-sender", "other-staff") is False
def test_env_var_csv_populates_allowlist(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch, env={"DINGTALK_ALLOWED_USERS": "alice,bob,carol"}
)
assert adapter._is_user_allowed("alice", "") is True
assert adapter._is_user_allowed("dave", "") is False
class TestMentionPatterns:
def test_empty_patterns_list(self, monkeypatch):
adapter = _make_gating_adapter(monkeypatch)
assert adapter._mention_patterns == []
assert adapter._message_matches_mention_patterns("anything") is False
def test_pattern_matches_text(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch, extra={"mention_patterns": ["^hermes"]}
)
assert adapter._message_matches_mention_patterns("hermes please help") is True
assert adapter._message_matches_mention_patterns("please hermes help") is False
def test_pattern_is_case_insensitive(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch, extra={"mention_patterns": ["^hermes"]}
)
assert adapter._message_matches_mention_patterns("HERMES help") is True
def test_invalid_regex_is_skipped_not_raised(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch,
extra={"mention_patterns": ["[unclosed", "^valid"]},
)
# Invalid pattern dropped, valid one kept
assert len(adapter._mention_patterns) == 1
assert adapter._message_matches_mention_patterns("valid trigger") is True
def test_env_var_json_populates_patterns(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch,
env={"DINGTALK_MENTION_PATTERNS": '["^bot", "^assistant"]'},
)
assert len(adapter._mention_patterns) == 2
assert adapter._message_matches_mention_patterns("bot ping") is True
def test_env_var_newline_fallback_when_not_json(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch,
env={"DINGTALK_MENTION_PATTERNS": "^bot\n^assistant"},
)
assert len(adapter._mention_patterns) == 2
class TestShouldProcessMessage:
def test_dm_always_accepted(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch, extra={"require_mention": True}
)
msg = MagicMock(is_in_at_list=False)
assert adapter._should_process_message(msg, "hi", is_group=False, chat_id="dm1") is True
def test_group_rejected_when_require_mention_and_no_trigger(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch, extra={"require_mention": True}
)
msg = MagicMock(is_in_at_list=False)
assert adapter._should_process_message(msg, "hi", is_group=True, chat_id="grp1") is False
def test_group_accepted_when_require_mention_disabled(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch, extra={"require_mention": False}
)
msg = MagicMock(is_in_at_list=False)
assert adapter._should_process_message(msg, "hi", is_group=True, chat_id="grp1") is True
def test_group_accepted_when_bot_is_mentioned(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch, extra={"require_mention": True}
)
msg = MagicMock(is_in_at_list=True)
assert adapter._should_process_message(msg, "hi", is_group=True, chat_id="grp1") is True
def test_group_accepted_when_text_matches_wake_word(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch,
extra={"require_mention": True, "mention_patterns": ["^hermes"]},
)
msg = MagicMock(is_in_at_list=False)
assert adapter._should_process_message(msg, "hermes help", is_group=True, chat_id="grp1") is True
def test_group_accepted_when_chat_in_free_response_list(self, monkeypatch):
adapter = _make_gating_adapter(
monkeypatch,
extra={"require_mention": True, "free_response_chats": ["grp1"]},
)
msg = MagicMock(is_in_at_list=False)
assert adapter._should_process_message(msg, "hi", is_group=True, chat_id="grp1") is True
# Different group still blocked
assert adapter._should_process_message(msg, "hi", is_group=True, chat_id="grp2") is False