mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
fix(gateway): stream consumer first message drops thread context
Cherry-picked from PR #13077 commits: - 5500c7d8 fix(gateway): stream consumer first message drops thread context - e84403b9 test(gateway): add regression tests for stream consumer thread routing Fixes: Streaming first message drops thread/topic context in Feishu group topics, Slack threads, Telegram forum topics. Adds initial_reply_to_id ctor arg to GatewayStreamConsumer, threaded through _send_or_edit and _send_new_chunk. Also fixes Feishu _send_raw_message fallback path (reply -> create) to use receive_id_type='thread_id' so the new message lands in the correct topic instead of the main channel. Authored by hrygo via PR #13077 (re-attributed from the bot-authored salvage commit on the original branch).
This commit is contained in:
parent
6636fecd47
commit
ff14666cdc
4 changed files with 221 additions and 15 deletions
|
|
@ -4273,21 +4273,31 @@ class FeishuAdapter(BasePlatformAdapter):
|
|||
request = self._build_reply_message_request(effective_reply_to, body)
|
||||
return await asyncio.to_thread(self._client.im.v1.message.reply, request)
|
||||
|
||||
body = self._build_create_message_body(
|
||||
receive_id=chat_id,
|
||||
msg_type=msg_type,
|
||||
content=payload,
|
||||
uuid_value=str(uuid.uuid4()),
|
||||
)
|
||||
# Detect whether chat_id is a user open_id (DM) or a chat_id (group).
|
||||
# Feishu API expects receive_id_type="open_id" for user DMs (ou_ prefix)
|
||||
# and receive_id_type="chat_id" for group chats (oc_ prefix, which IS
|
||||
# the chat_id format — see https://open.feishu.cn/document/).
|
||||
if chat_id.startswith("ou_"):
|
||||
receive_id_type = "open_id"
|
||||
# For topic/thread messages that fell back from reply→create, use
|
||||
# thread_id as receive_id so the message lands in the topic instead of
|
||||
# the main chat.
|
||||
_thread_id = (metadata or {}).get("thread_id")
|
||||
if _thread_id:
|
||||
body = self._build_create_message_body(
|
||||
receive_id=_thread_id,
|
||||
msg_type=msg_type,
|
||||
content=payload,
|
||||
uuid_value=str(uuid.uuid4()),
|
||||
)
|
||||
request = self._build_create_message_request("thread_id", body)
|
||||
else:
|
||||
receive_id_type = "chat_id"
|
||||
request = self._build_create_message_request(receive_id_type, body)
|
||||
body = self._build_create_message_body(
|
||||
receive_id=chat_id,
|
||||
msg_type=msg_type,
|
||||
content=payload,
|
||||
uuid_value=str(uuid.uuid4()),
|
||||
)
|
||||
# Detect whether chat_id is a user open_id (DM) or a chat_id (group).
|
||||
if chat_id.startswith("ou_"):
|
||||
receive_id_type = "open_id"
|
||||
else:
|
||||
receive_id_type = "chat_id"
|
||||
request = self._build_create_message_request(receive_id_type, body)
|
||||
return await asyncio.to_thread(self._client.im.v1.message.create, request)
|
||||
|
||||
@staticmethod
|
||||
|
|
|
|||
|
|
@ -13891,6 +13891,7 @@ class GatewayRunner:
|
|||
chat_id=source.chat_id,
|
||||
config=_consumer_cfg,
|
||||
metadata=_thread_metadata,
|
||||
initial_reply_to_id=event_message_id,
|
||||
)
|
||||
except Exception as _sc_err:
|
||||
logger.debug("Proxy: could not set up stream consumer: %s", _sc_err)
|
||||
|
|
@ -14716,6 +14717,7 @@ class GatewayRunner:
|
|||
if progress_queue is not None
|
||||
else None
|
||||
),
|
||||
initial_reply_to_id=event_message_id,
|
||||
)
|
||||
if _want_stream_deltas:
|
||||
def _stream_delta_cb(text: str) -> None:
|
||||
|
|
|
|||
|
|
@ -92,6 +92,7 @@ class GatewayStreamConsumer:
|
|||
config: Optional[StreamConsumerConfig] = None,
|
||||
metadata: Optional[dict] = None,
|
||||
on_new_message: Optional[callable] = None,
|
||||
initial_reply_to_id: Optional[str] = None,
|
||||
):
|
||||
self.adapter = adapter
|
||||
self.chat_id = chat_id
|
||||
|
|
@ -105,6 +106,7 @@ class GatewayStreamConsumer:
|
|||
# the content, not edit the old bubble above it.
|
||||
# Called with no arguments. Exceptions are swallowed.
|
||||
self._on_new_message = on_new_message
|
||||
self._initial_reply_to_id = initial_reply_to_id
|
||||
self._queue: queue.Queue = queue.Queue()
|
||||
self._accumulated = ""
|
||||
self._message_id: Optional[str] = None
|
||||
|
|
@ -1004,10 +1006,12 @@ class GatewayStreamConsumer:
|
|||
# The final response will be sent by the fallback path.
|
||||
return False
|
||||
else:
|
||||
# First message — send new
|
||||
# First message — send new, threaded to the original user message
|
||||
# so it lands in the correct topic/thread.
|
||||
result = await self.adapter.send(
|
||||
chat_id=self.chat_id,
|
||||
content=text,
|
||||
reply_to=self._initial_reply_to_id,
|
||||
metadata=self.metadata,
|
||||
)
|
||||
if result.success:
|
||||
|
|
|
|||
190
tests/gateway/test_stream_consumer_thread_routing.py
Normal file
190
tests/gateway/test_stream_consumer_thread_routing.py
Normal file
|
|
@ -0,0 +1,190 @@
|
|||
"""Regression tests for stream consumer thread/topic routing fix.
|
||||
|
||||
Verifies that GatewayStreamConsumer correctly passes reply_to on the first
|
||||
message send, ensuring messages land in the correct topic/thread instead of
|
||||
the main group chat.
|
||||
|
||||
Covers: #6969, #9916, #7355
|
||||
"""
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.stream_consumer import (
|
||||
GatewayStreamConsumer,
|
||||
StreamConsumerConfig,
|
||||
)
|
||||
|
||||
|
||||
def _make_adapter(send_result=None, edit_result=None, max_length=4096):
|
||||
adapter = MagicMock()
|
||||
adapter.send = AsyncMock(
|
||||
return_value=send_result or SimpleNamespace(success=True, message_id="msg_1")
|
||||
)
|
||||
adapter.edit_message = AsyncMock(
|
||||
return_value=edit_result or SimpleNamespace(success=True)
|
||||
)
|
||||
adapter.MAX_MESSAGE_LENGTH = max_length
|
||||
return adapter
|
||||
|
||||
|
||||
class TestInitialReplyToId:
|
||||
"""Verify initial_reply_to_id is passed as reply_to on first send."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_first_send_uses_initial_reply_to_id(self):
|
||||
"""When initial_reply_to_id is set, first adapter.send() should
|
||||
include reply_to=initial_reply_to_id."""
|
||||
adapter = _make_adapter()
|
||||
consumer = GatewayStreamConsumer(
|
||||
adapter,
|
||||
"chat_123",
|
||||
metadata={"thread_id": "omt_topic123"},
|
||||
initial_reply_to_id="om_user_msg_456",
|
||||
)
|
||||
await consumer._send_or_edit("Hello world")
|
||||
|
||||
adapter.send.assert_called_once()
|
||||
call_kwargs = adapter.send.call_args[1]
|
||||
assert call_kwargs["reply_to"] == "om_user_msg_456", (
|
||||
"First send should pass initial_reply_to_id as reply_to"
|
||||
)
|
||||
assert call_kwargs["chat_id"] == "chat_123"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_first_send_without_initial_reply_to_id(self):
|
||||
"""When initial_reply_to_id is None, first send should have
|
||||
reply_to=None (backward compatible)."""
|
||||
adapter = _make_adapter()
|
||||
consumer = GatewayStreamConsumer(
|
||||
adapter,
|
||||
"chat_123",
|
||||
)
|
||||
await consumer._send_or_edit("Hello world")
|
||||
|
||||
adapter.send.assert_called_once()
|
||||
call_kwargs = adapter.send.call_args[1]
|
||||
assert call_kwargs.get("reply_to") is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subsequent_edits_ignore_initial_reply_to_id(self):
|
||||
"""After first send, edits should use message_id, not initial_reply_to_id."""
|
||||
adapter = _make_adapter()
|
||||
consumer = GatewayStreamConsumer(
|
||||
adapter,
|
||||
"chat_123",
|
||||
metadata={"thread_id": "omt_topic123"},
|
||||
initial_reply_to_id="om_user_msg_456",
|
||||
)
|
||||
|
||||
# First send
|
||||
await consumer._send_or_edit("Hello world")
|
||||
assert adapter.send.call_count == 1
|
||||
|
||||
# Second call should edit, not send
|
||||
await consumer._send_or_edit("Hello world updated")
|
||||
assert adapter.send.call_count == 1, "Should edit, not send again"
|
||||
adapter.edit_message.assert_called_once()
|
||||
edit_kwargs = adapter.edit_message.call_args[1]
|
||||
assert edit_kwargs["message_id"] == "msg_1"
|
||||
assert edit_kwargs["chat_id"] == "chat_123"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_metadata_passed_on_first_send(self):
|
||||
"""Metadata (containing thread_id) should be forwarded on first send."""
|
||||
adapter = _make_adapter()
|
||||
metadata = {"thread_id": "omt_topic789"}
|
||||
consumer = GatewayStreamConsumer(
|
||||
adapter,
|
||||
"chat_123",
|
||||
metadata=metadata,
|
||||
initial_reply_to_id="om_msg_000",
|
||||
)
|
||||
await consumer._send_or_edit("Test")
|
||||
|
||||
call_kwargs = adapter.send.call_args[1]
|
||||
assert call_kwargs["metadata"] == metadata
|
||||
|
||||
|
||||
class TestFeishuFallbackThreadRouting:
|
||||
"""Verify FeishuAdapter._send_raw_message routes to topic on fallback."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_uses_thread_id_when_available(self):
|
||||
"""When reply_to=None and metadata has thread_id, message.create
|
||||
should use receive_id_type='thread_id'."""
|
||||
from gateway.platforms.feishu import FeishuAdapter
|
||||
|
||||
# We test the _send_raw_message method directly by mocking the client
|
||||
adapter = MagicMock(spec=FeishuAdapter)
|
||||
|
||||
# Set up the real _send_raw_message logic manually
|
||||
mock_client = MagicMock()
|
||||
mock_create_response = SimpleNamespace(
|
||||
success=lambda: True,
|
||||
data=SimpleNamespace(message_id="new_msg_1"),
|
||||
)
|
||||
mock_client.im.v1.message.create = MagicMock(return_value=mock_create_response)
|
||||
|
||||
# Use the real implementation path
|
||||
adapter._client = mock_client
|
||||
adapter._build_create_message_body = FeishuAdapter._build_create_message_body
|
||||
adapter._build_create_message_request = FeishuAdapter._build_create_message_request
|
||||
|
||||
# Call _send_raw_message with reply_to=None and thread_id in metadata
|
||||
import json
|
||||
result = await FeishuAdapter._send_raw_message(
|
||||
adapter,
|
||||
chat_id="oc_main_chat",
|
||||
msg_type="text",
|
||||
payload=json.dumps({"text": "hello"}),
|
||||
reply_to=None,
|
||||
metadata={"thread_id": "omt_topic_abc"},
|
||||
)
|
||||
|
||||
# Verify message.create was called (not message.reply)
|
||||
mock_client.im.v1.message.create.assert_called_once()
|
||||
|
||||
# The request should have receive_id_type="thread_id"
|
||||
call_args = mock_client.im.v1.message.create.call_args[0][0]
|
||||
body = call_args.body
|
||||
# receive_id should be the thread_id, not the chat_id
|
||||
import json as _json
|
||||
body_dict = _json.loads(body) if isinstance(body, str) else body
|
||||
# The body is a CreateMessageRequestBody — check its receive_id
|
||||
if hasattr(body, 'receive_id'):
|
||||
assert body.receive_id == "omt_topic_abc", (
|
||||
f"Expected receive_id='omt_topic_abc', got '{body.receive_id}'"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_uses_chat_id_when_no_thread(self):
|
||||
"""When reply_to=None and metadata has no thread_id, message.create
|
||||
should use receive_id_type='chat_id' (original behavior)."""
|
||||
from gateway.platforms.feishu import FeishuAdapter
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_create_response = SimpleNamespace(
|
||||
success=lambda: True,
|
||||
data=SimpleNamespace(message_id="new_msg_1"),
|
||||
)
|
||||
mock_client.im.v1.message.create = MagicMock(return_value=mock_create_response)
|
||||
|
||||
adapter = MagicMock(spec=FeishuAdapter)
|
||||
adapter._client = mock_client
|
||||
adapter._build_create_message_body = FeishuAdapter._build_create_message_body
|
||||
adapter._build_create_message_request = FeishuAdapter._build_create_message_request
|
||||
|
||||
import json
|
||||
result = await FeishuAdapter._send_raw_message(
|
||||
adapter,
|
||||
chat_id="oc_main_chat",
|
||||
msg_type="text",
|
||||
payload=json.dumps({"text": "hello"}),
|
||||
reply_to=None,
|
||||
metadata=None,
|
||||
)
|
||||
|
||||
mock_client.im.v1.message.create.assert_called_once()
|
||||
Loading…
Add table
Add a link
Reference in a new issue