diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index e99eadf72d8..0539b825b83 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -365,7 +365,7 @@ class GatewayStreamConsumer: self._accumulated, _safe_limit ) chunks_delivered = False - reply_to = self._message_id + reply_to = self._message_id or self._initial_reply_to_id for chunk in chunks: new_id = await self._send_new_chunk(chunk, reply_to) if new_id is not None and new_id != reply_to: diff --git a/tests/gateway/test_stream_consumer_thread_routing.py b/tests/gateway/test_stream_consumer_thread_routing.py index 49ec28819e3..0fe7d13c845 100644 --- a/tests/gateway/test_stream_consumer_thread_routing.py +++ b/tests/gateway/test_stream_consumer_thread_routing.py @@ -108,6 +108,36 @@ class TestInitialReplyToId: assert call_kwargs["metadata"] == metadata +class TestOverflowFirstMessage: + """Verify thread routing is preserved when the first message overflows.""" + + @pytest.mark.asyncio + async def test_overflow_first_send_uses_initial_reply_to_id(self): + """When first message exceeds platform limit and is split into chunks, + each chunk should be threaded to initial_reply_to_id, not None.""" + adapter = _make_adapter(max_length=10) + adapter.truncate_message = MagicMock( + return_value=["chunk_1", "chunk_2"] + ) + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + metadata={"thread_id": "omt_topic123"}, + initial_reply_to_id="om_user_msg_789", + ) + + # Inject oversized accumulated text to trigger overflow path + consumer._accumulated = "A" * 100 + consumer._current_edit_interval = 999 + await consumer._send_new_chunk("chunk_1", consumer._message_id or consumer._initial_reply_to_id) + + adapter.send.assert_called_once() + call_kwargs = adapter.send.call_args[1] + assert call_kwargs["reply_to"] == "om_user_msg_789", ( + "Overflow first chunk should use initial_reply_to_id" + ) + + class TestFeishuFallbackThreadRouting: """Verify FeishuAdapter._send_raw_message routes to topic on fallback.""" @@ -153,11 +183,12 @@ class TestFeishuFallbackThreadRouting: # receive_id should be the thread_id, not the chat_id import json as _json body_dict = _json.loads(body) if isinstance(body, str) else body - # The body is a CreateMessageRequestBody — check its receive_id - if hasattr(body, 'receive_id'): - assert body.receive_id == "omt_topic_abc", ( - f"Expected receive_id='omt_topic_abc', got '{body.receive_id}'" - ) + assert hasattr(body, 'receive_id'), ( + "CreateMessageRequestBody missing receive_id attribute" + ) + assert body.receive_id == "omt_topic_abc", ( + f"Expected receive_id='omt_topic_abc', got '{body.receive_id}'" + ) @pytest.mark.asyncio async def test_create_uses_chat_id_when_no_thread(self):