diff --git a/gateway/platforms/dingtalk.py b/gateway/platforms/dingtalk.py index 923e6bd2b..67c6ee8db 100644 --- a/gateway/platforms/dingtalk.py +++ b/gateway/platforms/dingtalk.py @@ -483,13 +483,39 @@ class _IncomingHandler(ChatbotHandler if DINGTALK_STREAM_AVAILABLE else object): """Called by dingtalk-stream when a message arrives. dingtalk-stream >= 0.24 passes a CallbackMessage whose `.data` contains - the chatbot payload. Convert it to ChatbotMessage and await the adapter - handler directly on the main event loop. + the chatbot payload. Convert it to ChatbotMessage via + ``ChatbotMessage.from_dict()``. + + Message processing is dispatched as a background task so that this + method returns the ACK immediately — blocking here would prevent the + SDK from sending heartbeats, eventually causing a disconnect. """ try: - chatbot_msg = ChatbotMessage.from_dict(callback_message.data) + data = callback_message.data + chatbot_msg = ChatbotMessage.from_dict(data) + + # Ensure session_webhook is populated even if the SDK's + # from_dict() did not map it (field name mismatch across + # SDK versions). + if not getattr(chatbot_msg, "session_webhook", None): + webhook = ( + data.get("sessionWebhook") + or data.get("session_webhook") + or "" + ) + if webhook: + chatbot_msg.session_webhook = webhook + + # Fire-and-forget: return ACK immediately, process in background. + asyncio.create_task(self._safe_on_message(chatbot_msg)) + except Exception: + logger.exception("[DingTalk] Error preparing incoming message") + + return dingtalk_stream.AckMessage.STATUS_OK, "OK" + + async def _safe_on_message(self, chatbot_msg: "ChatbotMessage") -> None: + """Wrapper that catches exceptions from _on_message.""" + try: await self._adapter._on_message(chatbot_msg) except Exception: logger.exception("[DingTalk] Error processing incoming message") - - return dingtalk_stream.AckMessage.STATUS_OK, "OK" diff --git a/tests/gateway/test_dingtalk.py b/tests/gateway/test_dingtalk.py index 8404281d8..a004e17aa 100644 --- a/tests/gateway/test_dingtalk.py +++ b/tests/gateway/test_dingtalk.py @@ -575,3 +575,109 @@ class TestShouldProcessMessage: # Different group still blocked assert adapter._should_process_message(msg, "hi", is_group=True, chat_id="grp2") is False + +# --------------------------------------------------------------------------- +# _IncomingHandler.process — session_webhook extraction & fire-and-forget +# --------------------------------------------------------------------------- + + +class TestIncomingHandlerProcess: + """Verify that _IncomingHandler.process correctly converts callback data + and dispatches message processing as a background task (fire-and-forget) + so the SDK ACK is returned immediately.""" + + @pytest.mark.asyncio + async def test_process_extracts_session_webhook(self): + """session_webhook must be populated from callback data.""" + from gateway.platforms.dingtalk import _IncomingHandler, DingTalkAdapter + + adapter = DingTalkAdapter(PlatformConfig(enabled=True)) + adapter._on_message = AsyncMock() + handler = _IncomingHandler(adapter, asyncio.get_running_loop()) + + callback = MagicMock() + callback.data = { + "msgtype": "text", + "text": {"content": "hello"}, + "senderId": "user1", + "conversationId": "conv1", + "sessionWebhook": "https://oapi.dingtalk.com/robot/sendBySession?session=abc", + "msgId": "msg-001", + } + + result = await handler.process(callback) + # Should return ACK immediately (STATUS_OK = 200) + assert result[0] == 200 + + # Let the background task run + await asyncio.sleep(0.05) + + # _on_message should have been called with a ChatbotMessage + adapter._on_message.assert_called_once() + chatbot_msg = adapter._on_message.call_args[0][0] + assert chatbot_msg.session_webhook == "https://oapi.dingtalk.com/robot/sendBySession?session=abc" + + @pytest.mark.asyncio + async def test_process_fallback_session_webhook_when_from_dict_misses_it(self): + """If ChatbotMessage.from_dict does not map sessionWebhook (e.g. SDK + version mismatch), the handler should fall back to extracting it + directly from the raw data dict.""" + from gateway.platforms.dingtalk import _IncomingHandler, DingTalkAdapter + + adapter = DingTalkAdapter(PlatformConfig(enabled=True)) + adapter._on_message = AsyncMock() + handler = _IncomingHandler(adapter, asyncio.get_running_loop()) + + callback = MagicMock() + # Use a key that from_dict might not recognise in some SDK versions + callback.data = { + "msgtype": "text", + "text": {"content": "hi"}, + "senderId": "user2", + "conversationId": "conv2", + "session_webhook": "https://oapi.dingtalk.com/robot/sendBySession?session=def", + "msgId": "msg-002", + } + + await handler.process(callback) + await asyncio.sleep(0.05) + + adapter._on_message.assert_called_once() + chatbot_msg = adapter._on_message.call_args[0][0] + assert chatbot_msg.session_webhook == "https://oapi.dingtalk.com/robot/sendBySession?session=def" + + @pytest.mark.asyncio + async def test_process_returns_ack_immediately(self): + """process() must not block on _on_message — it should return + the ACK tuple before the message is fully processed.""" + from gateway.platforms.dingtalk import _IncomingHandler, DingTalkAdapter + + processing_started = asyncio.Event() + processing_gate = asyncio.Event() + + async def slow_on_message(msg): + processing_started.set() + await processing_gate.wait() # Block until we release + + adapter = DingTalkAdapter(PlatformConfig(enabled=True)) + adapter._on_message = slow_on_message + handler = _IncomingHandler(adapter, asyncio.get_running_loop()) + + callback = MagicMock() + callback.data = { + "msgtype": "text", + "text": {"content": "test"}, + "senderId": "u", + "conversationId": "c", + "sessionWebhook": "https://oapi.dingtalk.com/x", + "msgId": "m", + } + + # process() should return immediately even though _on_message blocks + result = await handler.process(callback) + assert result[0] == 200 + + # Clean up: release the gate so the background task finishes + processing_gate.set() + await asyncio.sleep(0.05) +