From bf1f40996f195d7387ea0bdffd511637bab9e833 Mon Sep 17 00:00:00 2001 From: kjames2001 Date: Sun, 10 May 2026 21:59:05 -0700 Subject: [PATCH] fix(telegram): split-and-deliver oversized edits instead of silent truncation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When edit_message_text exceeded Telegram's 4096 UTF-16 codepoint limit, the adapter caught the BadRequest, best-effort truncated the content with '…', and returned SendResult(success=True). The stream consumer believed the full edit was delivered and never recovered, silently dropping everything past the truncation boundary on long replies. Returning failure isn't safe either — the consumer's existing fallback path can race against the next streaming tick, producing duplicate sends or gaps. Instead, the adapter now SPLITS the oversized payload across the existing message + new continuation messages, so the user always gets the full reply in correct order. How it works: 1. Pre-flight: if utf16_len(content) already exceeds MAX_MESSAGE_LENGTH, call the new _edit_overflow_split helper directly — saves a doomed round-trip + a Telegram error. 2. Reactive: if Telegram still returns 'message_too_long' after the pre-flight (e.g. parse_mode formatting inflated the payload past the limit via MarkdownV2 escapes), the same helper handles it. 3. _edit_overflow_split: - Splits via truncate_message(len_fn=utf16_len) — same chunking the non-streaming send() path uses; chunks get '(1/N)' suffixes. - Edits the original message_id with chunk 1 (with parse_mode + plain-fallback when finalize=True, mirroring the main edit path). - Sends each remaining chunk via self._bot.send_message threaded as a reply to the previous chunk so the user sees them as a contiguous block. MarkdownV2-with-plain-fallback per chunk on finalize. - Returns SendResult(success=True, message_id=, continuation_message_ids=(, , ...)) so the stream consumer can keep editing the most recent visible message and the gateway has full visibility into every message id. SendResult contract extension: Added optional continuation_message_ids: tuple = () field. When empty (the common case), behavior is unchanged. When populated, the caller knows the adapter delivered across multiple platform messages. Stream consumer integration: GatewayStreamConsumer._send_or_edit advances _message_id to the last-continuation id when it sees continuation_message_ids on a successful edit result, resets _last_sent_text (the new visible message holds only the final chunk's text), and fires on_new_message so tool-progress bubbles linearize below the new continuation rather than the original. Mirrors the openclaw #32535 inter-tool-leak guard. Composes with what just landed: - PR #23455 (UTF-16 length-aware splitting in stream consumer) prevents most overflows upstream by measuring text in UTF-16 codeunits before deciding to split. This PR is the safety net at the adapter boundary. - PR #23512 (native draft streaming, default for DM Telegram) routes DM streaming through send_draft, which has its own contract unaffected by this change. So this fix narrows in scope to the edit-based path: groups, supergroups, forum topics, every non-Telegram platform, and the per-response fallback after a draft failure. Salvage notes: - Cherry-picked from PR #19537 by @kjames2001. Original PR returned failure on overflow; this evolves to split-and-deliver so users never lose content and the consumer state stays consistent. - Dropped an unrelated model-picker hunk (line 2114-2117) that silently killed the 'X more available — type /model directly' hint by hardcoding total=len(models). Not in scope. - Restored the timeout-aware retryable=not is_timeout signal in send()'s fallthrough catch block. Closes #19537. --- gateway/platforms/base.py | 7 ++ gateway/platforms/telegram.py | 196 ++++++++++++++++++++++++++++++---- gateway/stream_consumer.py | 24 ++++- 3 files changed, 208 insertions(+), 19 deletions(-) diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index b5282114295..8e1f83c9b2a 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1035,6 +1035,13 @@ class SendResult: error: Optional[str] = None raw_response: Any = None retryable: bool = False # True for transient connection errors — base will retry automatically + # When the adapter had to split an oversized payload across multiple + # platform messages (e.g. Telegram edit_message overflow split-and-deliver), + # ``message_id`` is the LAST visible message id (so subsequent edits target + # the most recent chunk) and these are the additional message ids that + # made up the full payload, in send order. Empty tuple for the common + # single-message case. + continuation_message_ids: tuple = () class EphemeralReply(str): diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 7dbef1b6963..996c41ab619 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -77,7 +77,6 @@ from gateway.platforms.base import ( SUPPORTED_VIDEO_TYPES, SUPPORTED_DOCUMENT_TYPES, utf16_len, - _prefix_within_utf16_limit, ) from gateway.platforms.telegram_network import ( TelegramFallbackTransport, @@ -1559,10 +1558,18 @@ class TelegramAdapter(BasePlatformAdapter): except Exception as e: logger.error("[%s] Failed to send Telegram message: %s", self.name, e, exc_info=True) + err_str = str(e).lower() + # Message too long — content exceeded 4096 chars. Return failure so + # stream consumer enters fallback mode and sends the remainder. + if "message_too_long" in err_str or "too long" in err_str: + logger.debug( + "[%s] send() content too long, falling back to new-message continuation", + self.name, + ) + return SendResult(success=False, error="message_too_long") # TimedOut means the request may have reached Telegram — # mark as non-retryable so _send_with_retry() doesn't re-send. _to = locals().get("_TimedOut") - err_str = str(e).lower() is_timeout = (_to and isinstance(e, _to)) or "timed out" in err_str return SendResult(success=False, error=str(e), retryable=not is_timeout) @@ -1574,9 +1581,26 @@ class TelegramAdapter(BasePlatformAdapter): *, finalize: bool = False, ) -> SendResult: - """Edit a previously sent Telegram message.""" + """Edit a previously sent Telegram message. + + Telegram caps single-message text at 4096 UTF-16 codeunits. Streaming + replies that grow past this limit must NOT be silently truncated and + must NOT return failure (the consumer would re-send and create a + duplicate). Instead this method split-and-delivers: edit the + existing message with the first chunk and send the rest as + continuation messages, returning the final chunk's id so subsequent + edits target the most recent visible message. + """ if not self._bot: return SendResult(success=False, error="Not connected") + + # Pre-flight: if content already exceeds the limit, split-and-deliver + # without round-tripping a doomed edit. + if utf16_len(content) > self.MAX_MESSAGE_LENGTH: + return await self._edit_overflow_split( + chat_id, message_id, content, finalize=finalize, + ) + try: if not finalize: await self._bot.edit_message_text( @@ -1610,22 +1634,17 @@ class TelegramAdapter(BasePlatformAdapter): # "Message is not modified" — content identical, treat as success if "not modified" in err_str: return SendResult(success=True, message_id=message_id) - # Message too long — content exceeded 4096 chars (e.g. during - # streaming). Truncate and succeed so the stream consumer can - # split the overflow into a new message instead of dying. + # Reactive split-and-deliver: parse_mode formatting can inflate + # the payload past the limit even when the raw text was under + # (e.g. MarkdownV2 escapes). Same fix as the pre-flight path. if "message_too_long" in err_str or "too long" in err_str: - truncated = _prefix_within_utf16_limit( - content, self.MAX_MESSAGE_LENGTH - 20 - ) + "…" - try: - await self._bot.edit_message_text( - chat_id=int(chat_id), - message_id=int(message_id), - text=truncated, - ) - except Exception: - pass # best-effort truncation - return SendResult(success=True, message_id=message_id) + logger.debug( + "[%s] edit_message overflow (%d UTF-16 > %d), splitting", + self.name, utf16_len(content), self.MAX_MESSAGE_LENGTH, + ) + return await self._edit_overflow_split( + chat_id, message_id, content, finalize=finalize, + ) # Flood control / RetryAfter — short waits are retried inline, # long waits return a failure immediately so streaming can fall back # to a normal final send instead of leaving a truncated partial. @@ -1661,6 +1680,147 @@ class TelegramAdapter(BasePlatformAdapter): ) return SendResult(success=False, error=str(e)) + async def _edit_overflow_split( + self, + chat_id: str, + message_id: str, + content: str, + *, + finalize: bool, + ) -> SendResult: + """Split an oversized edit across the existing message + continuations. + + Edit the original ``message_id`` with chunk 1 (with the platform's + usual ``(1/N)`` suffix preserved), then send the remaining chunks as + new messages threaded as replies to the previous chunk so the user + sees them grouped. Returns ``SendResult(success=True, + message_id=, continuation_message_ids=(...))`` so the + stream consumer can keep editing the most recent visible message + and the gateway has full visibility into every message id we put on + screen. + + Falls back to ``SendResult(success=False)`` only if even the first- + chunk edit fails — that's a real adapter problem, not an overflow. + """ + chunks = self.truncate_message( + content, self.MAX_MESSAGE_LENGTH, len_fn=utf16_len, + ) + if len(chunks) <= 1: + # Defensive: shouldn't happen given the caller's pre-flight, but + # if truncate_message returned a single chunk just edit normally. + chunks = [content] + + # Step 1 — edit the existing message with the first chunk. + first_chunk = chunks[0] + try: + if finalize: + # Use format_message + parse_mode for the final chunk; + # mirror edit_message's main happy-path. + formatted = self.format_message(first_chunk) + try: + await self._bot.edit_message_text( + chat_id=int(chat_id), + message_id=int(message_id), + text=formatted, + parse_mode=ParseMode.MARKDOWN_V2, + ) + except Exception as fmt_err: + if "not modified" not in str(fmt_err).lower(): + await self._bot.edit_message_text( + chat_id=int(chat_id), + message_id=int(message_id), + text=first_chunk, + ) + else: + await self._bot.edit_message_text( + chat_id=int(chat_id), + message_id=int(message_id), + text=first_chunk, + ) + except Exception as e: + err_str = str(e).lower() + if "not modified" in err_str: + # First chunk identical to current text — fall through to + # send continuations. + pass + else: + logger.error( + "[%s] Overflow split: first-chunk edit failed: %s", + self.name, e, exc_info=True, + ) + return SendResult(success=False, error=str(e)) + + # Step 2 — send each remaining chunk as a continuation message, + # threaded as a reply to the previous so the user sees them as a + # contiguous block. We call self._bot.send_message directly so the + # continuation skips ``self.send``'s own pre-chunking pass (chunks + # are already correctly sized). Best-effort MarkdownV2 with plain + # fallback, mirroring send(). + continuation_ids: list[str] = [] + prev_id = message_id + for chunk in chunks[1:]: + sent_msg = None + for use_markdown in (True, False) if finalize else (False,): + try: + text = self.format_message(chunk) if use_markdown else chunk + sent_msg = await self._bot.send_message( + chat_id=int(chat_id), + text=text, + parse_mode=ParseMode.MARKDOWN_V2 if use_markdown else None, + reply_to_message_id=int(prev_id) if prev_id else None, + ) + break + except Exception as send_err: + if "reply message not found" in str(send_err).lower(): + # Drop the reply anchor and try again. + try: + sent_msg = await self._bot.send_message( + chat_id=int(chat_id), + text=chunk, + ) + break + except Exception as _retry_err: + logger.warning( + "[%s] Overflow continuation no-reply retry failed: %s", + self.name, _retry_err, + ) + sent_msg = None + break + if use_markdown: + # try plain text on next loop iteration + continue + logger.warning( + "[%s] Overflow continuation send failed: %s", + self.name, send_err, + ) + sent_msg = None + break + if sent_msg is None: + # Continuation failed — the user has chunk 1 + however many + # continuations succeeded. Report success with what we got + # so the stream consumer knows the edit landed; the + # remaining tail is lost on this attempt and the next + # streaming tick may retry. + logger.warning( + "[%s] Overflow split: stopped at %d/%d chunks delivered", + self.name, 1 + len(continuation_ids), len(chunks), + ) + break + new_id = str(getattr(sent_msg, "message_id", "")) or prev_id + continuation_ids.append(new_id) + prev_id = new_id + + last_id = continuation_ids[-1] if continuation_ids else message_id + logger.debug( + "[%s] Overflow split delivered %d chunks; last_id=%s", + self.name, 1 + len(continuation_ids), last_id, + ) + return SendResult( + success=True, + message_id=last_id, + continuation_message_ids=tuple(continuation_ids), + ) + async def delete_message(self, chat_id: str, message_id: str) -> bool: """Delete a previously sent Telegram message. diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 440f65de272..5e553b67ee5 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -1153,7 +1153,29 @@ class GatewayStreamConsumer: ) if result.success: self._already_sent = True - self._last_sent_text = text + # Adapter may have split-and-delivered an oversized + # edit across the original message + N continuations. + # When that happens, ``message_id`` is the LAST visible + # continuation and ``_last_sent_text`` no longer reflects + # the on-screen content (the new message only holds the + # final chunk's text), so subsequent edits must target + # the new id and skip-if-same comparisons must reset. + # Fire on_new_message so tool-progress bubbles linearize + # below the new continuation, not the original. + # ``getattr`` with default keeps backwards compat with + # SimpleNamespace mocks in tests that pre-date the field. + _continuation_ids = getattr(result, "continuation_message_ids", ()) or () + if ( + _continuation_ids + and result.message_id + and result.message_id != self._message_id + ): + self._message_id = str(result.message_id) + self._message_created_ts = time.monotonic() + self._last_sent_text = "" + self._notify_new_message() + else: + self._last_sent_text = text # Successful edit — reset flood strike counter self._flood_strikes = 0 return True