diff --git a/gateway/platforms/discord.py b/gateway/platforms/discord.py index a19b6d6663..4e7d013e3d 100644 --- a/gateway/platforms/discord.py +++ b/gateway/platforms/discord.py @@ -422,6 +422,7 @@ class DiscordAdapter(BasePlatformAdapter): # Discord message limits MAX_MESSAGE_LENGTH = 2000 + _SPLIT_THRESHOLD = 1900 # near the 2000-char split point # Auto-disconnect from voice channel after this many seconds of inactivity VOICE_TIMEOUT = 300 @@ -433,6 +434,11 @@ class DiscordAdapter(BasePlatformAdapter): self._allowed_user_ids: set = set() # For button approval authorization # Voice channel state (per-guild) self._voice_clients: Dict[int, Any] = {} # guild_id -> VoiceClient + # Text batching: merge rapid successive messages (Telegram-style) + self._text_batch_delay_seconds = float(os.getenv("HERMES_DISCORD_TEXT_BATCH_DELAY_SECONDS", "0.6")) + self._text_batch_split_delay_seconds = float(os.getenv("HERMES_DISCORD_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0")) + self._pending_text_batches: Dict[str, MessageEvent] = {} + self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {} self._voice_text_channels: Dict[int, int] = {} # guild_id -> text_channel_id self._voice_timeout_tasks: Dict[int, asyncio.Task] = {} # guild_id -> timeout task # Phase 2: voice listening @@ -2466,7 +2472,80 @@ class DiscordAdapter(BasePlatformAdapter): if thread_id: self._track_thread(thread_id) - await self.handle_message(event) + # Only batch plain text messages — commands, media, etc. dispatch + # immediately since they won't be split by the Discord client. + if msg_type == MessageType.TEXT: + self._enqueue_text_event(event) + else: + await self.handle_message(event) + + # ------------------------------------------------------------------ + # Text message aggregation (handles Discord client-side splits) + # ------------------------------------------------------------------ + + def _text_batch_key(self, event: MessageEvent) -> str: + """Session-scoped key for text message batching.""" + from gateway.session import build_session_key + return build_session_key( + event.source, + group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), + ) + + def _enqueue_text_event(self, event: MessageEvent) -> None: + """Buffer a text event and reset the flush timer. + + When Discord splits a long user message at 2000 chars, the chunks + arrive within a few hundred milliseconds. This merges them into + a single event before dispatching. + """ + key = self._text_batch_key(event) + existing = self._pending_text_batches.get(key) + chunk_len = len(event.text or "") + if existing is None: + event._last_chunk_len = chunk_len # type: ignore[attr-defined] + self._pending_text_batches[key] = event + else: + if event.text: + existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text + existing._last_chunk_len = chunk_len # type: ignore[attr-defined] + if event.media_urls: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) + + prior_task = self._pending_text_batch_tasks.get(key) + if prior_task and not prior_task.done(): + prior_task.cancel() + self._pending_text_batch_tasks[key] = asyncio.create_task( + self._flush_text_batch(key) + ) + + async def _flush_text_batch(self, key: str) -> None: + """Wait for the quiet period then dispatch the aggregated text. + + Uses a longer delay when the latest chunk is near Discord's 2000-char + split point, since a continuation chunk is almost certain. + """ + current_task = asyncio.current_task() + try: + pending = self._pending_text_batches.get(key) + last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0 + if last_len >= self._SPLIT_THRESHOLD: + delay = self._text_batch_split_delay_seconds + else: + delay = self._text_batch_delay_seconds + await asyncio.sleep(delay) + event = self._pending_text_batches.pop(key, None) + if not event: + return + logger.info( + "[Discord] Flushing text batch %s (%d chars)", + key, len(event.text or ""), + ) + await self.handle_message(event) + finally: + if self._pending_text_batch_tasks.get(key) is current_task: + self._pending_text_batch_tasks.pop(key, None) # ---------------------------------------------------------------------------