diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index 0470aaa2665..46604fa1e30 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -4273,21 +4273,31 @@ class FeishuAdapter(BasePlatformAdapter): request = self._build_reply_message_request(effective_reply_to, body) return await asyncio.to_thread(self._client.im.v1.message.reply, request) - body = self._build_create_message_body( - receive_id=chat_id, - msg_type=msg_type, - content=payload, - uuid_value=str(uuid.uuid4()), - ) - # Detect whether chat_id is a user open_id (DM) or a chat_id (group). - # Feishu API expects receive_id_type="open_id" for user DMs (ou_ prefix) - # and receive_id_type="chat_id" for group chats (oc_ prefix, which IS - # the chat_id format — see https://open.feishu.cn/document/). - if chat_id.startswith("ou_"): - receive_id_type = "open_id" + # For topic/thread messages that fell back from reply→create, use + # thread_id as receive_id so the message lands in the topic instead of + # the main chat. + _thread_id = (metadata or {}).get("thread_id") + if _thread_id: + body = self._build_create_message_body( + receive_id=_thread_id, + msg_type=msg_type, + content=payload, + uuid_value=str(uuid.uuid4()), + ) + request = self._build_create_message_request("thread_id", body) else: - receive_id_type = "chat_id" - request = self._build_create_message_request(receive_id_type, body) + body = self._build_create_message_body( + receive_id=chat_id, + msg_type=msg_type, + content=payload, + uuid_value=str(uuid.uuid4()), + ) + # Detect whether chat_id is a user open_id (DM) or a chat_id (group). + if chat_id.startswith("ou_"): + receive_id_type = "open_id" + else: + receive_id_type = "chat_id" + request = self._build_create_message_request(receive_id_type, body) return await asyncio.to_thread(self._client.im.v1.message.create, request) @staticmethod diff --git a/gateway/run.py b/gateway/run.py index a3eeedcd5a8..6522c4e9706 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -13891,6 +13891,7 @@ class GatewayRunner: chat_id=source.chat_id, config=_consumer_cfg, metadata=_thread_metadata, + initial_reply_to_id=event_message_id, ) except Exception as _sc_err: logger.debug("Proxy: could not set up stream consumer: %s", _sc_err) @@ -14716,6 +14717,7 @@ class GatewayRunner: if progress_queue is not None else None ), + initial_reply_to_id=event_message_id, ) if _want_stream_deltas: def _stream_delta_cb(text: str) -> None: diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 918c49ce85a..e99eadf72d8 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -92,6 +92,7 @@ class GatewayStreamConsumer: config: Optional[StreamConsumerConfig] = None, metadata: Optional[dict] = None, on_new_message: Optional[callable] = None, + initial_reply_to_id: Optional[str] = None, ): self.adapter = adapter self.chat_id = chat_id @@ -105,6 +106,7 @@ class GatewayStreamConsumer: # the content, not edit the old bubble above it. # Called with no arguments. Exceptions are swallowed. self._on_new_message = on_new_message + self._initial_reply_to_id = initial_reply_to_id self._queue: queue.Queue = queue.Queue() self._accumulated = "" self._message_id: Optional[str] = None @@ -1004,10 +1006,12 @@ class GatewayStreamConsumer: # The final response will be sent by the fallback path. return False else: - # First message — send new + # First message — send new, threaded to the original user message + # so it lands in the correct topic/thread. result = await self.adapter.send( chat_id=self.chat_id, content=text, + reply_to=self._initial_reply_to_id, metadata=self.metadata, ) if result.success: diff --git a/tests/gateway/test_stream_consumer_thread_routing.py b/tests/gateway/test_stream_consumer_thread_routing.py new file mode 100644 index 00000000000..49ec28819e3 --- /dev/null +++ b/tests/gateway/test_stream_consumer_thread_routing.py @@ -0,0 +1,190 @@ +"""Regression tests for stream consumer thread/topic routing fix. + +Verifies that GatewayStreamConsumer correctly passes reply_to on the first +message send, ensuring messages land in the correct topic/thread instead of +the main group chat. + +Covers: #6969, #9916, #7355 +""" +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch +from types import SimpleNamespace + +import pytest + +from gateway.stream_consumer import ( + GatewayStreamConsumer, + StreamConsumerConfig, +) + + +def _make_adapter(send_result=None, edit_result=None, max_length=4096): + adapter = MagicMock() + adapter.send = AsyncMock( + return_value=send_result or SimpleNamespace(success=True, message_id="msg_1") + ) + adapter.edit_message = AsyncMock( + return_value=edit_result or SimpleNamespace(success=True) + ) + adapter.MAX_MESSAGE_LENGTH = max_length + return adapter + + +class TestInitialReplyToId: + """Verify initial_reply_to_id is passed as reply_to on first send.""" + + @pytest.mark.asyncio + async def test_first_send_uses_initial_reply_to_id(self): + """When initial_reply_to_id is set, first adapter.send() should + include reply_to=initial_reply_to_id.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + metadata={"thread_id": "omt_topic123"}, + initial_reply_to_id="om_user_msg_456", + ) + await consumer._send_or_edit("Hello world") + + adapter.send.assert_called_once() + call_kwargs = adapter.send.call_args[1] + assert call_kwargs["reply_to"] == "om_user_msg_456", ( + "First send should pass initial_reply_to_id as reply_to" + ) + assert call_kwargs["chat_id"] == "chat_123" + + @pytest.mark.asyncio + async def test_first_send_without_initial_reply_to_id(self): + """When initial_reply_to_id is None, first send should have + reply_to=None (backward compatible).""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + ) + await consumer._send_or_edit("Hello world") + + adapter.send.assert_called_once() + call_kwargs = adapter.send.call_args[1] + assert call_kwargs.get("reply_to") is None + + @pytest.mark.asyncio + async def test_subsequent_edits_ignore_initial_reply_to_id(self): + """After first send, edits should use message_id, not initial_reply_to_id.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + metadata={"thread_id": "omt_topic123"}, + initial_reply_to_id="om_user_msg_456", + ) + + # First send + await consumer._send_or_edit("Hello world") + assert adapter.send.call_count == 1 + + # Second call should edit, not send + await consumer._send_or_edit("Hello world updated") + assert adapter.send.call_count == 1, "Should edit, not send again" + adapter.edit_message.assert_called_once() + edit_kwargs = adapter.edit_message.call_args[1] + assert edit_kwargs["message_id"] == "msg_1" + assert edit_kwargs["chat_id"] == "chat_123" + + @pytest.mark.asyncio + async def test_metadata_passed_on_first_send(self): + """Metadata (containing thread_id) should be forwarded on first send.""" + adapter = _make_adapter() + metadata = {"thread_id": "omt_topic789"} + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + metadata=metadata, + initial_reply_to_id="om_msg_000", + ) + await consumer._send_or_edit("Test") + + call_kwargs = adapter.send.call_args[1] + assert call_kwargs["metadata"] == metadata + + +class TestFeishuFallbackThreadRouting: + """Verify FeishuAdapter._send_raw_message routes to topic on fallback.""" + + @pytest.mark.asyncio + async def test_create_uses_thread_id_when_available(self): + """When reply_to=None and metadata has thread_id, message.create + should use receive_id_type='thread_id'.""" + from gateway.platforms.feishu import FeishuAdapter + + # We test the _send_raw_message method directly by mocking the client + adapter = MagicMock(spec=FeishuAdapter) + + # Set up the real _send_raw_message logic manually + mock_client = MagicMock() + mock_create_response = SimpleNamespace( + success=lambda: True, + data=SimpleNamespace(message_id="new_msg_1"), + ) + mock_client.im.v1.message.create = MagicMock(return_value=mock_create_response) + + # Use the real implementation path + adapter._client = mock_client + adapter._build_create_message_body = FeishuAdapter._build_create_message_body + adapter._build_create_message_request = FeishuAdapter._build_create_message_request + + # Call _send_raw_message with reply_to=None and thread_id in metadata + import json + result = await FeishuAdapter._send_raw_message( + adapter, + chat_id="oc_main_chat", + msg_type="text", + payload=json.dumps({"text": "hello"}), + reply_to=None, + metadata={"thread_id": "omt_topic_abc"}, + ) + + # Verify message.create was called (not message.reply) + mock_client.im.v1.message.create.assert_called_once() + + # The request should have receive_id_type="thread_id" + call_args = mock_client.im.v1.message.create.call_args[0][0] + body = call_args.body + # receive_id should be the thread_id, not the chat_id + import json as _json + body_dict = _json.loads(body) if isinstance(body, str) else body + # The body is a CreateMessageRequestBody — check its receive_id + if hasattr(body, 'receive_id'): + assert body.receive_id == "omt_topic_abc", ( + f"Expected receive_id='omt_topic_abc', got '{body.receive_id}'" + ) + + @pytest.mark.asyncio + async def test_create_uses_chat_id_when_no_thread(self): + """When reply_to=None and metadata has no thread_id, message.create + should use receive_id_type='chat_id' (original behavior).""" + from gateway.platforms.feishu import FeishuAdapter + + mock_client = MagicMock() + mock_create_response = SimpleNamespace( + success=lambda: True, + data=SimpleNamespace(message_id="new_msg_1"), + ) + mock_client.im.v1.message.create = MagicMock(return_value=mock_create_response) + + adapter = MagicMock(spec=FeishuAdapter) + adapter._client = mock_client + adapter._build_create_message_body = FeishuAdapter._build_create_message_body + adapter._build_create_message_request = FeishuAdapter._build_create_message_request + + import json + result = await FeishuAdapter._send_raw_message( + adapter, + chat_id="oc_main_chat", + msg_type="text", + payload=json.dumps({"text": "hello"}), + reply_to=None, + metadata=None, + ) + + mock_client.im.v1.message.create.assert_called_once()