test(telegram): native-draft transport coverage + docs

Added tests/gateway/test_stream_consumer_draft.py with 11 tests
covering:
- Transport selection: auto+dm-supported -> draft; auto+group -> edit;
  explicit edit; explicit draft on unsupported adapter -> edit;
  MagicMock adapter -> edit (back-compat for the existing test suite).
- Happy path: DM stream animates draft frames with a single shared
  draft_id, then finalizes via a regular adapter.send.
- Group fallback: drafts entirely skipped in non-DM chats.
- Failure fallback: send_draft returning success=False disables drafts
  for the rest of the response.
- Draft_id lifecycle: consecutive responses use distinct ids; tool
  boundaries bump the id so post-tool text animates fresh below the
  tool-progress bubble (the openclaw #32535 leak guard).
- _already_sent contract: drafts must NOT set the flag so the gateway's
  fallback final-send still fires (drafts have no message_id).

Updated website/docs/user-guide/messaging/telegram.md with a
'Streaming transport' section explaining auto|draft|edit|off, the
DM-only constraint, and the per-response fallback behaviour.
This commit is contained in:
teknium1 2026-05-10 19:12:39 -07:00 committed by Teknium
parent 4ed293b38e
commit 7f90141c63
2 changed files with 345 additions and 1 deletions

View file

@ -0,0 +1,318 @@
"""Tests for native draft streaming in GatewayStreamConsumer.
Telegram Bot API 9.5 (March 2026) introduced sendMessageDraft for native
animated streaming previews in private chats. This test suite covers the
consumer's transport-selection, fallback, and tool-boundary handling for
that path.
Adapter under test is a runtime subclass of BasePlatformAdapter that
overrides supports_draft_streaming + send_draft, since the consumer's
isinstance(BasePlatformAdapter) gate excludes plain MagicMocks.
"""
from __future__ import annotations
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.stream_consumer import (
GatewayStreamConsumer,
StreamConsumerConfig,
)
def _make_draft_capable_adapter(
*, supports_draft: bool = True, draft_succeeds: bool = True,
):
"""Build a minimal BasePlatformAdapter subclass with draft support.
The runtime subclass + cleared __abstractmethods__ pattern lets us
construct an adapter without hauling in any platform's heavy state
(Telegram bot, Discord client, etc.) while still satisfying the
consumer's isinstance(BasePlatformAdapter) gate.
"""
from gateway.platforms.base import BasePlatformAdapter, SendResult
DraftCapableAdapter = type(
"DraftCapableAdapter",
(BasePlatformAdapter,),
{"MAX_MESSAGE_LENGTH": 4096},
)
DraftCapableAdapter.__abstractmethods__ = frozenset()
adapter = DraftCapableAdapter.__new__(DraftCapableAdapter)
adapter._typing_paused = set()
adapter._fatal_error_message = None
# Track every send_draft call for assertions.
adapter.draft_calls = []
def _supports(chat_type=None, metadata=None):
return bool(supports_draft) and (chat_type or "").lower() == "dm"
adapter.supports_draft_streaming = _supports
async def _send_draft(*, chat_id, draft_id, content, metadata=None):
adapter.draft_calls.append({
"chat_id": chat_id,
"draft_id": draft_id,
"content": content,
"metadata": metadata,
})
if draft_succeeds:
return SendResult(success=True, message_id=None)
return SendResult(success=False, error="draft_rejected")
adapter.send_draft = _send_draft
# send / edit_message: count and return canned successes so the
# consumer's first-send + finalize paths work when drafts fall back
# or when delivering the final message.
adapter.send = AsyncMock(
return_value=SimpleNamespace(success=True, message_id="msg_real"),
)
adapter.edit_message = AsyncMock(
return_value=SimpleNamespace(success=True),
)
return adapter
class TestDraftTransportSelection:
"""Verify _resolve_draft_streaming picks the right transport."""
def test_auto_dm_with_draft_capable_adapter_picks_draft(self):
adapter = _make_draft_capable_adapter()
cfg = StreamConsumerConfig(transport="auto", chat_type="dm")
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
assert consumer._resolve_draft_streaming() is True
def test_auto_group_falls_back_to_edit(self):
adapter = _make_draft_capable_adapter()
cfg = StreamConsumerConfig(transport="auto", chat_type="group")
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
assert consumer._resolve_draft_streaming() is False
def test_explicit_edit_never_uses_drafts(self):
adapter = _make_draft_capable_adapter()
cfg = StreamConsumerConfig(transport="edit", chat_type="dm")
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
assert consumer._resolve_draft_streaming() is False
def test_explicit_draft_unsupported_falls_back(self):
adapter = _make_draft_capable_adapter(supports_draft=False)
cfg = StreamConsumerConfig(transport="draft", chat_type="dm")
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
assert consumer._resolve_draft_streaming() is False
def test_magicmock_adapter_falls_back_to_edit(self):
"""MagicMock adapters (used in many existing tests) must default to
edit-based since their auto-attributes aren't real callables."""
adapter = MagicMock()
cfg = StreamConsumerConfig(transport="auto", chat_type="dm")
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
assert consumer._resolve_draft_streaming() is False
class TestDraftStreamingHappyPath:
"""End-to-end: stream a few deltas in a DM, verify drafts animated and
the final message was delivered as a real sendMessage."""
@pytest.mark.asyncio
async def test_dm_stream_animates_draft_then_finalizes_with_send(self):
adapter = _make_draft_capable_adapter()
cfg = StreamConsumerConfig(
transport="auto", chat_type="dm",
edit_interval=0.01, buffer_threshold=5, cursor="",
)
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
consumer.on_delta("Hello ")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.on_delta("world!")
await asyncio.sleep(0.05)
consumer.finish()
await task
# At least one draft frame landed.
assert len(adapter.draft_calls) >= 1, (
"expected at least one send_draft frame"
)
# Final draft frame held the full accumulated text.
assert adapter.draft_calls[-1]["content"] == "Hello world!"
# All draft frames in this run shared a single draft_id (animation).
draft_ids = {c["draft_id"] for c in adapter.draft_calls}
assert len(draft_ids) == 1
# Final answer was delivered as a regular sendMessage so the user
# sees a real message in their history (drafts have no message_id).
adapter.send.assert_awaited()
# And the final send carried the complete reply.
final_call = adapter.send.call_args
sent_content = (
final_call.kwargs.get("content")
if "content" in final_call.kwargs
else final_call.args[1] if len(final_call.args) > 1 else None
)
assert sent_content == "Hello world!"
@pytest.mark.asyncio
async def test_group_chat_skips_draft_path(self):
adapter = _make_draft_capable_adapter()
cfg = StreamConsumerConfig(
transport="auto", chat_type="group",
edit_interval=0.01, buffer_threshold=5, cursor="",
)
consumer = GatewayStreamConsumer(adapter, "67890", cfg)
consumer.on_delta("Group message")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.finish()
await task
# Group chats skip drafts entirely — no send_draft calls at all.
assert adapter.draft_calls == []
# Edit-based path delivered via send (first message).
adapter.send.assert_awaited()
class TestDraftFallbackOnFailure:
"""When a draft frame fails, the consumer disables drafts for the rest
of the response and continues via the edit-based path."""
@pytest.mark.asyncio
async def test_first_draft_failure_disables_drafts_for_run(self):
adapter = _make_draft_capable_adapter(draft_succeeds=False)
cfg = StreamConsumerConfig(
transport="auto", chat_type="dm",
edit_interval=0.01, buffer_threshold=5, cursor="",
)
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
consumer.on_delta("Hello ")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.on_delta("world!")
await asyncio.sleep(0.05)
consumer.finish()
await task
# The consumer attempted draft, hit failure, disabled drafts.
assert consumer._draft_failures >= 1
assert consumer._use_draft_streaming is False
# Final message delivered via the regular send path.
adapter.send.assert_awaited()
class TestDraftIdLifecycle:
"""Each response gets its own draft_id (no animation collision across
consecutive responses to the same chat)."""
@pytest.mark.asyncio
async def test_consecutive_responses_use_distinct_draft_ids(self):
adapter = _make_draft_capable_adapter()
cfg1 = StreamConsumerConfig(
transport="auto", chat_type="dm",
edit_interval=0.01, buffer_threshold=5, cursor="",
)
consumer1 = GatewayStreamConsumer(adapter, "12345", cfg1)
consumer1.on_delta("First reply")
task1 = asyncio.create_task(consumer1.run())
await asyncio.sleep(0.05)
consumer1.finish()
await task1
cfg2 = StreamConsumerConfig(
transport="auto", chat_type="dm",
edit_interval=0.01, buffer_threshold=5, cursor="",
)
consumer2 = GatewayStreamConsumer(adapter, "12345", cfg2)
consumer2.on_delta("Second reply")
task2 = asyncio.create_task(consumer2.run())
await asyncio.sleep(0.05)
consumer2.finish()
await task2
# Two responses → two distinct draft_ids.
all_ids = {c["draft_id"] for c in adapter.draft_calls}
assert len(all_ids) >= 2, (
f"expected distinct draft_ids across responses; got {all_ids}"
)
# Every draft_id must be non-zero (Telegram's contract).
assert all(did != 0 for did in all_ids)
@pytest.mark.asyncio
async def test_tool_boundary_bumps_draft_id(self):
"""After a segment break (tool boundary), the next text segment
animates via a new draft_id so it appears below the tool-progress
bubble rather than overwriting the prior segment's preview."""
adapter = _make_draft_capable_adapter()
cfg = StreamConsumerConfig(
transport="auto", chat_type="dm",
edit_interval=0.01, buffer_threshold=5, cursor="",
)
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
consumer.on_delta("Pre-tool ")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
# Tool boundary
consumer.on_segment_break()
await asyncio.sleep(0.05)
consumer.on_delta("Post-tool")
await asyncio.sleep(0.05)
consumer.finish()
await task
# Pre-tool and post-tool segments must use different draft_ids.
draft_ids = [c["draft_id"] for c in adapter.draft_calls]
if len(draft_ids) >= 2:
# Find pre-tool and post-tool calls by content
pre_ids = {
c["draft_id"] for c in adapter.draft_calls
if "Pre-tool" in c["content"] and "Post-tool" not in c["content"]
}
post_ids = {
c["draft_id"] for c in adapter.draft_calls
if "Post-tool" in c["content"]
}
if pre_ids and post_ids:
assert pre_ids.isdisjoint(post_ids), (
f"pre-tool and post-tool segments must use distinct "
f"draft_ids; got pre={pre_ids} post={post_ids}"
)
class TestAlreadySentInDraftMode:
"""Drafts must NOT mark _already_sent — that flag gates the gateway's
fallback final-send path, which we still need to fire so the user gets
a real message in their history (drafts have no message_id)."""
@pytest.mark.asyncio
async def test_drafts_do_not_set_already_sent_until_real_message(self):
adapter = _make_draft_capable_adapter()
cfg = StreamConsumerConfig(
transport="auto", chat_type="dm",
edit_interval=0.01, buffer_threshold=5, cursor="",
)
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
consumer.on_delta("Hello")
# Drive the consumer for a bit but DON'T finish — only drafts have
# been sent.
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
# At this point drafts may have fired but we haven't finalized.
# _already_sent must still be False so a downstream fallback would
# know it needs to deliver the final answer.
if adapter.draft_calls:
assert consumer._already_sent is False, (
"drafts wrongly marked _already_sent — "
"would suppress gateway fallback delivery"
)
consumer.finish()
await task
# After the regular sendMessage finalize, _already_sent is True.
assert consumer._already_sent is True

View file

@ -611,7 +611,33 @@ To find a topic's `thread_id`, open the topic in Telegram Web or Desktop and loo
- **Bot API 9.4 (Feb 2026):** Private Chat Topics — bots can create forum topics in 1-on-1 DM chats via `createForumTopic`. Hermes uses this for two distinct features: operator-curated [Private Chat Topics](#private-chat-topics-bot-api-94) (config-driven, fixed topic list) and user-driven [Multi-session DM mode](#multi-session-dm-mode-topic) (activated by `/topic`, unlimited user-created topics).
- **Privacy policy:** Telegram now requires bots to have a privacy policy. Set one via BotFather with `/setprivacy_policy`, or Telegram may auto-generate a placeholder. This is particularly important if your bot is public-facing.
- **Message streaming:** Bot API 9.x added support for streaming long responses, which can improve perceived latency for lengthy agent replies.
- **Bot API 9.5 (Mar 2026): Native streaming via `sendMessageDraft`.** Hermes uses Telegram's native streaming-draft API to render an animated preview of the agent's reply as tokens arrive in private chats. Drops the per-edit jitter you used to see with the legacy `editMessageText` polling path on slow models.
### Streaming transport (`gateway.streaming.transport`)
When streaming is enabled (`gateway.streaming.enabled: true`), Hermes picks one of four transports:
| Value | Behaviour |
|---|---|
| `auto` (default) | Native draft streaming on supported chats (currently Telegram DMs); legacy edit-based path otherwise. Falls back gracefully if a draft frame fails. |
| `draft` | Force native drafts. Logs a downgrade and falls back to edit if the chat doesn't support drafts (e.g. groups/topics). |
| `edit` | Legacy progressive `editMessageText` polling for every chat type. |
| `off` | Disable streaming entirely (final reply only, no progressive updates). |
In `~/.hermes/config.yaml`:
```yaml
gateway:
streaming:
enabled: true
transport: auto # auto | draft | edit | off
```
**What you'll see in DMs with `auto` (default)** — when the agent generates a reply, Telegram shows an animated draft preview that updates token-by-token. When the reply finishes, it's delivered as a regular message and the draft preview clears naturally on the client. Drafts have no message id, so the final answer is what stays in your chat history.
**What about groups, supergroups, forum topics?** Telegram restricts `sendMessageDraft` to private chats (DMs). The gateway transparently falls back to the edit-based path for everything else — same UX as before.
**What if a draft frame fails?** Any failure (transient network error, server-side rejection, older python-telegram-bot install) flips that response back to the edit-based path for the rest of the stream. The next response gets a fresh attempt.
## Rendering: Tables and Link Previews