fix(dingtalk): fire-and-forget message processing & session_webhook fallback

Fixes #11463: DingTalk channel receives messages but fails to reply
with 'No session_webhook available'.

Two changes:

1. **Fire-and-forget message processing**: process() now dispatches
   _on_message as a background task via asyncio.create_task instead of
   awaiting it. This ensures the SDK ACK is returned immediately,
   preventing heartbeat timeouts and disconnections when message
   processing takes longer than the SDK's ACK deadline.

2. **session_webhook extraction fallback**: If ChatbotMessage.from_dict()
   fails to map the sessionWebhook field (possible across SDK versions),
   the handler now falls back to extracting it directly from the raw
   callback data dict using both 'sessionWebhook' and 'session_webhook'
   key variants.

Added 3 tests covering webhook extraction, fallback behavior, and
fire-and-forget ACK timing.
This commit is contained in:
kagura-agent 2026-04-17 17:36:29 +08:00 committed by Teknium
parent 91e7aff219
commit 47a0dd1024
2 changed files with 137 additions and 5 deletions

View file

@ -483,13 +483,39 @@ class _IncomingHandler(ChatbotHandler if DINGTALK_STREAM_AVAILABLE else object):
"""Called by dingtalk-stream when a message arrives.
dingtalk-stream >= 0.24 passes a CallbackMessage whose `.data` contains
the chatbot payload. Convert it to ChatbotMessage and await the adapter
handler directly on the main event loop.
the chatbot payload. Convert it to ChatbotMessage via
``ChatbotMessage.from_dict()``.
Message processing is dispatched as a background task so that this
method returns the ACK immediately — blocking here would prevent the
SDK from sending heartbeats, eventually causing a disconnect.
"""
try:
chatbot_msg = ChatbotMessage.from_dict(callback_message.data)
data = callback_message.data
chatbot_msg = ChatbotMessage.from_dict(data)
# Ensure session_webhook is populated even if the SDK's
# from_dict() did not map it (field name mismatch across
# SDK versions).
if not getattr(chatbot_msg, "session_webhook", None):
webhook = (
data.get("sessionWebhook")
or data.get("session_webhook")
or ""
)
if webhook:
chatbot_msg.session_webhook = webhook
# Fire-and-forget: return ACK immediately, process in background.
asyncio.create_task(self._safe_on_message(chatbot_msg))
except Exception:
logger.exception("[DingTalk] Error preparing incoming message")
return dingtalk_stream.AckMessage.STATUS_OK, "OK"
async def _safe_on_message(self, chatbot_msg: "ChatbotMessage") -> None:
    """Await the adapter's ``_on_message`` handler, logging any failure.

    Any ``Exception`` raised while looking up or awaiting
    ``self._adapter._on_message`` is logged with its traceback and
    swallowed, so nothing propagates to whoever awaits this coroutine.

    Args:
        chatbot_msg: The message object forwarded unchanged to the adapter.
    """
    try:
        # Resolve the handler inside the ``try`` so a lookup failure is
        # logged the same way as a failure raised by the handler itself.
        on_message = self._adapter._on_message
        await on_message(chatbot_msg)
    except Exception:
        logger.exception("[DingTalk] Error processing incoming message")
return dingtalk_stream.AckMessage.STATUS_OK, "OK"

View file

@ -575,3 +575,109 @@ class TestShouldProcessMessage:
# Different group still blocked
assert adapter._should_process_message(msg, "hi", is_group=True, chat_id="grp2") is False
# ---------------------------------------------------------------------------
# _IncomingHandler.process — session_webhook extraction & fire-and-forget
# ---------------------------------------------------------------------------
class TestIncomingHandlerProcess:
    """Tests for ``_IncomingHandler.process``.

    Verify that the handler converts callback data into a message carrying
    ``session_webhook`` and dispatches message processing as a background
    task (fire-and-forget) so the SDK ACK is returned immediately.
    """

    # Upper bound, in seconds, for any wait on the background task. It turns
    # a regression (blocking or never-dispatched processing) into a fast
    # failure instead of a hung test run.
    _WAIT_TIMEOUT = 1.0

    @pytest.mark.asyncio
    async def test_process_extracts_session_webhook(self):
        """session_webhook must be populated from callback data."""
        from gateway.platforms.dingtalk import _IncomingHandler, DingTalkAdapter

        adapter = DingTalkAdapter(PlatformConfig(enabled=True))
        adapter._on_message = AsyncMock()
        handler = _IncomingHandler(adapter, asyncio.get_running_loop())

        callback = MagicMock()
        callback.data = {
            "msgtype": "text",
            "text": {"content": "hello"},
            "senderId": "user1",
            "conversationId": "conv1",
            "sessionWebhook": "https://oapi.dingtalk.com/robot/sendBySession?session=abc",
            "msgId": "msg-001",
        }

        result = await handler.process(callback)

        # Should return ACK immediately (STATUS_OK = 200)
        assert result[0] == 200

        # Let the background task run
        await asyncio.sleep(0.05)

        # _on_message should have been called with a ChatbotMessage
        adapter._on_message.assert_called_once()
        chatbot_msg = adapter._on_message.call_args[0][0]
        assert chatbot_msg.session_webhook == "https://oapi.dingtalk.com/robot/sendBySession?session=abc"

    @pytest.mark.asyncio
    async def test_process_fallback_session_webhook_when_from_dict_misses_it(self):
        """If ChatbotMessage.from_dict does not map sessionWebhook (e.g. SDK
        version mismatch), the handler should fall back to extracting it
        directly from the raw data dict."""
        from gateway.platforms.dingtalk import _IncomingHandler, DingTalkAdapter

        adapter = DingTalkAdapter(PlatformConfig(enabled=True))
        adapter._on_message = AsyncMock()
        handler = _IncomingHandler(adapter, asyncio.get_running_loop())

        callback = MagicMock()
        # Use a key that from_dict might not recognise in some SDK versions
        callback.data = {
            "msgtype": "text",
            "text": {"content": "hi"},
            "senderId": "user2",
            "conversationId": "conv2",
            "session_webhook": "https://oapi.dingtalk.com/robot/sendBySession?session=def",
            "msgId": "msg-002",
        }

        await handler.process(callback)

        # Let the background task run
        await asyncio.sleep(0.05)

        adapter._on_message.assert_called_once()
        chatbot_msg = adapter._on_message.call_args[0][0]
        assert chatbot_msg.session_webhook == "https://oapi.dingtalk.com/robot/sendBySession?session=def"

    @pytest.mark.asyncio
    async def test_process_returns_ack_immediately(self):
        """process() must not block on _on_message — it should return
        the ACK tuple before the message is fully processed."""
        from gateway.platforms.dingtalk import _IncomingHandler, DingTalkAdapter

        processing_started = asyncio.Event()
        processing_gate = asyncio.Event()
        processing_finished = asyncio.Event()

        async def slow_on_message(msg):
            processing_started.set()
            await processing_gate.wait()  # Block until we release
            processing_finished.set()

        adapter = DingTalkAdapter(PlatformConfig(enabled=True))
        adapter._on_message = slow_on_message
        handler = _IncomingHandler(adapter, asyncio.get_running_loop())

        callback = MagicMock()
        callback.data = {
            "msgtype": "text",
            "text": {"content": "test"},
            "senderId": "u",
            "conversationId": "c",
            "sessionWebhook": "https://oapi.dingtalk.com/x",
            "msgId": "m",
        }

        # process() should return immediately even though _on_message blocks.
        # The timeout makes a regression to inline awaiting fail quickly
        # rather than hang forever on the unreleased gate.
        result = await asyncio.wait_for(
            handler.process(callback), timeout=self._WAIT_TIMEOUT
        )
        assert result[0] == 200

        # The message must actually have been handed to the background task;
        # without this check the test would pass even if nothing was
        # dispatched at all.
        await asyncio.wait_for(
            processing_started.wait(), timeout=self._WAIT_TIMEOUT
        )
        # The ACK was returned while the handler is still blocked on the gate.
        assert not processing_finished.is_set()

        # Clean up: release the gate so the background task finishes
        processing_gate.set()
        await asyncio.wait_for(
            processing_finished.wait(), timeout=self._WAIT_TIMEOUT
        )