fix(feishu): keep topic replies in threads

Route Feishu topic progress, status, approval, stream, and fallback messages through threaded replies by preserving the originating message id as the reply target. Add regressions for tool progress topic metadata and Feishu metadata-driven reply routing.
This commit is contained in:
Yuqian 2026-05-04 22:28:22 +08:00 committed by kshitij
parent 48c241840a
commit 441ef75d15
4 changed files with 121 additions and 6 deletions

View file

@ -4089,15 +4089,18 @@ class FeishuAdapter(BasePlatformAdapter):
reply_to: Optional[str], reply_to: Optional[str],
metadata: Optional[Dict[str, Any]], metadata: Optional[Dict[str, Any]],
) -> Any: ) -> Any:
effective_reply_to = reply_to
if not effective_reply_to and metadata and metadata.get("thread_id"):
effective_reply_to = metadata.get("reply_to_message_id") or metadata.get("reply_to")
reply_in_thread = bool((metadata or {}).get("thread_id")) reply_in_thread = bool((metadata or {}).get("thread_id"))
if reply_to: if effective_reply_to:
body = self._build_reply_message_body( body = self._build_reply_message_body(
content=payload, content=payload,
msg_type=msg_type, msg_type=msg_type,
reply_in_thread=reply_in_thread, reply_in_thread=reply_in_thread,
uuid_value=str(uuid.uuid4()), uuid_value=str(uuid.uuid4()),
) )
request = self._build_reply_message_request(reply_to, body) request = self._build_reply_message_request(effective_reply_to, body)
return await asyncio.to_thread(self._client.im.v1.message.reply, request) return await asyncio.to_thread(self._client.im.v1.message.reply, request)
body = self._build_create_message_body( body = self._build_create_message_body(

View file

@ -12929,12 +12929,19 @@ class GatewayRunner:
# - Slack DM threading needs event_message_id fallback (reply thread) # - Slack DM threading needs event_message_id fallback (reply thread)
# - Telegram uses message_thread_id only for forum topics; passing a # - Telegram uses message_thread_id only for forum topics; passing a
# normal DM/group message id as thread_id causes send failures # normal DM/group message id as thread_id causes send failures
# - Feishu only honors reply_in_thread when sending a reply, so topic
# progress uses the triggering event message as the reply target
# - Other platforms should use explicit source.thread_id only # - Other platforms should use explicit source.thread_id only
if source.platform == Platform.SLACK: if source.platform == Platform.SLACK:
_progress_thread_id = source.thread_id or event_message_id _progress_thread_id = source.thread_id or event_message_id
else: else:
_progress_thread_id = source.thread_id _progress_thread_id = source.thread_id
_progress_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None _progress_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None
_progress_reply_to = (
event_message_id
if source.platform == Platform.FEISHU and source.thread_id and event_message_id
else None
)
async def send_progress_messages(): async def send_progress_messages():
if not progress_queue: if not progress_queue:
@ -13048,15 +13055,30 @@ class GatewayRunner:
adapter.name, adapter.name,
) )
can_edit = False can_edit = False
await adapter.send(chat_id=source.chat_id, content=msg, metadata=_progress_metadata) await adapter.send(
chat_id=source.chat_id,
content=msg,
reply_to=_progress_reply_to,
metadata=_progress_metadata,
)
else: else:
if can_edit: if can_edit:
# First tool: send all accumulated text as new message # First tool: send all accumulated text as new message
full_text = "\n".join(progress_lines) full_text = "\n".join(progress_lines)
result = await adapter.send(chat_id=source.chat_id, content=full_text, metadata=_progress_metadata) result = await adapter.send(
chat_id=source.chat_id,
content=full_text,
reply_to=_progress_reply_to,
metadata=_progress_metadata,
)
else: else:
# Editing unsupported: send just this line # Editing unsupported: send just this line
result = await adapter.send(chat_id=source.chat_id, content=msg, metadata=_progress_metadata) result = await adapter.send(
chat_id=source.chat_id,
content=msg,
reply_to=_progress_reply_to,
metadata=_progress_metadata,
)
if result.success and result.message_id: if result.success and result.message_id:
progress_msg_id = result.message_id progress_msg_id = result.message_id
@ -13157,6 +13179,13 @@ class GatewayRunner:
_status_adapter = self.adapters.get(source.platform) _status_adapter = self.adapters.get(source.platform)
_status_chat_id = source.chat_id _status_chat_id = source.chat_id
_status_thread_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None _status_thread_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None
if source.platform == Platform.FEISHU and source.thread_id and event_message_id:
# Feishu topics only keep messages inside the topic when they are
# sent via the reply API with reply_in_thread=true. Status/interim,
# approval, and stream-consumer paths usually only receive metadata,
# so carry the triggering message id as a Feishu-specific fallback.
_status_thread_metadata = dict(_status_thread_metadata or {})
_status_thread_metadata["reply_to_message_id"] = event_message_id
def _status_callback_sync(event_type: str, message: str) -> None: def _status_callback_sync(event_type: str, message: str) -> None:
if not _status_adapter or not _run_still_current(): if not _status_adapter or not _run_still_current():
@ -13300,7 +13329,7 @@ class GatewayRunner:
adapter=_adapter, adapter=_adapter,
chat_id=source.chat_id, chat_id=source.chat_id,
config=_consumer_cfg, config=_consumer_cfg,
metadata={"thread_id": _progress_thread_id} if _progress_thread_id else None, metadata=_status_thread_metadata,
on_new_message=( on_new_message=(
(lambda: progress_queue.put(("__reset__",))) (lambda: progress_queue.put(("__reset__",)))
if progress_queue is not None if progress_queue is not None

View file

@ -1962,6 +1962,45 @@ class TestAdapterBehavior(unittest.TestCase):
self.assertEqual(result.message_id, "om_reply") self.assertEqual(result.message_id, "om_reply")
self.assertTrue(captured["request"].request_body.reply_in_thread) self.assertTrue(captured["request"].request_body.reply_in_thread)
@patch.dict(os.environ, {}, clear=True)
def test_send_uses_metadata_reply_target_for_threaded_feishu_topic(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _MessageAPI:
def reply(self, request):
captured["request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_reply"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(v1=SimpleNamespace(message=_MessageAPI()))
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.send(
chat_id="oc_chat",
content="status update",
metadata={
"thread_id": "omt-thread",
"reply_to_message_id": "om_trigger",
},
)
)
self.assertTrue(result.success)
self.assertEqual(captured["request"].message_id, "om_trigger")
self.assertTrue(captured["request"].request_body.reply_in_thread)
@patch.dict(os.environ, {}, clear=True) @patch.dict(os.environ, {}, clear=True)
def test_send_retries_transient_failure(self): def test_send_retries_transient_failure(self):
from gateway.config import PlatformConfig from gateway.config import PlatformConfig

View file

@ -303,6 +303,50 @@ async def test_run_agent_progress_uses_event_message_id_for_slack_dm(monkeypatch
assert all(call["metadata"] == {"thread_id": "1234567890.000001"} for call in adapter.typing) assert all(call["metadata"] == {"thread_id": "1234567890.000001"} for call in adapter.typing)
@pytest.mark.asyncio
async def test_run_agent_feishu_progress_replies_inside_existing_thread(monkeypatch, tmp_path):
"""Feishu needs reply_to plus reply_in_thread metadata for topic-scoped progress."""
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all")
fake_dotenv = types.ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = FakeAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
adapter = ProgressCaptureAdapter(platform=Platform.FEISHU)
runner = _make_runner(adapter)
gateway_run = importlib.import_module("gateway.run")
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
source = SessionSource(
platform=Platform.FEISHU,
chat_id="oc_chat",
chat_type="group",
thread_id="topic_17585",
)
result = await runner._run_agent(
message="hello",
context_prompt="",
history=[],
source=source,
session_id="sess-feishu-progress",
session_key="agent:main:feishu:group:oc_chat:topic_17585",
event_message_id="om_triggering_user_message",
)
assert result["final_response"] == "done"
assert adapter.sent
assert adapter.sent[0]["reply_to"] == "om_triggering_user_message"
assert adapter.sent[0]["metadata"] == {"thread_id": "topic_17585"}
assert adapter.edits
assert adapter.edits[0]["message_id"] == "progress-1"
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Preview truncation tests (all/new mode respects tool_preview_length) # Preview truncation tests (all/new mode respects tool_preview_length)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------