diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index b5282114295..8e1f83c9b2a 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1035,6 +1035,13 @@ class SendResult: error: Optional[str] = None raw_response: Any = None retryable: bool = False # True for transient connection errors — base will retry automatically + # When the adapter had to split an oversized payload across multiple + # platform messages (e.g. Telegram edit_message overflow split-and-deliver), + # ``message_id`` is the LAST visible message id (so subsequent edits target + # the most recent chunk) and these are the additional message ids that + # made up the full payload, in send order. Empty tuple for the common + # single-message case. + continuation_message_ids: tuple = () class EphemeralReply(str): diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 7dbef1b6963..996c41ab619 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -77,7 +77,6 @@ from gateway.platforms.base import ( SUPPORTED_VIDEO_TYPES, SUPPORTED_DOCUMENT_TYPES, utf16_len, - _prefix_within_utf16_limit, ) from gateway.platforms.telegram_network import ( TelegramFallbackTransport, @@ -1559,10 +1558,18 @@ class TelegramAdapter(BasePlatformAdapter): except Exception as e: logger.error("[%s] Failed to send Telegram message: %s", self.name, e, exc_info=True) + err_str = str(e).lower() + # Message too long — content exceeded 4096 chars. Return failure so + # stream consumer enters fallback mode and sends the remainder. + if "message_too_long" in err_str or "too long" in err_str: + logger.debug( + "[%s] send() content too long, falling back to new-message continuation", + self.name, + ) + return SendResult(success=False, error="message_too_long") # TimedOut means the request may have reached Telegram — # mark as non-retryable so _send_with_retry() doesn't re-send. _to = locals().get("_TimedOut") - err_str = str(e).lower() is_timeout = (_to and isinstance(e, _to)) or "timed out" in err_str return SendResult(success=False, error=str(e), retryable=not is_timeout) @@ -1574,9 +1581,26 @@ class TelegramAdapter(BasePlatformAdapter): *, finalize: bool = False, ) -> SendResult: - """Edit a previously sent Telegram message.""" + """Edit a previously sent Telegram message. + + Telegram caps single-message text at 4096 UTF-16 codeunits. Streaming + replies that grow past this limit must NOT be silently truncated and + must NOT return failure (the consumer would re-send and create a + duplicate). Instead this method split-and-delivers: edit the + existing message with the first chunk and send the rest as + continuation messages, returning the final chunk's id so subsequent + edits target the most recent visible message. + """ if not self._bot: return SendResult(success=False, error="Not connected") + + # Pre-flight: if content already exceeds the limit, split-and-deliver + # without round-tripping a doomed edit. + if utf16_len(content) > self.MAX_MESSAGE_LENGTH: + return await self._edit_overflow_split( + chat_id, message_id, content, finalize=finalize, + ) + try: if not finalize: await self._bot.edit_message_text( @@ -1610,22 +1634,17 @@ class TelegramAdapter(BasePlatformAdapter): # "Message is not modified" — content identical, treat as success if "not modified" in err_str: return SendResult(success=True, message_id=message_id) - # Message too long — content exceeded 4096 chars (e.g. during - # streaming). Truncate and succeed so the stream consumer can - # split the overflow into a new message instead of dying. + # Reactive split-and-deliver: parse_mode formatting can inflate + # the payload past the limit even when the raw text was under + # (e.g. MarkdownV2 escapes). Same fix as the pre-flight path. if "message_too_long" in err_str or "too long" in err_str: - truncated = _prefix_within_utf16_limit( - content, self.MAX_MESSAGE_LENGTH - 20 - ) + "…" - try: - await self._bot.edit_message_text( - chat_id=int(chat_id), - message_id=int(message_id), - text=truncated, - ) - except Exception: - pass # best-effort truncation - return SendResult(success=True, message_id=message_id) + logger.debug( + "[%s] edit_message overflow (%d UTF-16 > %d), splitting", + self.name, utf16_len(content), self.MAX_MESSAGE_LENGTH, + ) + return await self._edit_overflow_split( + chat_id, message_id, content, finalize=finalize, + ) # Flood control / RetryAfter — short waits are retried inline, # long waits return a failure immediately so streaming can fall back # to a normal final send instead of leaving a truncated partial. @@ -1661,6 +1680,147 @@ class TelegramAdapter(BasePlatformAdapter): ) return SendResult(success=False, error=str(e)) + async def _edit_overflow_split( + self, + chat_id: str, + message_id: str, + content: str, + *, + finalize: bool, + ) -> SendResult: + """Split an oversized edit across the existing message + continuations. + + Edit the original ``message_id`` with chunk 1 (with the platform's + usual ``(1/N)`` suffix preserved), then send the remaining chunks as + new messages threaded as replies to the previous chunk so the user + sees them grouped. Returns ``SendResult(success=True, + message_id=, continuation_message_ids=(...))`` so the + stream consumer can keep editing the most recent visible message + and the gateway has full visibility into every message id we put on + screen. + + Falls back to ``SendResult(success=False)`` only if even the first- + chunk edit fails — that's a real adapter problem, not an overflow. + """ + chunks = self.truncate_message( + content, self.MAX_MESSAGE_LENGTH, len_fn=utf16_len, + ) + if len(chunks) <= 1: + # Defensive: shouldn't happen given the caller's pre-flight, but + # if truncate_message returned a single chunk just edit normally. + chunks = [content] + + # Step 1 — edit the existing message with the first chunk. + first_chunk = chunks[0] + try: + if finalize: + # Use format_message + parse_mode for the final chunk; + # mirror edit_message's main happy-path. + formatted = self.format_message(first_chunk) + try: + await self._bot.edit_message_text( + chat_id=int(chat_id), + message_id=int(message_id), + text=formatted, + parse_mode=ParseMode.MARKDOWN_V2, + ) + except Exception as fmt_err: + if "not modified" not in str(fmt_err).lower(): + await self._bot.edit_message_text( + chat_id=int(chat_id), + message_id=int(message_id), + text=first_chunk, + ) + else: + await self._bot.edit_message_text( + chat_id=int(chat_id), + message_id=int(message_id), + text=first_chunk, + ) + except Exception as e: + err_str = str(e).lower() + if "not modified" in err_str: + # First chunk identical to current text — fall through to + # send continuations. + pass + else: + logger.error( + "[%s] Overflow split: first-chunk edit failed: %s", + self.name, e, exc_info=True, + ) + return SendResult(success=False, error=str(e)) + + # Step 2 — send each remaining chunk as a continuation message, + # threaded as a reply to the previous so the user sees them as a + # contiguous block. We call self._bot.send_message directly so the + # continuation skips ``self.send``'s own pre-chunking pass (chunks + # are already correctly sized). Best-effort MarkdownV2 with plain + # fallback, mirroring send(). + continuation_ids: list[str] = [] + prev_id = message_id + for chunk in chunks[1:]: + sent_msg = None + for use_markdown in (True, False) if finalize else (False,): + try: + text = self.format_message(chunk) if use_markdown else chunk + sent_msg = await self._bot.send_message( + chat_id=int(chat_id), + text=text, + parse_mode=ParseMode.MARKDOWN_V2 if use_markdown else None, + reply_to_message_id=int(prev_id) if prev_id else None, + ) + break + except Exception as send_err: + if "reply message not found" in str(send_err).lower(): + # Drop the reply anchor and try again. + try: + sent_msg = await self._bot.send_message( + chat_id=int(chat_id), + text=chunk, + ) + break + except Exception as _retry_err: + logger.warning( + "[%s] Overflow continuation no-reply retry failed: %s", + self.name, _retry_err, + ) + sent_msg = None + break + if use_markdown: + # try plain text on next loop iteration + continue + logger.warning( + "[%s] Overflow continuation send failed: %s", + self.name, send_err, + ) + sent_msg = None + break + if sent_msg is None: + # Continuation failed — the user has chunk 1 + however many + # continuations succeeded. Report success with what we got + # so the stream consumer knows the edit landed; the + # remaining tail is lost on this attempt and the next + # streaming tick may retry. + logger.warning( + "[%s] Overflow split: stopped at %d/%d chunks delivered", + self.name, 1 + len(continuation_ids), len(chunks), + ) + break + new_id = str(getattr(sent_msg, "message_id", "")) or prev_id + continuation_ids.append(new_id) + prev_id = new_id + + last_id = continuation_ids[-1] if continuation_ids else message_id + logger.debug( + "[%s] Overflow split delivered %d chunks; last_id=%s", + self.name, 1 + len(continuation_ids), last_id, + ) + return SendResult( + success=True, + message_id=last_id, + continuation_message_ids=tuple(continuation_ids), + ) + async def delete_message(self, chat_id: str, message_id: str) -> bool: """Delete a previously sent Telegram message. diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 440f65de272..5e553b67ee5 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -1153,7 +1153,29 @@ class GatewayStreamConsumer: ) if result.success: self._already_sent = True - self._last_sent_text = text + # Adapter may have split-and-delivered an oversized + # edit across the original message + N continuations. + # When that happens, ``message_id`` is the LAST visible + # continuation and ``_last_sent_text`` no longer reflects + # the on-screen content (the new message only holds the + # final chunk's text), so subsequent edits must target + # the new id and skip-if-same comparisons must reset. + # Fire on_new_message so tool-progress bubbles linearize + # below the new continuation, not the original. + # ``getattr`` with default keeps backwards compat with + # SimpleNamespace mocks in tests that pre-date the field. + _continuation_ids = getattr(result, "continuation_message_ids", ()) or () + if ( + _continuation_ids + and result.message_id + and result.message_id != self._message_id + ): + self._message_id = str(result.message_id) + self._message_created_ts = time.monotonic() + self._last_sent_text = "" + self._notify_new_message() + else: + self._last_sent_text = text # Successful edit — reset flood strike counter self._flood_strikes = 0 return True