diff --git a/gateway/platforms/matrix.py b/gateway/platforms/matrix.py
index e29ae379b..826d09cab 100644
--- a/gateway/platforms/matrix.py
+++ b/gateway/platforms/matrix.py
@@ -120,6 +120,11 @@ def check_matrix_requirements() -> bool:
 class MatrixAdapter(BasePlatformAdapter):
     """Gateway adapter for Matrix (any homeserver)."""
 
+    # Threshold for detecting Matrix client-side message splits.
+    # When a chunk is near the ~4000-char practical limit, a continuation
+    # is almost certain.
+    _SPLIT_THRESHOLD = 3900
+
     def __init__(self, config: PlatformConfig):
         super().__init__(config, Platform.MATRIX)
 
@@ -172,6 +177,13 @@ class MatrixAdapter(BasePlatformAdapter):
             "MATRIX_REACTIONS", "true"
         ).lower() not in ("false", "0", "no")
 
+        # Text batching: merge rapid successive messages (Telegram-style).
+        # Matrix clients split long messages around 4000 chars.
+        self._text_batch_delay_seconds = float(os.getenv("HERMES_MATRIX_TEXT_BATCH_DELAY_SECONDS", "0.6"))
+        self._text_batch_split_delay_seconds = float(os.getenv("HERMES_MATRIX_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0"))
+        self._pending_text_batches: Dict[str, MessageEvent] = {}
+        self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
+
     def _is_duplicate_event(self, event_id) -> bool:
         """Return True if this event was already processed. Tracks the ID otherwise."""
         if not event_id:
@@ -1088,7 +1100,91 @@ class MatrixAdapter(BasePlatformAdapter):
         # Acknowledge receipt so the room shows as read (fire-and-forget).
         self._background_read_receipt(room.room_id, event.event_id)
 
-        await self.handle_message(msg_event)
+        # Only batch plain text messages — commands dispatch immediately.
+        if msg_type == MessageType.TEXT:
+            self._enqueue_text_event(msg_event)
+        else:
+            await self.handle_message(msg_event)
+
+    # ------------------------------------------------------------------
+    # Text message aggregation (handles Matrix client-side splits)
+    # ------------------------------------------------------------------
+
+    def _text_batch_key(self, event: MessageEvent) -> str:
+        """Session-scoped key for text message batching."""
+        from gateway.session import build_session_key
+        return build_session_key(
+            event.source,
+            group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
+            thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
+        )
+
+    def _enqueue_text_event(self, event: MessageEvent) -> None:
+        """Buffer a text event and reset the flush timer.
+
+        When a Matrix client splits a long message, the chunks arrive within
+        a few hundred milliseconds. This merges them into a single event
+        before dispatching.
+        """
+        key = self._text_batch_key(event)
+        existing = self._pending_text_batches.get(key)
+        # Size of the newest chunk; _flush_text_batch reads it to pick its delay.
+        chunk_len = len(event.text or "")
+        if existing is None:
+            event._last_chunk_len = chunk_len  # type: ignore[attr-defined]
+            self._pending_text_batches[key] = event
+        else:
+            if event.text:
+                existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text
+            existing._last_chunk_len = chunk_len  # type: ignore[attr-defined]
+            # Merge any media that might be attached
+            if event.media_urls:
+                existing.media_urls.extend(event.media_urls)
+                existing.media_types.extend(event.media_types)
+
+        # Cancel any pending flush and restart the timer
+        prior_task = self._pending_text_batch_tasks.get(key)
+        if prior_task and not prior_task.done():
+            prior_task.cancel()
+        self._pending_text_batch_tasks[key] = asyncio.create_task(
+            self._flush_text_batch(key)
+        )
+
+    async def _flush_text_batch(self, key: str) -> None:
+        """Wait for the quiet period then dispatch the aggregated text.
+
+        Uses a longer delay when the latest chunk is near Matrix's ~4000-char
+        split point, since a continuation chunk is almost certain.
+
+        Once the batch has been claimed, this task deregisters itself before
+        dispatching so that a later ``_enqueue_text_event`` call for the same
+        key cannot cancel an in-flight ``handle_message``.
+        """
+        current_task = asyncio.current_task()
+        try:
+            pending = self._pending_text_batches.get(key)
+            last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
+            if last_len >= self._SPLIT_THRESHOLD:
+                delay = self._text_batch_split_delay_seconds
+            else:
+                delay = self._text_batch_delay_seconds
+            await asyncio.sleep(delay)
+            event = self._pending_text_batches.pop(key, None)
+            if event is None:
+                return
+            # The batch is committed: deregister first so a message arriving
+            # mid-dispatch starts a fresh batch instead of cancelling this one.
+            if self._pending_text_batch_tasks.get(key) is current_task:
+                self._pending_text_batch_tasks.pop(key, None)
+            logger.info(
+                "[Matrix] Flushing text batch %s (%d chars)",
+                key, len(event.text or ""),
+            )
+            await self.handle_message(event)
+        finally:
+            # Only drop our own registration; a newer task may have replaced it.
+            if self._pending_text_batch_tasks.get(key) is current_task:
+                self._pending_text_batch_tasks.pop(key, None)
 
     async def _on_room_message_media(self, room: Any, event: Any) -> None:
         """Handle incoming media messages (images, audio, video, files)."""