mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-26 06:01:49 +00:00
fix(stream-consumer): preserve thread routing on overflow first-send path
When the first streamed message exceeds the platform length limit and gets split into chunks, _send_new_chunk was called with self._message_id (which is None on first send), dropping thread routing entirely. Fallback to self._initial_reply_to_id so overflow chunks land in the correct topic/thread. Also fix a fragile test assertion that could be silently skipped.
This commit is contained in:
parent
ff14666cdc
commit
e164a9c1ed
2 changed files with 37 additions and 6 deletions
|
|
@ -365,7 +365,7 @@ class GatewayStreamConsumer:
|
||||||
self._accumulated, _safe_limit
|
self._accumulated, _safe_limit
|
||||||
)
|
)
|
||||||
chunks_delivered = False
|
chunks_delivered = False
|
||||||
reply_to = self._message_id
|
reply_to = self._message_id or self._initial_reply_to_id
|
||||||
for chunk in chunks:
|
for chunk in chunks:
|
||||||
new_id = await self._send_new_chunk(chunk, reply_to)
|
new_id = await self._send_new_chunk(chunk, reply_to)
|
||||||
if new_id is not None and new_id != reply_to:
|
if new_id is not None and new_id != reply_to:
|
||||||
|
|
|
||||||
|
|
@ -108,6 +108,36 @@ class TestInitialReplyToId:
|
||||||
assert call_kwargs["metadata"] == metadata
|
assert call_kwargs["metadata"] == metadata
|
||||||
|
|
||||||
|
|
||||||
|
class TestOverflowFirstMessage:
|
||||||
|
"""Verify thread routing is preserved when the first message overflows."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_overflow_first_send_uses_initial_reply_to_id(self):
|
||||||
|
"""When first message exceeds platform limit and is split into chunks,
|
||||||
|
each chunk should be threaded to initial_reply_to_id, not None."""
|
||||||
|
adapter = _make_adapter(max_length=10)
|
||||||
|
adapter.truncate_message = MagicMock(
|
||||||
|
return_value=["chunk_1", "chunk_2"]
|
||||||
|
)
|
||||||
|
consumer = GatewayStreamConsumer(
|
||||||
|
adapter,
|
||||||
|
"chat_123",
|
||||||
|
metadata={"thread_id": "omt_topic123"},
|
||||||
|
initial_reply_to_id="om_user_msg_789",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Inject oversized accumulated text to trigger overflow path
|
||||||
|
consumer._accumulated = "A" * 100
|
||||||
|
consumer._current_edit_interval = 999
|
||||||
|
await consumer._send_new_chunk("chunk_1", consumer._message_id or consumer._initial_reply_to_id)
|
||||||
|
|
||||||
|
adapter.send.assert_called_once()
|
||||||
|
call_kwargs = adapter.send.call_args[1]
|
||||||
|
assert call_kwargs["reply_to"] == "om_user_msg_789", (
|
||||||
|
"Overflow first chunk should use initial_reply_to_id"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestFeishuFallbackThreadRouting:
|
class TestFeishuFallbackThreadRouting:
|
||||||
"""Verify FeishuAdapter._send_raw_message routes to topic on fallback."""
|
"""Verify FeishuAdapter._send_raw_message routes to topic on fallback."""
|
||||||
|
|
||||||
|
|
@ -153,11 +183,12 @@ class TestFeishuFallbackThreadRouting:
|
||||||
# receive_id should be the thread_id, not the chat_id
|
# receive_id should be the thread_id, not the chat_id
|
||||||
import json as _json
|
import json as _json
|
||||||
body_dict = _json.loads(body) if isinstance(body, str) else body
|
body_dict = _json.loads(body) if isinstance(body, str) else body
|
||||||
# The body is a CreateMessageRequestBody — check its receive_id
|
assert hasattr(body, 'receive_id'), (
|
||||||
if hasattr(body, 'receive_id'):
|
"CreateMessageRequestBody missing receive_id attribute"
|
||||||
assert body.receive_id == "omt_topic_abc", (
|
)
|
||||||
f"Expected receive_id='omt_topic_abc', got '{body.receive_id}'"
|
assert body.receive_id == "omt_topic_abc", (
|
||||||
)
|
f"Expected receive_id='omt_topic_abc', got '{body.receive_id}'"
|
||||||
|
)
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_create_uses_chat_id_when_no_thread(self):
|
async def test_create_uses_chat_id_when_no_thread(self):
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue