diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index 01b1c3a14b..0b55e2b786 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -1172,6 +1172,7 @@ class FeishuAdapter(BasePlatformAdapter): ) def _apply_settings(self, settings: FeishuAdapterSettings) -> None: + self._last_inbound_message_ids: dict = {} # chat_id -> message_id self._app_id = settings.app_id self._app_secret = settings.app_secret self._domain_name = settings.domain_name @@ -2186,6 +2187,9 @@ class FeishuAdapter(BasePlatformAdapter): ) chat_id = getattr(message, "chat_id", "") or "" + # Track last inbound message_id per chat for auto-thread reply + if chat_id and message_id: + self._last_inbound_message_ids[chat_id] = message_id chat_info = await self.get_chat_info(chat_id) sender_profile = await self._resolve_sender_profile(sender_id) source = self.build_source( @@ -3272,7 +3276,11 @@ class FeishuAdapter(BasePlatformAdapter): reply_to: Optional[str], metadata: Optional[Dict[str, Any]], ) -> Any: - reply_in_thread = bool((metadata or {}).get("thread_id")) + _auto_thread = os.getenv("FEISHU_AUTO_THREAD", "false").lower() in ("true", "1", "yes") + reply_in_thread = bool((metadata or {}).get("thread_id")) or _auto_thread + # When auto_thread is on and no reply_to, use last inbound message_id + if _auto_thread and not reply_to: + reply_to = self._last_inbound_message_ids.get(chat_id) if reply_to: body = self._build_reply_message_body( content=payload, diff --git a/gateway/run.py b/gateway/run.py index 2eb745f92b..89b9ece4aa 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -7840,11 +7840,14 @@ class GatewayRunner: buffer_threshold=_scfg.buffer_threshold, cursor=_effective_cursor, ) + _feishu_auto_thread = os.getenv("FEISHU_AUTO_THREAD", "false").lower() in ("true", "1", "yes") + _sc_reply_to = event_message_id if (_feishu_auto_thread and source.platform == Platform.FEISHU) else None _stream_consumer = GatewayStreamConsumer( adapter=_adapter, chat_id=source.chat_id, config=_consumer_cfg, metadata=_thread_metadata, + reply_to=_sc_reply_to, ) except Exception as _sc_err: logger.debug("Proxy: could not set up stream consumer: %s", _sc_err) @@ -8406,11 +8409,14 @@ class GatewayRunner: buffer_threshold=_scfg.buffer_threshold, cursor=_effective_cursor, ) + _feishu_auto_thread2 = os.getenv("FEISHU_AUTO_THREAD", "false").lower() in ("true", "1", "yes") + _sc_reply_to2 = event_message_id if (_feishu_auto_thread2 and source.platform == Platform.FEISHU) else None _stream_consumer = GatewayStreamConsumer( adapter=_adapter, chat_id=source.chat_id, config=_consumer_cfg, metadata={"thread_id": _progress_thread_id} if _progress_thread_id else None, + reply_to=_sc_reply_to2, ) if _want_stream_deltas: _stream_delta_cb = _stream_consumer.on_delta diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index e6d96c802d..d584b196b6 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -82,6 +82,7 @@ class GatewayStreamConsumer: chat_id: str, config: Optional[StreamConsumerConfig] = None, metadata: Optional[dict] = None, + reply_to: Optional[str] = None, ): self.adapter = adapter self.chat_id = chat_id @@ -89,7 +90,7 @@ class GatewayStreamConsumer: self.metadata = metadata self._queue: queue.Queue = queue.Queue() self._accumulated = "" - self._message_id: Optional[str] = None + self._message_id: Optional[str] = reply_to self._already_sent = False self._edit_supported = True # Disabled when progressive edits are no longer usable self._last_edit_time = 0.0