feat(telegram): native draft streaming via sendMessageDraft (Bot API 9.5+)

Adds Telegram's native streaming-draft API as a streaming transport so DM
replies render with smooth animated previews as tokens arrive, dropping
the per-edit jitter of the legacy editMessageText polling path.

Adapter contract (gateway/platforms/base.py):
  - supports_draft_streaming(chat_type, metadata) -> bool. Default False.
    Telegram returns True only for DMs and only when the bound python-
    telegram-bot version exposes Bot.send_message_draft (PTB 22.6+).
  - send_draft(chat_id, draft_id, content, metadata) -> SendResult.
    Default raises NotImplementedError. Telegram delegates to PTB's
    send_message_draft. Drafts have no message_id (Bot API contract);
    SendResult.message_id is None on success.

Telegram adapter (gateway/platforms/telegram.py):
  - supports_draft_streaming gates on chat_type='dm' AND PTB capability.
  - send_draft trims to MAX_MESSAGE_LENGTH using utf16_len, threads
    message_thread_id through metadata, and routes failures back as
    SendResult(success=False, error=...) so the consumer can fall back.

Stream consumer (gateway/stream_consumer.py):
  - StreamConsumerConfig gains transport ('auto'|'draft'|'edit'|'off')
    and chat_type fields.
  - run() resolves _use_draft_streaming once via a probe at the top of
    the run, allocating a fresh class-wide draft_id_counter so each
    response animates as its own preview (no animation collision across
    consecutive responses to the same chat).
  - _send_or_edit gains a pre-edit branch: when drafts are active AND
    not finalizing AND no edit-path message_id is established, the
    frame routes through _send_draft_frame instead of edit_message.
    Drafts intentionally do NOT set _already_sent so the gateway's
    final sendMessage path still fires — drafts have no message_id and
    the user needs a real message in their chat history.
  - _reset_segment_state bumps the draft_id when the consumer is in
    draft mode so each text block after a tool boundary animates as a
    fresh preview below the tool-progress bubble (avoids the inter-
    tool-call leak openclaw documented in their #32535).
  - Per-response fallback: any send_draft failure (transient network,
    server reject, capability gap) flips _use_draft_streaming to False
    for the rest of the run, gracefully returning to the edit path.

Gateway config (gateway/config.py):
  - StreamingConfig.transport default flips edit -> auto. The auto path
    is identical to edit on every chat type that doesn't currently
    support drafts (groups, supergroups, forum topics, every non-
    Telegram platform), so the default is backwards-compatible for
    non-DM users.

Lifecycle model (Telegram Bot API 9.5):
  1. sendMessageDraft(chat_id, draft_id, text='') opens the bubble.
  2. Repeated sendMessageDraft calls with the SAME draft_id animate
     the preview as text grows.
  3. Drafts have no message_id and cannot be edited or deleted.
  4. When the response finishes the gateway's normal sendMessage path
     delivers the final answer; the draft preview clears naturally on
     the client and the user sees a real message in their history.

Inspired by PR #3412 by @NivOO5. Re-authored against current main
(stream_consumer.py is now ~4x larger than at #3412's branch base, with
new _NEW_SEGMENT/_COMMENTARY/finalize/_on_new_message machinery the
original PR didn't account for) but the design call (DM-only, edit-
fallback, transport=auto|draft|edit|off) is faithful to the original
proposal, with two improvements baked in:

  1. Per-response draft_id (monotonic counter, not a time hash) — no
     collision risk across consecutive responses on the same chat.
  2. Tool-boundary draft_id bump — prevents the inter-tool-call leak
     openclaw hit during their rollout (their #32535).

Closes #21439 (duplicate feature request).
This commit is contained in:
NivOO5 2026-05-10 19:12:31 -07:00 committed by Teknium
parent 80bb5f2947
commit 4ed293b38e
5 changed files with 298 additions and 2 deletions

View file

@ -321,7 +321,15 @@ class PlatformConfig:
class StreamingConfig:
"""Configuration for real-time token streaming to messaging platforms."""
enabled: bool = False
transport: str = "edit" # "edit" (progressive editMessageText) or "off"
# Transport selection:
# "auto" — prefer native streaming-draft updates when the platform
# supports them (Telegram sendMessageDraft, Bot API 9.5+);
# fall back to edit-based when not. Recommended.
# "draft" — explicitly request native drafts; falls back to edit when
# the platform/chat doesn't support them.
# "edit" — progressive editMessageText only (legacy behaviour).
# "off" — disable streaming entirely.
transport: str = "auto"
edit_interval: float = 1.0 # Seconds between message edits (Telegram rate-limits at ~1/s)
buffer_threshold: int = 40 # Chars before forcing an edit
cursor: str = "" # Cursor shown during streaming
@ -350,7 +358,7 @@ class StreamingConfig:
return cls()
return cls(
enabled=_coerce_bool(data.get("enabled"), False),
transport=data.get("transport", "edit"),
transport=data.get("transport", "auto"),
edit_interval=_coerce_float(data.get("edit_interval"), 1.0),
buffer_threshold=_coerce_int(data.get("buffer_threshold"), 40),
cursor=data.get("cursor", ""),

View file

@ -1320,6 +1320,52 @@ class BasePlatformAdapter(ABC):
"""
return len
def supports_draft_streaming(
self,
chat_type: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> bool:
"""Whether this adapter supports native streaming-draft updates.
Telegram Bot API 9.5 introduced ``sendMessageDraft``, which renders an
animated streaming preview as the bot calls it repeatedly with the
same ``draft_id`` and growing text. Adapters that implement
``send_draft`` should return True here for the chat types where the
platform supports it (Telegram restricts drafts to private DMs).
Default implementation returns False. Stream consumers fall back to
the edit-based path (``send`` + ``edit_message``) when this returns
False or when ``send_draft`` raises.
"""
return False
async def send_draft(
self,
chat_id: str,
draft_id: int,
content: str,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send or update an animated streaming-draft preview.
Reuse the same ``draft_id`` (any non-zero int) across consecutive
calls within a single response so the platform animates the preview
rather than re-creating it. Different responses must use different
``draft_id`` values within the same chat to avoid animating over a
prior bubble.
Drafts have no message_id and cannot be edited, replied to, or
deleted via normal message APIs. When the response finishes, the
caller delivers the final answer as a regular ``send`` and the
draft preview clears naturally on the client.
Default implementation raises NotImplementedError; adapters that
also return True from :meth:`supports_draft_streaming` must override.
"""
raise NotImplementedError(
f"{type(self).__name__} does not implement send_draft"
)
@property
def has_fatal_error(self) -> bool:
return self._fatal_error_message is not None

View file

@ -1686,6 +1686,77 @@ class TelegramAdapter(BasePlatformAdapter):
)
return False
def supports_draft_streaming(
self,
chat_type: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> bool:
"""Telegram supports sendMessageDraft for private chats only.
Bot API 9.5 (March 2026) opened ``sendMessageDraft`` to all bots
unconditionally for private (DM) chats. Groups, supergroups, and
channels still rely on the edit-based path.
We additionally require ``self._bot`` to expose ``send_message_draft``
(added to python-telegram-bot in 22.6); older PTB installs gracefully
fall back to the edit path even on DMs.
"""
if not self._bot or not hasattr(self._bot, "send_message_draft"):
return False
return (chat_type or "").lower() in ("dm", "private")
async def send_draft(
self,
chat_id: str,
draft_id: int,
content: str,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Stream a partial message via Telegram's native sendMessageDraft.
The Bot API animates the preview when the same ``draft_id`` is reused
across consecutive calls in the same chat. When the response
finishes, the caller sends the final text via the normal ``send``
path; the draft preview clears naturally on the client (Telegram has
no Bot API to "promote" a draft to a real message the final
``sendMessage`` is what the user receives in their history).
"""
if not self._bot:
return SendResult(success=False, error="not_connected")
if not hasattr(self._bot, "send_message_draft"):
return SendResult(success=False, error="api_unavailable")
# Trim to the same UTF-16 budget the platform enforces on regular
# sends. Drafts have the same length contract as messages.
text = content if len(content) <= self.MAX_MESSAGE_LENGTH else \
self.truncate_message(content, self.MAX_MESSAGE_LENGTH, len_fn=utf16_len)[0]
kwargs: Dict[str, Any] = {
"chat_id": int(chat_id),
"draft_id": int(draft_id),
"text": text,
}
thread_id = self._metadata_thread_id(metadata)
if thread_id is not None:
kwargs["message_thread_id"] = thread_id
try:
ok = await self._bot.send_message_draft(**kwargs)
if ok:
# Drafts have no message_id; we report success without one
# so the caller knows the animation frame landed.
return SendResult(success=True, message_id=None)
return SendResult(success=False, error="draft_rejected")
except Exception as e:
# Most likely: BadRequest because this bot/chat doesn't allow
# drafts, or a transient server hiccup. The caller treats any
# failure as "fall back to edit-based for this response".
logger.debug(
"[%s] sendMessageDraft failed (chat=%s draft_id=%s): %s",
self.name, chat_id, draft_id, e,
)
return SendResult(success=False, error=str(e))
async def _send_message_with_thread_fallback(self, **kwargs):
"""Send a Telegram message, retrying once without message_thread_id
if Telegram returns 'Message thread not found'.

View file

@ -13984,6 +13984,8 @@ class GatewayRunner:
cursor=_effective_cursor,
buffer_only=_buffer_only,
fresh_final_after_seconds=_fresh_final_secs,
transport=_scfg.transport or "auto",
chat_type=getattr(source, "chat_type", "") or "",
)
_stream_consumer = GatewayStreamConsumer(
adapter=_adapter,
@ -14805,6 +14807,8 @@ class GatewayRunner:
cursor=_effective_cursor,
buffer_only=_buffer_only,
fresh_final_after_seconds=_fresh_final_secs,
transport=_scfg.transport or "auto",
chat_type=getattr(source, "chat_type", "") or "",
)
_stream_consumer = GatewayStreamConsumer(
adapter=_adapter,

View file

@ -55,6 +55,18 @@ class StreamConsumerConfig:
# openclaw/openclaw#72038. Default 0 = always edit in place (legacy
# behavior). The gateway enables this selectively per-platform.
fresh_final_after_seconds: float = 0.0
# Streaming transport selection:
# "auto" — prefer native draft streaming (e.g. Telegram sendMessageDraft)
# when the adapter + chat supports it; fall back to edit.
# "draft" — explicitly request native draft streaming; fall back to
# edit when unsupported.
# "edit" — progressive editMessageText (legacy behavior).
# "off" — handled by the gateway before the consumer is even built.
transport: str = "auto"
# Hint for the consumer about the originating chat type (e.g. "dm",
# "group", "supergroup", "forum"). Used to gate native draft streaming,
# which is platform-specific (Telegram drafts are DM-only).
chat_type: str = ""
class GatewayStreamConsumer:
@ -88,6 +100,11 @@ class GatewayStreamConsumer:
"</THINKING>", "</thinking>", "</thought>",
)
# Class-wide monotonic counter for native-streaming draft ids. Telegram
# animates a draft when the same draft_id is reused across consecutive
# calls in the same chat, so we need a fresh non-zero id per response.
_draft_id_counter: int = 0
def __init__(
self,
adapter: Any,
@ -141,6 +158,20 @@ class GatewayStreamConsumer:
self._in_think_block = False
self._think_buffer = ""
# Native draft-streaming state. Resolved at the start of run() based
# on cfg.transport, cfg.chat_type, and the adapter's
# supports_draft_streaming() probe. When True, the consumer emits
# animated draft frames via adapter.send_draft instead of progressive
# edits via adapter.edit_message. The final answer still goes
# through the normal first-send path so the user gets a real message
# in their chat history (drafts have no message_id).
self._use_draft_streaming = False
self._draft_id: Optional[int] = None
# Cumulative draft-frame failure count for this consumer. After the
# first failure we permanently disable drafts for the remainder of
# this response and route through edit-based for graceful degradation.
self._draft_failures = 0
@property
def already_sent(self) -> bool:
"""True if at least one message was sent or edited during the run."""
@ -179,6 +210,16 @@ class GatewayStreamConsumer:
self._last_sent_text = ""
self._fallback_final_send = False
self._fallback_prefix = ""
# Native draft streaming: bump the draft_id so the next text segment
# animates as a fresh preview below the tool-progress bubbles, not
# over the prior segment's already-finalized draft. This is how
# we avoid the "inter-tool-call text leak" failure mode openclaw
# documented in their issue #32535 — each text block becomes its
# own visible message via the finalize, then a new draft animates
# for the next one.
if self._use_draft_streaming:
type(self)._draft_id_counter += 1
self._draft_id = type(self)._draft_id_counter
def on_delta(self, text: str) -> None:
"""Thread-safe callback — called from the agent's worker thread.
@ -317,6 +358,20 @@ class GatewayStreamConsumer:
_raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
_safe_limit = max(500, _raw_limit - _len_fn(self.cfg.cursor) - 100)
# Resolve native draft streaming once per run. When enabled the
# consumer routes mid-stream frames through adapter.send_draft and
# leaves _message_id=None so the existing got_done path delivers the
# final answer as a regular sendMessage (drafts have no message_id
# to edit).
self._use_draft_streaming = self._resolve_draft_streaming()
if self._use_draft_streaming:
type(self)._draft_id_counter += 1
self._draft_id = type(self)._draft_id_counter
logger.debug(
"Stream consumer using native-draft transport (chat=%s draft_id=%s)",
self.chat_id, self._draft_id,
)
try:
while True:
# Drain all available items from the queue
@ -754,6 +809,89 @@ class GatewayStreamConsumer:
err_lower = err.lower()
return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower
def _resolve_draft_streaming(self) -> bool:
"""Decide whether this run should use native draft streaming.
Honors ``cfg.transport``:
* ``"edit"`` never use drafts (legacy progressive-edit path).
* ``"draft"`` require draft support; gracefully fall back to edit
when the adapter declines. Logs the downgrade at debug.
* ``"auto"`` use drafts when the adapter supports them for this
chat type; otherwise edit.
Adapter eligibility is checked via
:meth:`BasePlatformAdapter.supports_draft_streaming`, which considers
the chat type (e.g. Telegram drafts are DM-only) and platform-version
gates (e.g. python-telegram-bot 22.6+).
"""
transport = (self.cfg.transport or "auto").lower()
if transport == "edit":
return False
# "off" is filtered upstream by the gateway; treat as edit defensively.
if transport == "off":
return False
# Test adapters are MagicMocks that don't subclass BasePlatformAdapter;
# default them to edit so existing test behaviour is preserved.
if not isinstance(self.adapter, _BasePlatformAdapter):
return False
try:
supported = self.adapter.supports_draft_streaming(
chat_type=self.cfg.chat_type or None,
metadata=self.metadata,
)
except Exception:
logger.debug("supports_draft_streaming probe raised", exc_info=True)
supported = False
if not supported:
if transport == "draft":
logger.debug(
"Draft streaming requested but unsupported (chat=%s, type=%r) — "
"falling back to edit",
self.chat_id, self.cfg.chat_type,
)
return False
return True
async def _send_draft_frame(self, text: str) -> bool:
"""Emit a single animated draft frame for the current accumulated text.
Returns True when the frame landed. On any failure, permanently
disables drafts for the remainder of this run so subsequent frames
flow through the edit-based path (which can adapt with flood-control
backoff, etc.). Drafts have no message_id and clear naturally on
the client when the response finalizes via a regular sendMessage.
"""
if self._draft_id is None:
# Defensive: should never happen — _use_draft_streaming gate is
# set in tandem with _draft_id in run(). Disable to be safe.
self._use_draft_streaming = False
return False
try:
result = await self.adapter.send_draft(
chat_id=self.chat_id,
draft_id=self._draft_id,
content=text,
metadata=self.metadata,
)
except Exception as e:
logger.debug(
"send_draft raised, disabling draft transport for this run: %s", e,
)
self._draft_failures += 1
self._use_draft_streaming = False
return False
if not getattr(result, "success", False):
logger.debug(
"send_draft returned success=False, disabling draft transport: %s",
getattr(result, "error", "unknown"),
)
self._draft_failures += 1
self._use_draft_streaming = False
return False
# Frame delivered. Track text for parity with edit-based no-op skip.
self._last_sent_text = text
return True
async def _flush_segment_tail_on_edit_failure(self) -> None:
"""Deliver un-sent tail content before a segment-break reset.
@ -948,6 +1086,35 @@ class GatewayStreamConsumer:
and self.cfg.cursor in text
and len(_visible_stripped) < _MIN_NEW_MSG_CHARS):
return True # too short for a standalone message — accumulate more
# Native draft streaming: route mid-stream frames through send_draft.
# The final answer is delivered via the regular sendMessage path
# below — drafts have no message_id so we can't finalize them
# in-place; the regular sendMessage clears the draft naturally on
# the client and gives the user a real message in their history.
# Skip when:
# * finalize=True (this is the final answer; needs to be a real message)
# * an edit path is already established (message_id is set, e.g. after
# a tool-boundary segment break where the prior text was finalized
# as a real sendMessage and the next text segment continues editing
# that one — staying on edit-based for that segment is correct).
if (
self._use_draft_streaming
and not finalize
and self._message_id is None
):
# No-op skip: identical to the last frame we sent.
if text == self._last_sent_text:
return True
ok = await self._send_draft_frame(text)
if ok:
# Drafts mark "we put something on screen" but DO NOT set
# _already_sent — that flag gates the gateway's fallback
# final-send path and we still need that to fire so the
# user gets a real message (drafts have no message_id).
return True
# Failure already disabled drafts for this run; fall through to
# the regular edit/send path below.
try:
if self._message_id is not None:
if self._edit_supported: