mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-05 02:31:47 +00:00
fix(matrix): add text batching to merge split long messages
Ports the adaptive batching pattern from the Telegram adapter. Matrix clients split messages around 4000 chars. Adaptive delay waits 2.0s when a chunk is near the limit, 0.6s otherwise. Only text messages are batched; commands dispatch immediately. Ref #6892
This commit is contained in:
parent
0fc0c1c83b
commit
07148cac9a
1 changed files with 87 additions and 1 deletions
|
|
@ -120,6 +120,11 @@ def check_matrix_requirements() -> bool:
|
||||||
class MatrixAdapter(BasePlatformAdapter):
|
class MatrixAdapter(BasePlatformAdapter):
|
||||||
"""Gateway adapter for Matrix (any homeserver)."""
|
"""Gateway adapter for Matrix (any homeserver)."""
|
||||||
|
|
||||||
|
# Threshold for detecting Matrix client-side message splits.
|
||||||
|
# When a chunk is near the ~4000-char practical limit, a continuation
|
||||||
|
# is almost certain.
|
||||||
|
_SPLIT_THRESHOLD = 3900
|
||||||
|
|
||||||
def __init__(self, config: PlatformConfig):
|
def __init__(self, config: PlatformConfig):
|
||||||
super().__init__(config, Platform.MATRIX)
|
super().__init__(config, Platform.MATRIX)
|
||||||
|
|
||||||
|
|
@ -172,6 +177,13 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||||
"MATRIX_REACTIONS", "true"
|
"MATRIX_REACTIONS", "true"
|
||||||
).lower() not in ("false", "0", "no")
|
).lower() not in ("false", "0", "no")
|
||||||
|
|
||||||
|
# Text batching: merge rapid successive messages (Telegram-style).
|
||||||
|
# Matrix clients split long messages around 4000 chars.
|
||||||
|
self._text_batch_delay_seconds = float(os.getenv("HERMES_MATRIX_TEXT_BATCH_DELAY_SECONDS", "0.6"))
|
||||||
|
self._text_batch_split_delay_seconds = float(os.getenv("HERMES_MATRIX_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0"))
|
||||||
|
self._pending_text_batches: Dict[str, MessageEvent] = {}
|
||||||
|
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
|
||||||
|
|
||||||
def _is_duplicate_event(self, event_id) -> bool:
|
def _is_duplicate_event(self, event_id) -> bool:
|
||||||
"""Return True if this event was already processed. Tracks the ID otherwise."""
|
"""Return True if this event was already processed. Tracks the ID otherwise."""
|
||||||
if not event_id:
|
if not event_id:
|
||||||
|
|
@ -1088,7 +1100,81 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||||
# Acknowledge receipt so the room shows as read (fire-and-forget).
|
# Acknowledge receipt so the room shows as read (fire-and-forget).
|
||||||
self._background_read_receipt(room.room_id, event.event_id)
|
self._background_read_receipt(room.room_id, event.event_id)
|
||||||
|
|
||||||
await self.handle_message(msg_event)
|
# Only batch plain text messages — commands dispatch immediately.
|
||||||
|
if msg_type == MessageType.TEXT:
|
||||||
|
self._enqueue_text_event(msg_event)
|
||||||
|
else:
|
||||||
|
await self.handle_message(msg_event)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Text message aggregation (handles Matrix client-side splits)
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _text_batch_key(self, event: MessageEvent) -> str:
    """Return the session-scoped key used to group batched text events."""
    # Imported lazily to avoid a module-level cycle with the gateway package.
    from gateway.session import build_session_key

    extra = self.config.extra
    return build_session_key(
        event.source,
        group_sessions_per_user=extra.get("group_sessions_per_user", True),
        thread_sessions_per_user=extra.get("thread_sessions_per_user", False),
    )
|
||||||
|
|
||||||
|
def _enqueue_text_event(self, event: MessageEvent) -> None:
    """Buffer a text event and (re)arm the flush timer for its session.

    Matrix clients deliver the chunks of a split long message within a
    few hundred milliseconds of each other; buffering them here lets the
    adapter dispatch a single merged event instead of several fragments.
    """
    key = self._text_batch_key(event)
    chunk_len = len(event.text or "")
    pending = self._pending_text_batches.get(key)

    if pending is None:
        # First chunk for this session: the event itself becomes the batch.
        event._last_chunk_len = chunk_len  # type: ignore[attr-defined]
        self._pending_text_batches[key] = event
    else:
        # Append the new chunk's text, newline-separated.
        if event.text:
            pending.text = f"{pending.text}\n{event.text}" if pending.text else event.text
        # Track the latest chunk's length — the flush task uses it to pick
        # the adaptive delay.
        pending._last_chunk_len = chunk_len  # type: ignore[attr-defined]
        # Carry over any media attached to the incoming chunk as well.
        if event.media_urls:
            pending.media_urls.extend(event.media_urls)
            pending.media_types.extend(event.media_types)

    # Restart the quiet-period timer: cancel any flush already scheduled
    # for this key, then schedule a fresh one.
    old_timer = self._pending_text_batch_tasks.get(key)
    if old_timer and not old_timer.done():
        old_timer.cancel()
    self._pending_text_batch_tasks[key] = asyncio.create_task(
        self._flush_text_batch(key)
    )
|
||||||
|
|
||||||
|
async def _flush_text_batch(self, key: str) -> None:
    """Sleep through the quiet period, then dispatch the merged text event.

    The wait is longer when the most recent chunk sits near Matrix's
    ~4000-char split point, since a continuation chunk is then almost
    certain to follow.
    """
    me = asyncio.current_task()
    try:
        pending = self._pending_text_batches.get(key)
        last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
        # Near-limit chunk => likely mid-split; wait longer for the rest.
        delay = (
            self._text_batch_split_delay_seconds
            if last_len >= self._SPLIT_THRESHOLD
            else self._text_batch_delay_seconds
        )
        await asyncio.sleep(delay)

        event = self._pending_text_batches.pop(key, None)
        if not event:
            return
        logger.info(
            "[Matrix] Flushing text batch %s (%d chars)",
            key, len(event.text or ""),
        )
        await self.handle_message(event)
    finally:
        # Deregister only if we are still the active timer for this key;
        # a newer task may already have replaced us after a cancel.
        if self._pending_text_batch_tasks.get(key) is me:
            self._pending_text_batch_tasks.pop(key, None)
|
||||||
|
|
||||||
async def _on_room_message_media(self, room: Any, event: Any) -> None:
|
async def _on_room_message_media(self, room: Any, event: Any) -> None:
|
||||||
"""Handle incoming media messages (images, audio, video, files)."""
|
"""Handle incoming media messages (images, audio, video, files)."""
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue