From 4ed293b38e8af06110c224de7923e908fef3bbf1 Mon Sep 17 00:00:00 2001 From: NivOO5 <231191380+NivOO5@users.noreply.github.com> Date: Sun, 10 May 2026 19:12:31 -0700 Subject: [PATCH] feat(telegram): native draft streaming via sendMessageDraft (Bot API 9.5+) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds Telegram's native streaming-draft API as a streaming transport so DM replies render with smooth animated previews as tokens arrive, dropping the per-edit jitter of the legacy editMessageText polling path. Adapter contract (gateway/platforms/base.py): - supports_draft_streaming(chat_type, metadata) -> bool. Default False. Telegram returns True only for DMs and only when the bound python- telegram-bot version exposes Bot.send_message_draft (PTB 22.6+). - send_draft(chat_id, draft_id, content, metadata) -> SendResult. Default raises NotImplementedError. Telegram delegates to PTB's send_message_draft. Drafts have no message_id (Bot API contract); SendResult.message_id is None on success. Telegram adapter (gateway/platforms/telegram.py): - supports_draft_streaming gates on chat_type='dm' AND PTB capability. - send_draft trims to MAX_MESSAGE_LENGTH using utf16_len, threads message_thread_id through metadata, and routes failures back as SendResult(success=False, error=...) so the consumer can fall back. Stream consumer (gateway/stream_consumer.py): - StreamConsumerConfig gains transport ('auto'|'draft'|'edit'|'off') and chat_type fields. - run() resolves _use_draft_streaming once via a probe at the top of the run, allocating a fresh class-wide draft_id_counter so each response animates as its own preview (no animation collision across consecutive responses to the same chat). - _send_or_edit gains a pre-edit branch: when drafts are active AND not finalizing AND no edit-path message_id is established, the frame routes through _send_draft_frame instead of edit_message. Drafts intentionally do NOT set _already_sent so the gateway's final sendMessage path still fires — drafts have no message_id and the user needs a real message in their chat history. - _reset_segment_state bumps the draft_id when the consumer is in draft mode so each text block after a tool boundary animates as a fresh preview below the tool-progress bubble (avoids the inter- tool-call leak openclaw documented in their #32535). - Per-response fallback: any send_draft failure (transient network, server reject, capability gap) flips _use_draft_streaming to False for the rest of the run, gracefully returning to the edit path. Gateway config (gateway/config.py): - StreamingConfig.transport default flips edit -> auto. The auto path is identical to edit on every chat type that doesn't currently support drafts (groups, supergroups, forum topics, every non- Telegram platform), so the default is backwards-compatible for non-DM users. Lifecycle model (Telegram Bot API 9.5): 1. sendMessageDraft(chat_id, draft_id, text='') opens the bubble. 2. Repeated sendMessageDraft calls with the SAME draft_id animate the preview as text grows. 3. Drafts have no message_id and cannot be edited or deleted. 4. When the response finishes the gateway's normal sendMessage path delivers the final answer; the draft preview clears naturally on the client and the user sees a real message in their history. Inspired by PR #3412 by @NivOO5. Re-authored against current main (stream_consumer.py is now ~4x larger than at #3412's branch base, with new _NEW_SEGMENT/_COMMENTARY/finalize/_on_new_message machinery the original PR didn't account for) but the design call (DM-only, edit- fallback, transport=auto|draft|edit|off) is faithful to the original proposal, with two improvements baked in: 1. Per-response draft_id (monotonic counter, not a time hash) — no collision risk across consecutive responses on the same chat. 2. Tool-boundary draft_id bump — prevents the inter-tool-call leak openclaw hit during their rollout (their #32535). Closes #21439 (duplicate feature request). --- gateway/config.py | 12 ++- gateway/platforms/base.py | 46 ++++++++++ gateway/platforms/telegram.py | 71 +++++++++++++++ gateway/run.py | 4 + gateway/stream_consumer.py | 167 ++++++++++++++++++++++++++++++++++ 5 files changed, 298 insertions(+), 2 deletions(-) diff --git a/gateway/config.py b/gateway/config.py index ab03bc1e0c1..5edb17b8094 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -321,7 +321,15 @@ class PlatformConfig: class StreamingConfig: """Configuration for real-time token streaming to messaging platforms.""" enabled: bool = False - transport: str = "edit" # "edit" (progressive editMessageText) or "off" + # Transport selection: + # "auto" — prefer native streaming-draft updates when the platform + # supports them (Telegram sendMessageDraft, Bot API 9.5+); + # fall back to edit-based when not. Recommended. + # "draft" — explicitly request native drafts; falls back to edit when + # the platform/chat doesn't support them. + # "edit" — progressive editMessageText only (legacy behaviour). + # "off" — disable streaming entirely. + transport: str = "auto" edit_interval: float = 1.0 # Seconds between message edits (Telegram rate-limits at ~1/s) buffer_threshold: int = 40 # Chars before forcing an edit cursor: str = " ▉" # Cursor shown during streaming @@ -350,7 +358,7 @@ class StreamingConfig: return cls() return cls( enabled=_coerce_bool(data.get("enabled"), False), - transport=data.get("transport", "edit"), + transport=data.get("transport", "auto"), edit_interval=_coerce_float(data.get("edit_interval"), 1.0), buffer_threshold=_coerce_int(data.get("buffer_threshold"), 40), cursor=data.get("cursor", " ▉"), diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 55a53952169..b5282114295 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1320,6 +1320,52 @@ class BasePlatformAdapter(ABC): """ return len + def supports_draft_streaming( + self, + chat_type: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> bool: + """Whether this adapter supports native streaming-draft updates. + + Telegram Bot API 9.5 introduced ``sendMessageDraft``, which renders an + animated streaming preview as the bot calls it repeatedly with the + same ``draft_id`` and growing text. Adapters that implement + ``send_draft`` should return True here for the chat types where the + platform supports it (Telegram restricts drafts to private DMs). + + Default implementation returns False. Stream consumers fall back to + the edit-based path (``send`` + ``edit_message``) when this returns + False or when ``send_draft`` raises. + """ + return False + + async def send_draft( + self, + chat_id: str, + draft_id: int, + content: str, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + """Send or update an animated streaming-draft preview. + + Reuse the same ``draft_id`` (any non-zero int) across consecutive + calls within a single response so the platform animates the preview + rather than re-creating it. Different responses must use different + ``draft_id`` values within the same chat to avoid animating over a + prior bubble. + + Drafts have no message_id and cannot be edited, replied to, or + deleted via normal message APIs. When the response finishes, the + caller delivers the final answer as a regular ``send`` and the + draft preview clears naturally on the client. + + Default implementation raises NotImplementedError; adapters that + also return True from :meth:`supports_draft_streaming` must override. + """ + raise NotImplementedError( + f"{type(self).__name__} does not implement send_draft" + ) + @property def has_fatal_error(self) -> bool: return self._fatal_error_message is not None diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 2b0dd795eb2..7dbef1b6963 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -1686,6 +1686,77 @@ class TelegramAdapter(BasePlatformAdapter): ) return False + def supports_draft_streaming( + self, + chat_type: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> bool: + """Telegram supports sendMessageDraft for private chats only. + + Bot API 9.5 (March 2026) opened ``sendMessageDraft`` to all bots + unconditionally for private (DM) chats. Groups, supergroups, and + channels still rely on the edit-based path. + + We additionally require ``self._bot`` to expose ``send_message_draft`` + (added to python-telegram-bot in 22.6); older PTB installs gracefully + fall back to the edit path even on DMs. + """ + if not self._bot or not hasattr(self._bot, "send_message_draft"): + return False + return (chat_type or "").lower() in ("dm", "private") + + async def send_draft( + self, + chat_id: str, + draft_id: int, + content: str, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + """Stream a partial message via Telegram's native sendMessageDraft. + + The Bot API animates the preview when the same ``draft_id`` is reused + across consecutive calls in the same chat. When the response + finishes, the caller sends the final text via the normal ``send`` + path; the draft preview clears naturally on the client (Telegram has + no Bot API to "promote" a draft to a real message — the final + ``sendMessage`` is what the user receives in their history). + """ + if not self._bot: + return SendResult(success=False, error="not_connected") + if not hasattr(self._bot, "send_message_draft"): + return SendResult(success=False, error="api_unavailable") + + # Trim to the same UTF-16 budget the platform enforces on regular + # sends. Drafts have the same length contract as messages. + text = content if len(content) <= self.MAX_MESSAGE_LENGTH else \ + self.truncate_message(content, self.MAX_MESSAGE_LENGTH, len_fn=utf16_len)[0] + + kwargs: Dict[str, Any] = { + "chat_id": int(chat_id), + "draft_id": int(draft_id), + "text": text, + } + thread_id = self._metadata_thread_id(metadata) + if thread_id is not None: + kwargs["message_thread_id"] = thread_id + + try: + ok = await self._bot.send_message_draft(**kwargs) + if ok: + # Drafts have no message_id; we report success without one + # so the caller knows the animation frame landed. + return SendResult(success=True, message_id=None) + return SendResult(success=False, error="draft_rejected") + except Exception as e: + # Most likely: BadRequest because this bot/chat doesn't allow + # drafts, or a transient server hiccup. The caller treats any + # failure as "fall back to edit-based for this response". + logger.debug( + "[%s] sendMessageDraft failed (chat=%s draft_id=%s): %s", + self.name, chat_id, draft_id, e, + ) + return SendResult(success=False, error=str(e)) + async def _send_message_with_thread_fallback(self, **kwargs): """Send a Telegram message, retrying once without message_thread_id if Telegram returns 'Message thread not found'. diff --git a/gateway/run.py b/gateway/run.py index 358e3702489..13cae5297a8 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -13984,6 +13984,8 @@ class GatewayRunner: cursor=_effective_cursor, buffer_only=_buffer_only, fresh_final_after_seconds=_fresh_final_secs, + transport=_scfg.transport or "auto", + chat_type=getattr(source, "chat_type", "") or "", ) _stream_consumer = GatewayStreamConsumer( adapter=_adapter, @@ -14805,6 +14807,8 @@ class GatewayRunner: cursor=_effective_cursor, buffer_only=_buffer_only, fresh_final_after_seconds=_fresh_final_secs, + transport=_scfg.transport or "auto", + chat_type=getattr(source, "chat_type", "") or "", ) _stream_consumer = GatewayStreamConsumer( adapter=_adapter, diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 4ef557ef997..440f65de272 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -55,6 +55,18 @@ class StreamConsumerConfig: # openclaw/openclaw#72038. Default 0 = always edit in place (legacy # behavior). The gateway enables this selectively per-platform. fresh_final_after_seconds: float = 0.0 + # Streaming transport selection: + # "auto" — prefer native draft streaming (e.g. Telegram sendMessageDraft) + # when the adapter + chat supports it; fall back to edit. + # "draft" — explicitly request native draft streaming; fall back to + # edit when unsupported. + # "edit" — progressive editMessageText (legacy behavior). + # "off" — handled by the gateway before the consumer is even built. + transport: str = "auto" + # Hint for the consumer about the originating chat type (e.g. "dm", + # "group", "supergroup", "forum"). Used to gate native draft streaming, + # which is platform-specific (Telegram drafts are DM-only). + chat_type: str = "" class GatewayStreamConsumer: @@ -88,6 +100,11 @@ class GatewayStreamConsumer: "", "", "", ) + # Class-wide monotonic counter for native-streaming draft ids. Telegram + # animates a draft when the same draft_id is reused across consecutive + # calls in the same chat, so we need a fresh non-zero id per response. + _draft_id_counter: int = 0 + def __init__( self, adapter: Any, @@ -141,6 +158,20 @@ class GatewayStreamConsumer: self._in_think_block = False self._think_buffer = "" + # Native draft-streaming state. Resolved at the start of run() based + # on cfg.transport, cfg.chat_type, and the adapter's + # supports_draft_streaming() probe. When True, the consumer emits + # animated draft frames via adapter.send_draft instead of progressive + # edits via adapter.edit_message. The final answer still goes + # through the normal first-send path so the user gets a real message + # in their chat history (drafts have no message_id). + self._use_draft_streaming = False + self._draft_id: Optional[int] = None + # Cumulative draft-frame failure count for this consumer. After the + # first failure we permanently disable drafts for the remainder of + # this response and route through edit-based for graceful degradation. + self._draft_failures = 0 + @property def already_sent(self) -> bool: """True if at least one message was sent or edited during the run.""" @@ -179,6 +210,16 @@ class GatewayStreamConsumer: self._last_sent_text = "" self._fallback_final_send = False self._fallback_prefix = "" + # Native draft streaming: bump the draft_id so the next text segment + # animates as a fresh preview below the tool-progress bubbles, not + # over the prior segment's already-finalized draft. This is how + # we avoid the "inter-tool-call text leak" failure mode openclaw + # documented in their issue #32535 — each text block becomes its + # own visible message via the finalize, then a new draft animates + # for the next one. + if self._use_draft_streaming: + type(self)._draft_id_counter += 1 + self._draft_id = type(self)._draft_id_counter def on_delta(self, text: str) -> None: """Thread-safe callback — called from the agent's worker thread. @@ -317,6 +358,20 @@ class GatewayStreamConsumer: _raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) _safe_limit = max(500, _raw_limit - _len_fn(self.cfg.cursor) - 100) + # Resolve native draft streaming once per run. When enabled the + # consumer routes mid-stream frames through adapter.send_draft and + # leaves _message_id=None so the existing got_done path delivers the + # final answer as a regular sendMessage (drafts have no message_id + # to edit). + self._use_draft_streaming = self._resolve_draft_streaming() + if self._use_draft_streaming: + type(self)._draft_id_counter += 1 + self._draft_id = type(self)._draft_id_counter + logger.debug( + "Stream consumer using native-draft transport (chat=%s draft_id=%s)", + self.chat_id, self._draft_id, + ) + try: while True: # Drain all available items from the queue @@ -754,6 +809,89 @@ class GatewayStreamConsumer: err_lower = err.lower() return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower + def _resolve_draft_streaming(self) -> bool: + """Decide whether this run should use native draft streaming. + + Honors ``cfg.transport``: + * ``"edit"`` → never use drafts (legacy progressive-edit path). + * ``"draft"`` → require draft support; gracefully fall back to edit + when the adapter declines. Logs the downgrade at debug. + * ``"auto"`` → use drafts when the adapter supports them for this + chat type; otherwise edit. + + Adapter eligibility is checked via + :meth:`BasePlatformAdapter.supports_draft_streaming`, which considers + the chat type (e.g. Telegram drafts are DM-only) and platform-version + gates (e.g. python-telegram-bot 22.6+). + """ + transport = (self.cfg.transport or "auto").lower() + if transport == "edit": + return False + # "off" is filtered upstream by the gateway; treat as edit defensively. + if transport == "off": + return False + # Test adapters are MagicMocks that don't subclass BasePlatformAdapter; + # default them to edit so existing test behaviour is preserved. + if not isinstance(self.adapter, _BasePlatformAdapter): + return False + try: + supported = self.adapter.supports_draft_streaming( + chat_type=self.cfg.chat_type or None, + metadata=self.metadata, + ) + except Exception: + logger.debug("supports_draft_streaming probe raised", exc_info=True) + supported = False + if not supported: + if transport == "draft": + logger.debug( + "Draft streaming requested but unsupported (chat=%s, type=%r) — " + "falling back to edit", + self.chat_id, self.cfg.chat_type, + ) + return False + return True + + async def _send_draft_frame(self, text: str) -> bool: + """Emit a single animated draft frame for the current accumulated text. + + Returns True when the frame landed. On any failure, permanently + disables drafts for the remainder of this run so subsequent frames + flow through the edit-based path (which can adapt with flood-control + backoff, etc.). Drafts have no message_id and clear naturally on + the client when the response finalizes via a regular sendMessage. + """ + if self._draft_id is None: + # Defensive: should never happen — _use_draft_streaming gate is + # set in tandem with _draft_id in run(). Disable to be safe. + self._use_draft_streaming = False + return False + try: + result = await self.adapter.send_draft( + chat_id=self.chat_id, + draft_id=self._draft_id, + content=text, + metadata=self.metadata, + ) + except Exception as e: + logger.debug( + "send_draft raised, disabling draft transport for this run: %s", e, + ) + self._draft_failures += 1 + self._use_draft_streaming = False + return False + if not getattr(result, "success", False): + logger.debug( + "send_draft returned success=False, disabling draft transport: %s", + getattr(result, "error", "unknown"), + ) + self._draft_failures += 1 + self._use_draft_streaming = False + return False + # Frame delivered. Track text for parity with edit-based no-op skip. + self._last_sent_text = text + return True + async def _flush_segment_tail_on_edit_failure(self) -> None: """Deliver un-sent tail content before a segment-break reset. @@ -948,6 +1086,35 @@ class GatewayStreamConsumer: and self.cfg.cursor in text and len(_visible_stripped) < _MIN_NEW_MSG_CHARS): return True # too short for a standalone message — accumulate more + + # Native draft streaming: route mid-stream frames through send_draft. + # The final answer is delivered via the regular sendMessage path + # below — drafts have no message_id so we can't finalize them + # in-place; the regular sendMessage clears the draft naturally on + # the client and gives the user a real message in their history. + # Skip when: + # * finalize=True (this is the final answer; needs to be a real message) + # * an edit path is already established (message_id is set, e.g. after + # a tool-boundary segment break where the prior text was finalized + # as a real sendMessage and the next text segment continues editing + # that one — staying on edit-based for that segment is correct). + if ( + self._use_draft_streaming + and not finalize + and self._message_id is None + ): + # No-op skip: identical to the last frame we sent. + if text == self._last_sent_text: + return True + ok = await self._send_draft_frame(text) + if ok: + # Drafts mark "we put something on screen" but DO NOT set + # _already_sent — that flag gates the gateway's fallback + # final-send path and we still need that to fire so the + # user gets a real message (drafts have no message_id). + return True + # Failure already disabled drafts for this run; fall through to + # the regular edit/send path below. try: if self._message_id is not None: if self._edit_supported: