From c0da5d09a67b97094b6db9b2dfd20657b043a273 Mon Sep 17 00:00:00 2001 From: Aubrey Freeman III Date: Thu, 16 Apr 2026 13:05:22 -0500 Subject: [PATCH] fix: use UTF-16 length for Telegram stream consumer message splitting The stream consumer measured message length using Python's len() (Unicode code points), but Telegram's actual limit is in UTF-16 code units. This caused messages with supplementary-plane characters (most emoji, rare CJK ideographs, etc.), which count as two UTF-16 code units each, to exceed Telegram's 4096-character limit, resulting in truncated messages with formatting artifacts. Changes: - Add message_len_fn property to BasePlatformAdapter (defaults to len) - Override in TelegramAdapter to return utf16_len - Stream consumer uses adapter.message_len_fn for: - safe_limit calculation - overflow detection - truncate_message calls - split point calculation (via _custom_unit_to_cp) - fallback final send chunking Fixes truncated messages with black square artifacts on Telegram when the model generates responses containing characters outside the Basic Multilingual Plane. --- gateway/platforms/base.py | 9 ++++++ gateway/platforms/telegram.py | 5 ++++ gateway/stream_consumer.py | 52 +++++++++++++++++++++++++++-------- 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index d471818a27c..55a53952169 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1311,6 +1311,15 @@ class BasePlatformAdapter(ABC): # _keep_typing skips send_typing when the chat_id is in this set. self._typing_paused: set = set() + @property + def message_len_fn(self) -> Callable[[str], int]: + """Return the length function for measuring message size on this platform. + + Override in adapters whose platform counts characters differently from + Python ``len`` (e.g. Telegram counts UTF-16 code units). 
+ """ + return len + @property def has_fatal_error(self) -> bool: return self._fatal_error_message is not None diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index ae34ee9210c..201912de80a 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -283,6 +283,11 @@ class TelegramAdapter(BasePlatformAdapter): MEDIA_GROUP_WAIT_SECONDS = 0.8 _GENERAL_TOPIC_THREAD_ID = "1" + @property + def message_len_fn(self): + """Telegram measures message length in UTF-16 code units.""" + return utf16_len + def __init__(self, config: PlatformConfig): super().__init__(config, Platform.TELEGRAM) self._app: Optional[Application] = None diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 0539b825b83..4ef557ef997 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -21,7 +21,10 @@ import queue import re import time from dataclasses import dataclass -from typing import Any, Optional +from typing import Any, Callable, Optional + +from gateway.platforms.base import BasePlatformAdapter as _BasePlatformAdapter +from gateway.platforms.base import _custom_unit_to_cp logger = logging.getLogger("gateway.stream_consumer") @@ -301,9 +304,18 @@ class GatewayStreamConsumer: async def run(self) -> None: """Async task that drains the queue and edits the platform message.""" - # Platform message length limit — leave room for cursor + formatting + # Platform message length limit — leave room for cursor + formatting. + # Use the adapter's length function (e.g. utf16_len for Telegram) so + # overflow detection matches what the platform actually enforces. + # Gate on isinstance(BasePlatformAdapter) so test MagicMocks (whose + # auto-attributes return mock objects, not callables) fall back to len. 
+ _len_fn: "Callable[[str], int]" = ( + self.adapter.message_len_fn + if isinstance(self.adapter, _BasePlatformAdapter) + else len + ) _raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) - _safe_limit = max(500, _raw_limit - len(self.cfg.cursor) - 100) + _safe_limit = max(500, _raw_limit - _len_fn(self.cfg.cursor) - 100) try: while True: @@ -345,6 +357,10 @@ class GatewayStreamConsumer: should_edit = should_edit or ( (elapsed >= self._current_edit_interval and self._accumulated) + # buffer_threshold is intentionally codepoint-based: + # it's a debounce heuristic ("send updates roughly + # every N visible characters"), not a platform-limit + # check. _len_fn is reserved for overflow detection. or len(self._accumulated) >= self.cfg.buffer_threshold ) @@ -353,7 +369,7 @@ class GatewayStreamConsumer: # Split overflow: if accumulated text exceeds the platform # limit, split into properly sized chunks. if ( - len(self._accumulated) > _safe_limit + _len_fn(self._accumulated) > _safe_limit and self._message_id is None ): # No existing message to edit (first message or after a @@ -362,7 +378,7 @@ class GatewayStreamConsumer: # proper word/code-fence boundaries and chunk # indicators like "(1/2)". chunks = self.adapter.truncate_message( - self._accumulated, _safe_limit + self._accumulated, _safe_limit, len_fn=_len_fn, ) chunks_delivered = False reply_to = self._message_id or self._initial_reply_to_id @@ -389,11 +405,14 @@ class GatewayStreamConsumer: # Existing message: edit it with the first chunk, then # start a new message for the overflow remainder. 
while ( - len(self._accumulated) > _safe_limit + _len_fn(self._accumulated) > _safe_limit and self._message_id is not None and self._edit_supported ): - split_at = self._accumulated.rfind("\n", 0, _safe_limit) - if split_at < _safe_limit // 2: - split_at = _safe_limit + _cp_budget = _custom_unit_to_cp( + self._accumulated, _safe_limit, _len_fn, + ) + split_at = self._accumulated.rfind("\n", 0, _cp_budget) + if split_at < _cp_budget // 2: + split_at = _cp_budget chunk = self._accumulated[:split_at] @@ -584,14 +603,18 @@ class GatewayStreamConsumer: return final_text @staticmethod - def _split_text_chunks(text: str, limit: int) -> list[str]: + def _split_text_chunks( + text: str, limit: int, + len_fn: "Callable[[str], int]" = len, + ) -> list[str]: """Split text into reasonably sized chunks for fallback sends.""" - if len(text) <= limit: + if len_fn(text) <= limit: return [text] chunks: list[str] = [] remaining = text - while len(remaining) > limit: - split_at = remaining.rfind("\n", 0, limit) - if split_at < limit // 2: - split_at = limit + while len_fn(remaining) > limit: + _cp_budget = _custom_unit_to_cp(remaining, limit, len_fn) + split_at = remaining.rfind("\n", 0, _cp_budget) + if split_at < _cp_budget // 2: + split_at = _cp_budget chunks.append(remaining[:split_at]) @@ -647,8 +670,13 @@ class GatewayStreamConsumer: return raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) + _len_fn: "Callable[[str], int]" = ( + self.adapter.message_len_fn + if isinstance(self.adapter, _BasePlatformAdapter) + else len + ) safe_limit = max(500, raw_limit - 100) - chunks = self._split_text_chunks(continuation, safe_limit) + chunks = self._split_text_chunks(continuation, safe_limit, len_fn=_len_fn) stale_message_id = self._message_id # partial message to clean up last_message_id: Optional[str] = None