fix(telegram): split-and-deliver oversized edits instead of silent truncation

When edit_message_text exceeded Telegram's 4096 UTF-16 codepoint limit,
the adapter caught the BadRequest, best-effort truncated the content
with '…', and returned SendResult(success=True). The stream consumer
believed the full edit was delivered and never recovered, silently
dropping everything past the truncation boundary on long replies.

Returning failure isn't safe either — the consumer's existing fallback
path can race against the next streaming tick, producing duplicate
sends or gaps. Instead, the adapter now SPLITS the oversized payload
across the existing message + new continuation messages, so the user
always gets the full reply in correct order.

How it works:

1. Pre-flight: if utf16_len(content) already exceeds MAX_MESSAGE_LENGTH,
   call the new _edit_overflow_split helper directly — saves a doomed
   round-trip + a Telegram error.

2. Reactive: if Telegram still returns 'message_too_long' after the
   pre-flight (e.g. parse_mode formatting inflated the payload past
   the limit via MarkdownV2 escapes), the same helper handles it.

3. _edit_overflow_split:
   - Splits via truncate_message(len_fn=utf16_len) — same chunking the
     non-streaming send() path uses; chunks get '(1/N)' suffixes.
   - Edits the original message_id with chunk 1 (with parse_mode +
     plain-fallback when finalize=True, mirroring the main edit path).
   - Sends each remaining chunk via self._bot.send_message threaded as
     a reply to the previous chunk so the user sees them as a
     contiguous block. MarkdownV2-with-plain-fallback per chunk on
     finalize.
   - Returns SendResult(success=True, message_id=<last_chunk_id>,
     continuation_message_ids=(<chunk2_id>, <chunk3_id>, ...)) so the
     stream consumer can keep editing the most recent visible message
     and the gateway has full visibility into every message id.

SendResult contract extension:

  Added optional continuation_message_ids: tuple = () field. When
  empty (the common case), behavior is unchanged. When populated, the
  caller knows the adapter delivered across multiple platform messages.

Stream consumer integration:

  GatewayStreamConsumer._send_or_edit advances _message_id to the
  last-continuation id when it sees continuation_message_ids on a
  successful edit result, resets _last_sent_text (the new visible
  message holds only the final chunk's text), and fires
  on_new_message so tool-progress bubbles linearize below the new
  continuation rather than the original. Mirrors the openclaw #32535
  inter-tool-leak guard.

Composes with what just landed:

  - PR #23455 (UTF-16 length-aware splitting in stream consumer)
    prevents most overflows upstream by measuring text in UTF-16
    codeunits before deciding to split. This PR is the safety net at
    the adapter boundary.
  - PR #23512 (native draft streaming, default for DM Telegram) routes
    DM streaming through send_draft, which has its own contract
    unaffected by this change. So this fix narrows in scope to the
    edit-based path: groups, supergroups, forum topics, every
    non-Telegram platform, and the per-response fallback after a
    draft failure.

Salvage notes:

  - Cherry-picked from PR #19537 by @kjames2001. Original PR returned
    failure on overflow; this evolves to split-and-deliver so users
    never lose content and the consumer state stays consistent.
  - Dropped an unrelated model-picker hunk (line 2114-2117) that
    silently killed the 'X more available — type /model <name>
    directly' hint by hardcoding total=len(models). Not in scope.
  - Restored the timeout-aware retryable=not is_timeout signal in
    send()'s fallthrough catch block.

Closes #19537.
This commit is contained in:
kjames2001 2026-05-10 21:59:05 -07:00 committed by Teknium
parent 3b122cc1ac
commit bf1f40996f
3 changed files with 208 additions and 19 deletions

View file

@ -1035,6 +1035,13 @@ class SendResult:
error: Optional[str] = None error: Optional[str] = None
raw_response: Any = None raw_response: Any = None
retryable: bool = False # True for transient connection errors — base will retry automatically retryable: bool = False # True for transient connection errors — base will retry automatically
# When the adapter had to split an oversized payload across multiple
# platform messages (e.g. Telegram edit_message overflow split-and-deliver),
# ``message_id`` is the LAST visible message id (so subsequent edits target
# the most recent chunk) and these are the additional message ids that
# made up the full payload, in send order. Empty tuple for the common
# single-message case.
continuation_message_ids: tuple = ()
class EphemeralReply(str): class EphemeralReply(str):

View file

@ -77,7 +77,6 @@ from gateway.platforms.base import (
SUPPORTED_VIDEO_TYPES, SUPPORTED_VIDEO_TYPES,
SUPPORTED_DOCUMENT_TYPES, SUPPORTED_DOCUMENT_TYPES,
utf16_len, utf16_len,
_prefix_within_utf16_limit,
) )
from gateway.platforms.telegram_network import ( from gateway.platforms.telegram_network import (
TelegramFallbackTransport, TelegramFallbackTransport,
@ -1559,10 +1558,18 @@ class TelegramAdapter(BasePlatformAdapter):
except Exception as e: except Exception as e:
logger.error("[%s] Failed to send Telegram message: %s", self.name, e, exc_info=True) logger.error("[%s] Failed to send Telegram message: %s", self.name, e, exc_info=True)
err_str = str(e).lower()
# Message too long — content exceeded 4096 chars. Return failure so
# stream consumer enters fallback mode and sends the remainder.
if "message_too_long" in err_str or "too long" in err_str:
logger.debug(
"[%s] send() content too long, falling back to new-message continuation",
self.name,
)
return SendResult(success=False, error="message_too_long")
# TimedOut means the request may have reached Telegram — # TimedOut means the request may have reached Telegram —
# mark as non-retryable so _send_with_retry() doesn't re-send. # mark as non-retryable so _send_with_retry() doesn't re-send.
_to = locals().get("_TimedOut") _to = locals().get("_TimedOut")
err_str = str(e).lower()
is_timeout = (_to and isinstance(e, _to)) or "timed out" in err_str is_timeout = (_to and isinstance(e, _to)) or "timed out" in err_str
return SendResult(success=False, error=str(e), retryable=not is_timeout) return SendResult(success=False, error=str(e), retryable=not is_timeout)
@ -1574,9 +1581,26 @@ class TelegramAdapter(BasePlatformAdapter):
*, *,
finalize: bool = False, finalize: bool = False,
) -> SendResult: ) -> SendResult:
"""Edit a previously sent Telegram message.""" """Edit a previously sent Telegram message.
Telegram caps single-message text at 4096 UTF-16 codeunits. Streaming
replies that grow past this limit must NOT be silently truncated and
must NOT return failure (the consumer would re-send and create a
duplicate). Instead this method split-and-delivers: edit the
existing message with the first chunk and send the rest as
continuation messages, returning the final chunk's id so subsequent
edits target the most recent visible message.
"""
if not self._bot: if not self._bot:
return SendResult(success=False, error="Not connected") return SendResult(success=False, error="Not connected")
# Pre-flight: if content already exceeds the limit, split-and-deliver
# without round-tripping a doomed edit.
if utf16_len(content) > self.MAX_MESSAGE_LENGTH:
return await self._edit_overflow_split(
chat_id, message_id, content, finalize=finalize,
)
try: try:
if not finalize: if not finalize:
await self._bot.edit_message_text( await self._bot.edit_message_text(
@ -1610,22 +1634,17 @@ class TelegramAdapter(BasePlatformAdapter):
# "Message is not modified" — content identical, treat as success # "Message is not modified" — content identical, treat as success
if "not modified" in err_str: if "not modified" in err_str:
return SendResult(success=True, message_id=message_id) return SendResult(success=True, message_id=message_id)
# Message too long — content exceeded 4096 chars (e.g. during # Reactive split-and-deliver: parse_mode formatting can inflate
# streaming). Truncate and succeed so the stream consumer can # the payload past the limit even when the raw text was under
# split the overflow into a new message instead of dying. # (e.g. MarkdownV2 escapes). Same fix as the pre-flight path.
if "message_too_long" in err_str or "too long" in err_str: if "message_too_long" in err_str or "too long" in err_str:
truncated = _prefix_within_utf16_limit( logger.debug(
content, self.MAX_MESSAGE_LENGTH - 20 "[%s] edit_message overflow (%d UTF-16 > %d), splitting",
) + "" self.name, utf16_len(content), self.MAX_MESSAGE_LENGTH,
try: )
await self._bot.edit_message_text( return await self._edit_overflow_split(
chat_id=int(chat_id), chat_id, message_id, content, finalize=finalize,
message_id=int(message_id), )
text=truncated,
)
except Exception:
pass # best-effort truncation
return SendResult(success=True, message_id=message_id)
# Flood control / RetryAfter — short waits are retried inline, # Flood control / RetryAfter — short waits are retried inline,
# long waits return a failure immediately so streaming can fall back # long waits return a failure immediately so streaming can fall back
# to a normal final send instead of leaving a truncated partial. # to a normal final send instead of leaving a truncated partial.
@ -1661,6 +1680,147 @@ class TelegramAdapter(BasePlatformAdapter):
) )
return SendResult(success=False, error=str(e)) return SendResult(success=False, error=str(e))
async def _edit_overflow_split(
self,
chat_id: str,
message_id: str,
content: str,
*,
finalize: bool,
) -> SendResult:
"""Split an oversized edit across the existing message + continuations.
Edit the original ``message_id`` with chunk 1 (with the platform's
usual ``(1/N)`` suffix preserved), then send the remaining chunks as
new messages threaded as replies to the previous chunk so the user
sees them grouped. Returns ``SendResult(success=True,
message_id=<last-chunk-id>, continuation_message_ids=(...))`` so the
stream consumer can keep editing the most recent visible message
and the gateway has full visibility into every message id we put on
screen.
Falls back to ``SendResult(success=False)`` only if even the first-
chunk edit fails that's a real adapter problem, not an overflow.
"""
chunks = self.truncate_message(
content, self.MAX_MESSAGE_LENGTH, len_fn=utf16_len,
)
if len(chunks) <= 1:
# Defensive: shouldn't happen given the caller's pre-flight, but
# if truncate_message returned a single chunk just edit normally.
chunks = [content]
# Step 1 — edit the existing message with the first chunk.
first_chunk = chunks[0]
try:
if finalize:
# Use format_message + parse_mode for the final chunk;
# mirror edit_message's main happy-path.
formatted = self.format_message(first_chunk)
try:
await self._bot.edit_message_text(
chat_id=int(chat_id),
message_id=int(message_id),
text=formatted,
parse_mode=ParseMode.MARKDOWN_V2,
)
except Exception as fmt_err:
if "not modified" not in str(fmt_err).lower():
await self._bot.edit_message_text(
chat_id=int(chat_id),
message_id=int(message_id),
text=first_chunk,
)
else:
await self._bot.edit_message_text(
chat_id=int(chat_id),
message_id=int(message_id),
text=first_chunk,
)
except Exception as e:
err_str = str(e).lower()
if "not modified" in err_str:
# First chunk identical to current text — fall through to
# send continuations.
pass
else:
logger.error(
"[%s] Overflow split: first-chunk edit failed: %s",
self.name, e, exc_info=True,
)
return SendResult(success=False, error=str(e))
# Step 2 — send each remaining chunk as a continuation message,
# threaded as a reply to the previous so the user sees them as a
# contiguous block. We call self._bot.send_message directly so the
# continuation skips ``self.send``'s own pre-chunking pass (chunks
# are already correctly sized). Best-effort MarkdownV2 with plain
# fallback, mirroring send().
continuation_ids: list[str] = []
prev_id = message_id
for chunk in chunks[1:]:
sent_msg = None
for use_markdown in (True, False) if finalize else (False,):
try:
text = self.format_message(chunk) if use_markdown else chunk
sent_msg = await self._bot.send_message(
chat_id=int(chat_id),
text=text,
parse_mode=ParseMode.MARKDOWN_V2 if use_markdown else None,
reply_to_message_id=int(prev_id) if prev_id else None,
)
break
except Exception as send_err:
if "reply message not found" in str(send_err).lower():
# Drop the reply anchor and try again.
try:
sent_msg = await self._bot.send_message(
chat_id=int(chat_id),
text=chunk,
)
break
except Exception as _retry_err:
logger.warning(
"[%s] Overflow continuation no-reply retry failed: %s",
self.name, _retry_err,
)
sent_msg = None
break
if use_markdown:
# try plain text on next loop iteration
continue
logger.warning(
"[%s] Overflow continuation send failed: %s",
self.name, send_err,
)
sent_msg = None
break
if sent_msg is None:
# Continuation failed — the user has chunk 1 + however many
# continuations succeeded. Report success with what we got
# so the stream consumer knows the edit landed; the
# remaining tail is lost on this attempt and the next
# streaming tick may retry.
logger.warning(
"[%s] Overflow split: stopped at %d/%d chunks delivered",
self.name, 1 + len(continuation_ids), len(chunks),
)
break
new_id = str(getattr(sent_msg, "message_id", "")) or prev_id
continuation_ids.append(new_id)
prev_id = new_id
last_id = continuation_ids[-1] if continuation_ids else message_id
logger.debug(
"[%s] Overflow split delivered %d chunks; last_id=%s",
self.name, 1 + len(continuation_ids), last_id,
)
return SendResult(
success=True,
message_id=last_id,
continuation_message_ids=tuple(continuation_ids),
)
async def delete_message(self, chat_id: str, message_id: str) -> bool: async def delete_message(self, chat_id: str, message_id: str) -> bool:
"""Delete a previously sent Telegram message. """Delete a previously sent Telegram message.

View file

@ -1153,7 +1153,29 @@ class GatewayStreamConsumer:
) )
if result.success: if result.success:
self._already_sent = True self._already_sent = True
self._last_sent_text = text # Adapter may have split-and-delivered an oversized
# edit across the original message + N continuations.
# When that happens, ``message_id`` is the LAST visible
# continuation and ``_last_sent_text`` no longer reflects
# the on-screen content (the new message only holds the
# final chunk's text), so subsequent edits must target
# the new id and skip-if-same comparisons must reset.
# Fire on_new_message so tool-progress bubbles linearize
# below the new continuation, not the original.
# ``getattr`` with default keeps backwards compat with
# SimpleNamespace mocks in tests that pre-date the field.
_continuation_ids = getattr(result, "continuation_message_ids", ()) or ()
if (
_continuation_ids
and result.message_id
and result.message_id != self._message_id
):
self._message_id = str(result.message_id)
self._message_created_ts = time.monotonic()
self._last_sent_text = ""
self._notify_new_message()
else:
self._last_sent_text = text
# Successful edit — reset flood strike counter # Successful edit — reset flood strike counter
self._flood_strikes = 0 self._flood_strikes = 0
return True return True