From 441ef75d157d6308a9f14d42a7b0ec8566866ef8 Mon Sep 17 00:00:00 2001 From: Yuqian Date: Mon, 4 May 2026 22:28:22 +0800 Subject: [PATCH] fix(feishu): keep topic replies in threads Route Feishu topic progress, status, approval, stream, and fallback messages through threaded replies by preserving the originating message id as the reply target. Add regressions for tool progress topic metadata and Feishu metadata-driven reply routing. --- gateway/platforms/feishu.py | 7 ++-- gateway/run.py | 37 ++++++++++++++++--- tests/gateway/test_feishu.py | 39 ++++++++++++++++++++ tests/gateway/test_run_progress_topics.py | 44 +++++++++++++++++++++++ 4 files changed, 121 insertions(+), 6 deletions(-) diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index e1528b9bca..2c2b6f8750 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -4089,15 +4089,18 @@ class FeishuAdapter(BasePlatformAdapter): reply_to: Optional[str], metadata: Optional[Dict[str, Any]], ) -> Any: + effective_reply_to = reply_to + if not effective_reply_to and metadata and metadata.get("thread_id"): + effective_reply_to = metadata.get("reply_to_message_id") or metadata.get("reply_to") reply_in_thread = bool((metadata or {}).get("thread_id")) - if reply_to: + if effective_reply_to: body = self._build_reply_message_body( content=payload, msg_type=msg_type, reply_in_thread=reply_in_thread, uuid_value=str(uuid.uuid4()), ) - request = self._build_reply_message_request(reply_to, body) + request = self._build_reply_message_request(effective_reply_to, body) return await asyncio.to_thread(self._client.im.v1.message.reply, request) body = self._build_create_message_body( diff --git a/gateway/run.py b/gateway/run.py index fe2ed84e6c..ff512205b8 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -12929,12 +12929,19 @@ class GatewayRunner: # - Slack DM threading needs event_message_id fallback (reply thread) # - Telegram uses message_thread_id only for forum topics; passing a # normal DM/group message id as thread_id causes send failures + # - Feishu only honors reply_in_thread when sending a reply, so topic + # progress uses the triggering event message as the reply target # - Other platforms should use explicit source.thread_id only if source.platform == Platform.SLACK: _progress_thread_id = source.thread_id or event_message_id else: _progress_thread_id = source.thread_id _progress_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None + _progress_reply_to = ( + event_message_id + if source.platform == Platform.FEISHU and source.thread_id and event_message_id + else None + ) async def send_progress_messages(): if not progress_queue: @@ -13048,15 +13055,30 @@ class GatewayRunner: adapter.name, ) can_edit = False - await adapter.send(chat_id=source.chat_id, content=msg, metadata=_progress_metadata) + await adapter.send( + chat_id=source.chat_id, + content=msg, + reply_to=_progress_reply_to, + metadata=_progress_metadata, + ) else: if can_edit: # First tool: send all accumulated text as new message full_text = "\n".join(progress_lines) - result = await adapter.send(chat_id=source.chat_id, content=full_text, metadata=_progress_metadata) + result = await adapter.send( + chat_id=source.chat_id, + content=full_text, + reply_to=_progress_reply_to, + metadata=_progress_metadata, + ) else: # Editing unsupported: send just this line - result = await adapter.send(chat_id=source.chat_id, content=msg, metadata=_progress_metadata) + result = await adapter.send( + chat_id=source.chat_id, + content=msg, + reply_to=_progress_reply_to, + metadata=_progress_metadata, + ) if result.success and result.message_id: progress_msg_id = result.message_id @@ -13157,6 +13179,13 @@ class GatewayRunner: _status_adapter = self.adapters.get(source.platform) _status_chat_id = source.chat_id _status_thread_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None + if source.platform == Platform.FEISHU and source.thread_id and event_message_id: + # Feishu topics only keep messages inside the topic when they are + # sent via the reply API with reply_in_thread=true. Status/interim, + # approval, and stream-consumer paths usually only receive metadata, + # so carry the triggering message id as a Feishu-specific fallback. + _status_thread_metadata = dict(_status_thread_metadata or {}) + _status_thread_metadata["reply_to_message_id"] = event_message_id def _status_callback_sync(event_type: str, message: str) -> None: if not _status_adapter or not _run_still_current(): @@ -13300,7 +13329,7 @@ class GatewayRunner: adapter=_adapter, chat_id=source.chat_id, config=_consumer_cfg, - metadata={"thread_id": _progress_thread_id} if _progress_thread_id else None, + metadata=_status_thread_metadata, on_new_message=( (lambda: progress_queue.put(("__reset__",))) if progress_queue is not None diff --git a/tests/gateway/test_feishu.py b/tests/gateway/test_feishu.py index f4ac80f2e1..63287d88cb 100644 --- a/tests/gateway/test_feishu.py +++ b/tests/gateway/test_feishu.py @@ -1962,6 +1962,45 @@ class TestAdapterBehavior(unittest.TestCase): self.assertEqual(result.message_id, "om_reply") self.assertTrue(captured["request"].request_body.reply_in_thread) + @patch.dict(os.environ, {}, clear=True) + def test_send_uses_metadata_reply_target_for_threaded_feishu_topic(self): + from gateway.config import PlatformConfig + from gateway.platforms.feishu import FeishuAdapter + + adapter = FeishuAdapter(PlatformConfig()) + captured = {} + + class _MessageAPI: + def reply(self, request): + captured["request"] = request + return SimpleNamespace( + success=lambda: True, + data=SimpleNamespace(message_id="om_reply"), + ) + + adapter._client = SimpleNamespace( + im=SimpleNamespace(v1=SimpleNamespace(message=_MessageAPI())) + ) + + async def _direct(func, *args, **kwargs): + return func(*args, **kwargs) + + with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): + result = asyncio.run( + adapter.send( + chat_id="oc_chat", + content="status update", + metadata={ + "thread_id": "omt-thread", + "reply_to_message_id": "om_trigger", + }, + ) + ) + + self.assertTrue(result.success) + self.assertEqual(captured["request"].message_id, "om_trigger") + self.assertTrue(captured["request"].request_body.reply_in_thread) + @patch.dict(os.environ, {}, clear=True) def test_send_retries_transient_failure(self): from gateway.config import PlatformConfig diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index 478a9e2773..fb52e1e586 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -303,6 +303,50 @@ async def test_run_agent_progress_uses_event_message_id_for_slack_dm(monkeypatch assert all(call["metadata"] == {"thread_id": "1234567890.000001"} for call in adapter.typing) +@pytest.mark.asyncio +async def test_run_agent_feishu_progress_replies_inside_existing_thread(monkeypatch, tmp_path): + """Feishu needs reply_to plus reply_in_thread metadata for topic-scoped progress.""" + monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all") + + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda *args, **kwargs: None + monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv) + + fake_run_agent = types.ModuleType("run_agent") + fake_run_agent.AIAgent = FakeAgent + monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + + adapter = ProgressCaptureAdapter(platform=Platform.FEISHU) + runner = _make_runner(adapter) + gateway_run = importlib.import_module("gateway.run") + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}) + + source = SessionSource( + platform=Platform.FEISHU, + chat_id="oc_chat", + chat_type="group", + thread_id="topic_17585", + ) + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id="sess-feishu-progress", + session_key="agent:main:feishu:group:oc_chat:topic_17585", + event_message_id="om_triggering_user_message", + ) + + assert result["final_response"] == "done" + assert adapter.sent + assert adapter.sent[0]["reply_to"] == "om_triggering_user_message" + assert adapter.sent[0]["metadata"] == {"thread_id": "topic_17585"} + assert adapter.edits + assert adapter.edits[0]["message_id"] == "progress-1" + + # --------------------------------------------------------------------------- # Preview truncation tests (all/new mode respects tool_preview_length) # ---------------------------------------------------------------------------