From 7f90141c6344e82d64e25f9f3cdbcdcb90ecbed9 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sun, 10 May 2026 19:12:39 -0700 Subject: [PATCH] test(telegram): native-draft transport coverage + docs Added tests/gateway/test_stream_consumer_draft.py with 11 tests covering: - Transport selection: auto+dm-supported -> draft; auto+group -> edit; explicit edit; explicit draft on unsupported adapter -> edit; MagicMock adapter -> edit (back-compat for the existing test suite). - Happy path: DM stream animates draft frames with a single shared draft_id, then finalizes via a regular adapter.send. - Group fallback: drafts entirely skipped in non-DM chats. - Failure fallback: send_draft returning success=False disables drafts for the rest of the response. - Draft_id lifecycle: consecutive responses use distinct ids; tool boundaries bump the id so post-tool text animates fresh below the tool-progress bubble (the openclaw #32535 leak guard). - _already_sent contract: drafts must NOT set the flag so the gateway's fallback final-send still fires (drafts have no message_id). Updated website/docs/user-guide/messaging/telegram.md with a 'Streaming transport' section explaining auto|draft|edit|off, the DM-only constraint, and the per-response fallback behaviour. --- tests/gateway/test_stream_consumer_draft.py | 318 ++++++++++++++++++ website/docs/user-guide/messaging/telegram.md | 28 +- 2 files changed, 345 insertions(+), 1 deletion(-) create mode 100644 tests/gateway/test_stream_consumer_draft.py diff --git a/tests/gateway/test_stream_consumer_draft.py b/tests/gateway/test_stream_consumer_draft.py new file mode 100644 index 00000000000..bab8e20fd35 --- /dev/null +++ b/tests/gateway/test_stream_consumer_draft.py @@ -0,0 +1,318 @@ +"""Tests for native draft streaming in GatewayStreamConsumer. + +Telegram Bot API 9.5 (March 2026) introduced sendMessageDraft for native +animated streaming previews in private chats. This test suite covers the +consumer's transport-selection, fallback, and tool-boundary handling for +that path. + +Adapter under test is a runtime subclass of BasePlatformAdapter that +overrides supports_draft_streaming + send_draft, since the consumer's +isinstance(BasePlatformAdapter) gate excludes plain MagicMocks. +""" + +from __future__ import annotations + +import asyncio +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.stream_consumer import ( + GatewayStreamConsumer, + StreamConsumerConfig, +) + + +def _make_draft_capable_adapter( + *, supports_draft: bool = True, draft_succeeds: bool = True, +): + """Build a minimal BasePlatformAdapter subclass with draft support. + + The runtime subclass + cleared __abstractmethods__ pattern lets us + construct an adapter without hauling in any platform's heavy state + (Telegram bot, Discord client, etc.) while still satisfying the + consumer's isinstance(BasePlatformAdapter) gate. + """ + from gateway.platforms.base import BasePlatformAdapter, SendResult + + DraftCapableAdapter = type( + "DraftCapableAdapter", + (BasePlatformAdapter,), + {"MAX_MESSAGE_LENGTH": 4096}, + ) + DraftCapableAdapter.__abstractmethods__ = frozenset() + adapter = DraftCapableAdapter.__new__(DraftCapableAdapter) + adapter._typing_paused = set() + adapter._fatal_error_message = None + + # Track every send_draft call for assertions. + adapter.draft_calls = [] + + def _supports(chat_type=None, metadata=None): + return bool(supports_draft) and (chat_type or "").lower() == "dm" + adapter.supports_draft_streaming = _supports + + async def _send_draft(*, chat_id, draft_id, content, metadata=None): + adapter.draft_calls.append({ + "chat_id": chat_id, + "draft_id": draft_id, + "content": content, + "metadata": metadata, + }) + if draft_succeeds: + return SendResult(success=True, message_id=None) + return SendResult(success=False, error="draft_rejected") + adapter.send_draft = _send_draft + + # send / edit_message: count and return canned successes so the + # consumer's first-send + finalize paths work when drafts fall back + # or when delivering the final message. + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=True, message_id="msg_real"), + ) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=True), + ) + return adapter + + +class TestDraftTransportSelection: + """Verify _resolve_draft_streaming picks the right transport.""" + + def test_auto_dm_with_draft_capable_adapter_picks_draft(self): + adapter = _make_draft_capable_adapter() + cfg = StreamConsumerConfig(transport="auto", chat_type="dm") + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + assert consumer._resolve_draft_streaming() is True + + def test_auto_group_falls_back_to_edit(self): + adapter = _make_draft_capable_adapter() + cfg = StreamConsumerConfig(transport="auto", chat_type="group") + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + assert consumer._resolve_draft_streaming() is False + + def test_explicit_edit_never_uses_drafts(self): + adapter = _make_draft_capable_adapter() + cfg = StreamConsumerConfig(transport="edit", chat_type="dm") + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + assert consumer._resolve_draft_streaming() is False + + def test_explicit_draft_unsupported_falls_back(self): + adapter = _make_draft_capable_adapter(supports_draft=False) + cfg = StreamConsumerConfig(transport="draft", chat_type="dm") + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + assert consumer._resolve_draft_streaming() is False + + def test_magicmock_adapter_falls_back_to_edit(self): + """MagicMock adapters (used in many existing tests) must default to + edit-based since their auto-attributes aren't real callables.""" + adapter = MagicMock() + cfg = StreamConsumerConfig(transport="auto", chat_type="dm") + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + assert consumer._resolve_draft_streaming() is False + + +class TestDraftStreamingHappyPath: + """End-to-end: stream a few deltas in a DM, verify drafts animated and + the final message was delivered as a real sendMessage.""" + + @pytest.mark.asyncio + async def test_dm_stream_animates_draft_then_finalizes_with_send(self): + adapter = _make_draft_capable_adapter() + cfg = StreamConsumerConfig( + transport="auto", chat_type="dm", + edit_interval=0.01, buffer_threshold=5, cursor="", + ) + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + + consumer.on_delta("Hello ") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.on_delta("world!") + await asyncio.sleep(0.05) + consumer.finish() + await task + + # At least one draft frame landed. + assert len(adapter.draft_calls) >= 1, ( + "expected at least one send_draft frame" + ) + # Final draft frame held the full accumulated text. + assert adapter.draft_calls[-1]["content"] == "Hello world!" + # All draft frames in this run shared a single draft_id (animation). + draft_ids = {c["draft_id"] for c in adapter.draft_calls} + assert len(draft_ids) == 1 + # Final answer was delivered as a regular sendMessage so the user + # sees a real message in their history (drafts have no message_id). + adapter.send.assert_awaited() + # And the final send carried the complete reply. + final_call = adapter.send.call_args + sent_content = ( + final_call.kwargs.get("content") + if "content" in final_call.kwargs + else final_call.args[1] if len(final_call.args) > 1 else None + ) + assert sent_content == "Hello world!" + + @pytest.mark.asyncio + async def test_group_chat_skips_draft_path(self): + adapter = _make_draft_capable_adapter() + cfg = StreamConsumerConfig( + transport="auto", chat_type="group", + edit_interval=0.01, buffer_threshold=5, cursor="", + ) + consumer = GatewayStreamConsumer(adapter, "67890", cfg) + + consumer.on_delta("Group message") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.finish() + await task + + # Group chats skip drafts entirely — no send_draft calls at all. + assert adapter.draft_calls == [] + # Edit-based path delivered via send (first message). + adapter.send.assert_awaited() + + +class TestDraftFallbackOnFailure: + """When a draft frame fails, the consumer disables drafts for the rest + of the response and continues via the edit-based path.""" + + @pytest.mark.asyncio + async def test_first_draft_failure_disables_drafts_for_run(self): + adapter = _make_draft_capable_adapter(draft_succeeds=False) + cfg = StreamConsumerConfig( + transport="auto", chat_type="dm", + edit_interval=0.01, buffer_threshold=5, cursor="", + ) + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + + consumer.on_delta("Hello ") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.on_delta("world!") + await asyncio.sleep(0.05) + consumer.finish() + await task + + # The consumer attempted draft, hit failure, disabled drafts. + assert consumer._draft_failures >= 1 + assert consumer._use_draft_streaming is False + # Final message delivered via the regular send path. + adapter.send.assert_awaited() + + +class TestDraftIdLifecycle: + """Each response gets its own draft_id (no animation collision across + consecutive responses to the same chat).""" + + @pytest.mark.asyncio + async def test_consecutive_responses_use_distinct_draft_ids(self): + adapter = _make_draft_capable_adapter() + cfg1 = StreamConsumerConfig( + transport="auto", chat_type="dm", + edit_interval=0.01, buffer_threshold=5, cursor="", + ) + consumer1 = GatewayStreamConsumer(adapter, "12345", cfg1) + consumer1.on_delta("First reply") + task1 = asyncio.create_task(consumer1.run()) + await asyncio.sleep(0.05) + consumer1.finish() + await task1 + + cfg2 = StreamConsumerConfig( + transport="auto", chat_type="dm", + edit_interval=0.01, buffer_threshold=5, cursor="", + ) + consumer2 = GatewayStreamConsumer(adapter, "12345", cfg2) + consumer2.on_delta("Second reply") + task2 = asyncio.create_task(consumer2.run()) + await asyncio.sleep(0.05) + consumer2.finish() + await task2 + + # Two responses → two distinct draft_ids. + all_ids = {c["draft_id"] for c in adapter.draft_calls} + assert len(all_ids) >= 2, ( + f"expected distinct draft_ids across responses; got {all_ids}" + ) + # Every draft_id must be non-zero (Telegram's contract). + assert all(did != 0 for did in all_ids) + + @pytest.mark.asyncio + async def test_tool_boundary_bumps_draft_id(self): + """After a segment break (tool boundary), the next text segment + animates via a new draft_id so it appears below the tool-progress + bubble rather than overwriting the prior segment's preview.""" + adapter = _make_draft_capable_adapter() + cfg = StreamConsumerConfig( + transport="auto", chat_type="dm", + edit_interval=0.01, buffer_threshold=5, cursor="", + ) + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + + consumer.on_delta("Pre-tool ") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + # Tool boundary + consumer.on_segment_break() + await asyncio.sleep(0.05) + consumer.on_delta("Post-tool") + await asyncio.sleep(0.05) + consumer.finish() + await task + + # Pre-tool and post-tool segments must use different draft_ids. + draft_ids = [c["draft_id"] for c in adapter.draft_calls] + if len(draft_ids) >= 2: + # Find pre-tool and post-tool calls by content + pre_ids = { + c["draft_id"] for c in adapter.draft_calls + if "Pre-tool" in c["content"] and "Post-tool" not in c["content"] + } + post_ids = { + c["draft_id"] for c in adapter.draft_calls + if "Post-tool" in c["content"] + } + if pre_ids and post_ids: + assert pre_ids.isdisjoint(post_ids), ( + f"pre-tool and post-tool segments must use distinct " + f"draft_ids; got pre={pre_ids} post={post_ids}" + ) + + +class TestAlreadySentInDraftMode: + """Drafts must NOT mark _already_sent — that flag gates the gateway's + fallback final-send path, which we still need to fire so the user gets + a real message in their history (drafts have no message_id).""" + + @pytest.mark.asyncio + async def test_drafts_do_not_set_already_sent_until_real_message(self): + adapter = _make_draft_capable_adapter() + cfg = StreamConsumerConfig( + transport="auto", chat_type="dm", + edit_interval=0.01, buffer_threshold=5, cursor="", + ) + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + + consumer.on_delta("Hello") + # Drive the consumer for a bit but DON'T finish — only drafts have + # been sent. + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + # At this point drafts may have fired but we haven't finalized. + # _already_sent must still be False so a downstream fallback would + # know it needs to deliver the final answer. + if adapter.draft_calls: + assert consumer._already_sent is False, ( + "drafts wrongly marked _already_sent — " + "would suppress gateway fallback delivery" + ) + + consumer.finish() + await task + + # After the regular sendMessage finalize, _already_sent is True. + assert consumer._already_sent is True diff --git a/website/docs/user-guide/messaging/telegram.md b/website/docs/user-guide/messaging/telegram.md index ee6e09bfd77..ffbc9dfe074 100644 --- a/website/docs/user-guide/messaging/telegram.md +++ b/website/docs/user-guide/messaging/telegram.md @@ -611,7 +611,33 @@ To find a topic's `thread_id`, open the topic in Telegram Web or Desktop and loo - **Bot API 9.4 (Feb 2026):** Private Chat Topics — bots can create forum topics in 1-on-1 DM chats via `createForumTopic`. Hermes uses this for two distinct features: operator-curated [Private Chat Topics](#private-chat-topics-bot-api-94) (config-driven, fixed topic list) and user-driven [Multi-session DM mode](#multi-session-dm-mode-topic) (activated by `/topic`, unlimited user-created topics). - **Privacy policy:** Telegram now requires bots to have a privacy policy. Set one via BotFather with `/setprivacy_policy`, or Telegram may auto-generate a placeholder. This is particularly important if your bot is public-facing. -- **Message streaming:** Bot API 9.x added support for streaming long responses, which can improve perceived latency for lengthy agent replies. +- **Bot API 9.5 (Mar 2026): Native streaming via `sendMessageDraft`.** Hermes uses Telegram's native streaming-draft API to render an animated preview of the agent's reply as tokens arrive in private chats. Drops the per-edit jitter you used to see with the legacy `editMessageText` polling path on slow models. + +### Streaming transport (`gateway.streaming.transport`) + +When streaming is enabled (`gateway.streaming.enabled: true`), Hermes picks one of four transports: + +| Value | Behaviour | +|---|---| +| `auto` (default) | Native draft streaming on supported chats (currently Telegram DMs); legacy edit-based path otherwise. Falls back gracefully if a draft frame fails. | +| `draft` | Force native drafts. Logs a downgrade and falls back to edit if the chat doesn't support drafts (e.g. groups/topics). | +| `edit` | Legacy progressive `editMessageText` polling for every chat type. | +| `off` | Disable streaming entirely (final reply only, no progressive updates). | + +In `~/.hermes/config.yaml`: + +```yaml +gateway: + streaming: + enabled: true + transport: auto # auto | draft | edit | off +``` + +**What you'll see in DMs with `auto` (default)** — when the agent generates a reply, Telegram shows an animated draft preview that updates token-by-token. When the reply finishes, it's delivered as a regular message and the draft preview clears naturally on the client. Drafts have no message id, so the final answer is what stays in your chat history. + +**What about groups, supergroups, forum topics?** Telegram restricts `sendMessageDraft` to private chats (DMs). The gateway transparently falls back to the edit-based path for everything else — same UX as before. + +**What if a draft frame fails?** Any failure (transient network error, server-side rejection, older python-telegram-bot install) flips that response back to the edit-based path for the rest of the stream. The next response gets a fresh attempt. ## Rendering: Tables and Link Previews