diff --git a/gateway/platforms/wecom.py b/gateway/platforms/wecom.py index b1c04befa..db02bde5d 100644 --- a/gateway/platforms/wecom.py +++ b/gateway/platforms/wecom.py @@ -143,6 +143,9 @@ class WeComAdapter(BasePlatformAdapter): """WeCom AI Bot adapter backed by a persistent WebSocket connection.""" MAX_MESSAGE_LENGTH = MAX_MESSAGE_LENGTH + # Threshold for detecting WeCom client-side message splits. + # When a chunk is near the 4000-char limit, a continuation is almost certain. + _SPLIT_THRESHOLD = 3900 def __init__(self, config: PlatformConfig): super().__init__(config, Platform.WECOM) @@ -172,6 +175,13 @@ class WeComAdapter(BasePlatformAdapter): self._seen_messages: Dict[str, float] = {} self._reply_req_ids: Dict[str, str] = {} + # Text batching: merge rapid successive messages (Telegram-style). + # WeCom clients split long messages around 4000 chars. + self._text_batch_delay_seconds = float(os.getenv("HERMES_WECOM_TEXT_BATCH_DELAY_SECONDS", "0.6")) + self._text_batch_split_delay_seconds = float(os.getenv("HERMES_WECOM_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0")) + self._pending_text_batches: Dict[str, MessageEvent] = {} + self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {} + # ------------------------------------------------------------------ # Connection lifecycle # ------------------------------------------------------------------ @@ -519,7 +529,82 @@ class WeComAdapter(BasePlatformAdapter): timestamp=datetime.now(tz=timezone.utc), ) - await self.handle_message(event) + # Only batch plain text messages — commands, media, etc. dispatch + # immediately since they won't be split by the WeCom client. + if message_type == MessageType.TEXT: + self._enqueue_text_event(event) + else: + await self.handle_message(event) + + # ------------------------------------------------------------------ + # Text message aggregation (handles WeCom client-side splits) + # ------------------------------------------------------------------ + + def _text_batch_key(self, event: MessageEvent) -> str: + """Session-scoped key for text message batching.""" + from gateway.session import build_session_key + return build_session_key( + event.source, + group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), + ) + + def _enqueue_text_event(self, event: MessageEvent) -> None: + """Buffer a text event and reset the flush timer. + + When WeCom splits a long user message at 4000 chars, the chunks + arrive within a few hundred milliseconds. This merges them into + a single event before dispatching. + """ + key = self._text_batch_key(event) + existing = self._pending_text_batches.get(key) + chunk_len = len(event.text or "") + if existing is None: + event._last_chunk_len = chunk_len # type: ignore[attr-defined] + self._pending_text_batches[key] = event + else: + if event.text: + existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text + existing._last_chunk_len = chunk_len # type: ignore[attr-defined] + # Merge any media that might be attached + if event.media_urls: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) + + # Cancel any pending flush and restart the timer + prior_task = self._pending_text_batch_tasks.get(key) + if prior_task and not prior_task.done(): + prior_task.cancel() + self._pending_text_batch_tasks[key] = asyncio.create_task( + self._flush_text_batch(key) + ) + + async def _flush_text_batch(self, key: str) -> None: + """Wait for the quiet period then dispatch the aggregated text. + + Uses a longer delay when the latest chunk is near WeCom's 4000-char + split point, since a continuation chunk is almost certain. + """ + current_task = asyncio.current_task() + try: + pending = self._pending_text_batches.get(key) + last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0 + if last_len >= self._SPLIT_THRESHOLD: + delay = self._text_batch_split_delay_seconds + else: + delay = self._text_batch_delay_seconds + await asyncio.sleep(delay) + event = self._pending_text_batches.pop(key, None) + if not event: + return + logger.info( + "[WeCom] Flushing text batch %s (%d chars)", + key, len(event.text or ""), + ) + await self.handle_message(event) + finally: + if self._pending_text_batch_tasks.get(key) is current_task: + self._pending_text_batch_tasks.pop(key, None) @staticmethod def _extract_text(body: Dict[str, Any]) -> Tuple[str, Optional[str]]: