diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index e1528b9bca..2c2b6f8750 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -4089,15 +4089,18 @@ class FeishuAdapter(BasePlatformAdapter): reply_to: Optional[str], metadata: Optional[Dict[str, Any]], ) -> Any: + effective_reply_to = reply_to + if not effective_reply_to and metadata and metadata.get("thread_id"): + effective_reply_to = metadata.get("reply_to_message_id") or metadata.get("reply_to") reply_in_thread = bool((metadata or {}).get("thread_id")) - if reply_to: + if effective_reply_to: body = self._build_reply_message_body( content=payload, msg_type=msg_type, reply_in_thread=reply_in_thread, uuid_value=str(uuid.uuid4()), ) - request = self._build_reply_message_request(reply_to, body) + request = self._build_reply_message_request(effective_reply_to, body) return await asyncio.to_thread(self._client.im.v1.message.reply, request) body = self._build_create_message_body( diff --git a/gateway/run.py b/gateway/run.py index fe2ed84e6c..ff512205b8 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -12929,12 +12929,19 @@ class GatewayRunner: # - Slack DM threading needs event_message_id fallback (reply thread) # - Telegram uses message_thread_id only for forum topics; passing a # normal DM/group message id as thread_id causes send failures + # - Feishu only honors reply_in_thread when sending a reply, so topic + # progress uses the triggering event message as the reply target # - Other platforms should use explicit source.thread_id only if source.platform == Platform.SLACK: _progress_thread_id = source.thread_id or event_message_id else: _progress_thread_id = source.thread_id _progress_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None + _progress_reply_to = ( + event_message_id + if source.platform == Platform.FEISHU and source.thread_id and event_message_id + else None + ) async def send_progress_messages(): if not progress_queue: @@ -13048,15 +13055,30 @@ class GatewayRunner: adapter.name, ) can_edit = False - await adapter.send(chat_id=source.chat_id, content=msg, metadata=_progress_metadata) + await adapter.send( + chat_id=source.chat_id, + content=msg, + reply_to=_progress_reply_to, + metadata=_progress_metadata, + ) else: if can_edit: # First tool: send all accumulated text as new message full_text = "\n".join(progress_lines) - result = await adapter.send(chat_id=source.chat_id, content=full_text, metadata=_progress_metadata) + result = await adapter.send( + chat_id=source.chat_id, + content=full_text, + reply_to=_progress_reply_to, + metadata=_progress_metadata, + ) else: # Editing unsupported: send just this line - result = await adapter.send(chat_id=source.chat_id, content=msg, metadata=_progress_metadata) + result = await adapter.send( + chat_id=source.chat_id, + content=msg, + reply_to=_progress_reply_to, + metadata=_progress_metadata, + ) if result.success and result.message_id: progress_msg_id = result.message_id @@ -13157,6 +13179,13 @@ class GatewayRunner: _status_adapter = self.adapters.get(source.platform) _status_chat_id = source.chat_id _status_thread_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None + if source.platform == Platform.FEISHU and source.thread_id and event_message_id: + # Feishu topics only keep messages inside the topic when they are + # sent via the reply API with reply_in_thread=true. Status/interim, + # approval, and stream-consumer paths usually only receive metadata, + # so carry the triggering message id as a Feishu-specific fallback. + _status_thread_metadata = dict(_status_thread_metadata or {}) + _status_thread_metadata["reply_to_message_id"] = event_message_id def _status_callback_sync(event_type: str, message: str) -> None: if not _status_adapter or not _run_still_current(): @@ -13300,7 +13329,7 @@ class GatewayRunner: adapter=_adapter, chat_id=source.chat_id, config=_consumer_cfg, - metadata={"thread_id": _progress_thread_id} if _progress_thread_id else None, + metadata=_status_thread_metadata, on_new_message=( (lambda: progress_queue.put(("__reset__",))) if progress_queue is not None diff --git a/tests/gateway/test_feishu.py b/tests/gateway/test_feishu.py index f4ac80f2e1..63287d88cb 100644 --- a/tests/gateway/test_feishu.py +++ b/tests/gateway/test_feishu.py @@ -1962,6 +1962,45 @@ class TestAdapterBehavior(unittest.TestCase): self.assertEqual(result.message_id, "om_reply") self.assertTrue(captured["request"].request_body.reply_in_thread) + @patch.dict(os.environ, {}, clear=True) + def test_send_uses_metadata_reply_target_for_threaded_feishu_topic(self): + from gateway.config import PlatformConfig + from gateway.platforms.feishu import FeishuAdapter + + adapter = FeishuAdapter(PlatformConfig()) + captured = {} + + class _MessageAPI: + def reply(self, request): + captured["request"] = request + return SimpleNamespace( + success=lambda: True, + data=SimpleNamespace(message_id="om_reply"), + ) + + adapter._client = SimpleNamespace( + im=SimpleNamespace(v1=SimpleNamespace(message=_MessageAPI())) + ) + + async def _direct(func, *args, **kwargs): + return func(*args, **kwargs) + + with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): + result = asyncio.run( + adapter.send( + chat_id="oc_chat", + content="status update", + metadata={ + "thread_id": "omt-thread", + "reply_to_message_id": "om_trigger", + }, + ) + ) + + self.assertTrue(result.success) + self.assertEqual(captured["request"].message_id, "om_trigger") + self.assertTrue(captured["request"].request_body.reply_in_thread) + @patch.dict(os.environ, {}, clear=True) def test_send_retries_transient_failure(self): from gateway.config import PlatformConfig diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index 478a9e2773..fb52e1e586 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -303,6 +303,50 @@ async def test_run_agent_progress_uses_event_message_id_for_slack_dm(monkeypatch assert all(call["metadata"] == {"thread_id": "1234567890.000001"} for call in adapter.typing) +@pytest.mark.asyncio +async def test_run_agent_feishu_progress_replies_inside_existing_thread(monkeypatch, tmp_path): + """Feishu needs reply_to plus reply_in_thread metadata for topic-scoped progress.""" + monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all") + + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda *args, **kwargs: None + monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv) + + fake_run_agent = types.ModuleType("run_agent") + fake_run_agent.AIAgent = FakeAgent + monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + + adapter = ProgressCaptureAdapter(platform=Platform.FEISHU) + runner = _make_runner(adapter) + gateway_run = importlib.import_module("gateway.run") + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}) + + source = SessionSource( + platform=Platform.FEISHU, + chat_id="oc_chat", + chat_type="group", + thread_id="topic_17585", + ) + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id="sess-feishu-progress", + session_key="agent:main:feishu:group:oc_chat:topic_17585", + event_message_id="om_triggering_user_message", + ) + + assert result["final_response"] == "done" + assert adapter.sent + assert adapter.sent[0]["reply_to"] == "om_triggering_user_message" + assert adapter.sent[0]["metadata"] == {"thread_id": "topic_17585"} + assert adapter.edits + assert adapter.edits[0]["message_id"] == "progress-1" + + # --------------------------------------------------------------------------- # Preview truncation tests (all/new mode respects tool_preview_length) # ---------------------------------------------------------------------------