From 7c0605bf224c27dda466a1e877161da319224da6 Mon Sep 17 00:00:00 2001 From: ITheEqualizer Date: Sat, 13 Jun 2026 10:53:28 -0700 Subject: [PATCH] fix(telegram): preserve rich formatting on stream final --- agent/prompt_builder.py | 20 +- gateway/platforms/base.py | 40 ++++ gateway/platforms/telegram.py | 62 +++++- gateway/stream_consumer.py | 165 +++++++++++++--- tests/agent/test_prompt_builder.py | 5 + tests/gateway/test_stream_consumer_draft.py | 186 ++++++++++++++++++ .../test_stream_consumer_thread_routing.py | 3 +- tests/gateway/test_telegram_rich_messages.py | 56 ++++++ 8 files changed, 499 insertions(+), 38 deletions(-) diff --git a/agent/prompt_builder.py b/agent/prompt_builder.py index 1cc0c4a71e4..080a3525e33 100644 --- a/agent/prompt_builder.py +++ b/agent/prompt_builder.py @@ -511,13 +511,19 @@ PLATFORM_HINTS = { "Standard Markdown is automatically converted to Telegram formatting. " "Supported: **bold**, *italic*, ~~strikethrough~~, ||spoiler||, " "`inline code`, ```code blocks```, [links](url), and ## headers. " - "Telegram supports rich Markdown, so when it improves clarity you may " - "use headings, tables (pipe `| col | col |` syntax), task lists " - "(`- [ ]` / `- [x]`), nested blockquotes, collapsible details, " - "footnotes/references, math/formulas (`$...$`, `$$...$$`), underline, " - "subscript/superscript, marked (highlighted) text, and anchors. Prefer " - "real Markdown tables and task lists over hand-built bullet substitutes " - "when presenting structured data. " + "Telegram now supports rich Markdown, so lean into it: whenever it " + "makes the answer clearer or easier to scan, actively reach for real " + "Markdown tables (pipe `| col | col |` syntax), bullet and numbered " + "lists, task lists (`- [ ]` / `- [x]`), headings, nested blockquotes, " + "collapsible details, footnotes/references, math/formulas (`$...$`, " + "`$$...$$`), underline, subscript/superscript, marked (highlighted) " + "text, and anchors. 
Default to structured formatting over dense " + "paragraphs for any comparison, set of steps, key/value summary, or " + "tabular data. Prefer real Markdown tables and task lists over " + "hand-built bullet substitutes when presenting structured data; these " + "degrade gracefully (tables become readable bullet groups) when rich " + "rendering is unavailable, but advanced constructs like math and " + "collapsible details may render as plain source text in that case. " "You can send media files natively: to deliver a file to the user, " "include MEDIA:/absolute/path/to/file in your response. Images " "(.png, .jpg, .webp) appear as photos, audio (.ogg) sends as voice " diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 5c15660d427..205d9cbf509 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1953,6 +1953,46 @@ class BasePlatformAdapter(ABC): """ return False + def prefers_fresh_final_streaming( + self, + content: str, + metadata: Optional[Dict[str, Any]] = None, + ) -> bool: + """Whether the stream consumer should finalize a streamed reply by + sending a *fresh* final message (and deleting the preview) instead of + final-editing the preview. + + Some adapters can send richer final messages than their current edit + implementation supports. Telegram is the motivating case: Hermes sends + final replies through ``sendRichMessage`` but still finalizes streamed + previews through its existing MarkdownV2 edit path until Bot API 10.1's + ``rich_message`` edit parameter is wired directly. Such adapters + override this to ask the consumer to re-deliver the completed answer as + a new rich message and best-effort delete the stale preview, so the + final rendering matches the rich send path. + + Default implementation returns False — legacy platforms keep the + edit-in-place finalization path. 
+ """ + return False + + def streaming_overflow_limit(self) -> Optional[int]: + """Max single-message length (in this adapter's ``message_len_fn`` + units) the stream consumer may accumulate before it splits, when the + adapter can deliver a larger message than its legacy per-message limit. + + Telegram Bot API 10.1 Rich Messages accept up to 32,768 chars in a + single ``sendRichMessage`` / ``sendRichMessageDraft``, far above the + 4,096 MarkdownV2 limit. Adapters with such a richer send/draft path + override this so the consumer doesn't fragment a reply that fits one + rich message; the live edit preview is still bound by the platform's + edit limit, but the finalized reply (and DM draft preview) is delivered + whole. + + Return ``None`` (default) to use ``MAX_MESSAGE_LENGTH``. + """ + return None + async def send_draft( self, chat_id: str, diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index e0d13cdbd53..5bb4d5bbcbe 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -349,8 +349,11 @@ class TelegramAdapter(BasePlatformAdapter): MAX_MESSAGE_LENGTH = 4096 supports_code_blocks = True # Telegram MarkdownV2 renders fenced code blocks # Bot API 10.1 Rich Messages cap the raw markdown/html text at 32,768 - # UTF-8 bytes. Content above this is sent via the legacy chunking path. - RICH_MESSAGE_MAX_BYTES = 32768 + # characters. Content above this is sent via the legacy chunking path. + RICH_MESSAGE_MAX_CHARS = 32768 + # Backwards-compatible alias for tests/external callers that referenced the + # initial implementation name. The API limit is character-based, not bytes. + RICH_MESSAGE_MAX_BYTES = RICH_MESSAGE_MAX_CHARS # Threshold for detecting Telegram client-side message splits. # When a chunk is near this limit, a continuation is almost certain. 
_SPLIT_THRESHOLD = 4000 @@ -920,19 +923,20 @@ class TelegramAdapter(BasePlatformAdapter): # the RAW agent markdown so richer constructs (tables, task lists, # collapsible details, math, ...) render natively. The legacy MarkdownV2 # send() path stays as the fallback for unsupported/oversized content and - # older PTB/clients. Streaming edits/drafts are intentionally untouched — - # Telegram exposes no rich-edit method. + # older PTB/clients. Streaming edits stay on Hermes' existing MarkdownV2 + # edit path for now; finalization can re-send as rich and delete the stale + # preview until rich_message edit support is wired directly. # ------------------------------------------------------------------ def _content_fits_rich_limits(self, content: str) -> bool: """Cheap pre-check for the one hard rich limit we can count locally. - Only the 32,768 UTF-8 byte text cap is enforced here. Other Bot API + Only the 32,768-character text cap is enforced here. Other Bot API rich limits (500 blocks, 16 nesting levels, 20 table columns, ...) are not pre-counted; if exceeded Telegram returns a BadRequest, which :meth:`_is_rich_fallback_error` classifies as permanent so the send degrades to the legacy chunking path. """ - return len(content.encode("utf-8")) <= self.RICH_MESSAGE_MAX_BYTES + return len(content) <= self.RICH_MESSAGE_MAX_CHARS def _bot_supports_rich(self) -> bool: """True when the bound bot can issue raw ``sendRichMessage`` calls. 
@@ -946,15 +950,57 @@ class TelegramAdapter(BasePlatformAdapter): """ return inspect.iscoroutinefunction(getattr(self._bot, "do_api_request", None)) - def _should_attempt_rich(self, content: str) -> bool: + def _should_attempt_rich( + self, content: str, metadata: Optional[Dict[str, Any]] = None + ) -> bool: return bool( not getattr(self, "_rich_send_disabled", False) + and not (metadata or {}).get("expect_edits") and content and content.strip() and self._content_fits_rich_limits(content) and self._bot_supports_rich() ) + def prefers_fresh_final_streaming( + self, content: str, metadata: Optional[Dict[str, Any]] = None + ) -> bool: + """Finalize rich-eligible streamed replies with a fresh sendRichMessage + instead of Hermes' current MarkdownV2 edit path. + + The final edit path has not yet been upgraded to Bot API 10.1's + ``rich_message`` edit parameter, so finalizing through edit would lose + rich constructs such as tables/task lists. When the completed content + is rich-eligible, re-send it via ``sendRichMessage`` and delete the + preview (see ``gateway.stream_consumer._try_fresh_final``). + + ``metadata`` is intentionally ignored: the preview was sent with + ``expect_edits=True`` (to stay on the editable path mid-stream), but the + FINAL answer is a brand-new message that should render rich. Gating + otherwise matches :meth:`_should_attempt_rich`: rich not latched off, + content present and within the rich character limit, and the bot exposes + an async ``do_api_request``. + """ + return self._should_attempt_rich(content) + + def streaming_overflow_limit(self) -> Optional[int]: + """Allow the stream consumer to accumulate up to the rich-message cap + before splitting, so a reply that fits one ``sendRichMessage`` / + ``sendRichMessageDraft`` isn't fragmented at the 4,096 MarkdownV2 limit. 
+ + Gated on the same rich capability as the send path (minus the + content-length check — raising that cap is the whole point): rich not + latched off and the bot exposes an async ``do_api_request``. Returns + ``None`` (→ legacy 4,096 limit) when rich isn't available, so non-rich + streams split exactly as before. + """ + if ( + not getattr(self, "_rich_send_disabled", False) + and self._bot_supports_rich() + ): + return self.RICH_MESSAGE_MAX_CHARS + return None + def _rich_message_payload( self, content: str, *, skip_entity_detection: bool = False ) -> Dict[str, Any]: @@ -2146,7 +2192,7 @@ class TelegramAdapter(BasePlatformAdapter): # through to the legacy MarkdownV2 path on permanent/capability # errors or DM-topic routing skips; returns directly on success or # on a transient failure (which must NOT be legacy-resent). - if self._should_attempt_rich(content): + if self._should_attempt_rich(content, metadata=metadata): rich_result = await self._try_send_rich(chat_id, content, reply_to, metadata) if rich_result is not None: if rich_result.success: diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index c236dc69a60..aefacdbd4f7 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -143,6 +143,13 @@ class GatewayStreamConsumer: # timestamps would be stale by completion time. Ported from # openclaw/openclaw#72038. self._message_created_ts: Optional[float] = None + # Every real preview message id the consumer has put on screen during + # this response (first send + any continuation messages from oversized + # edits/sends). The fresh-final path deletes all of them when it + # re-delivers the completed answer as a single (rich) message, so a + # reply that was split across the platform's edit limit while streaming + # doesn't leave stale fragments above the final message. 
+ self._preview_message_ids: "set[str]" = set() self._already_sent = False self._edit_supported = True # Disabled when progressive edits are no longer usable self._last_edit_time = 0.0 @@ -420,7 +427,10 @@ class GatewayStreamConsumer: if isinstance(self.adapter, _BasePlatformAdapter) else len ) - _raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) + # Rich-capable adapters (Telegram rich messages) raise this above the + # legacy per-message limit so a reply that fits one rich send/draft + # isn't fragmented at 4096 while streaming. See _raw_message_limit. + _raw_limit = self._raw_message_limit() _safe_limit = max(500, _raw_limit - _len_fn(self.cfg.cursor) - 100) # Resolve native draft streaming once per run. When enabled the @@ -589,9 +599,20 @@ class GatewayStreamConsumer: if self._accumulated: if self._fallback_final_send: await self._send_fallback_final(self._accumulated) - elif current_update_visible and ( - not self._adapter_requires_finalize - or self._last_edit_overflowed + elif self._final_response_sent: + # A finalize=True tick above already delivered the + # final answer via the adapter's fresh-final path + # (_try_fresh_final sent a fresh rich message and + # deleted the preview). Running a second finalize + # edit here would duplicate the message / re-delete, + # so just record delivery and stop. + self._final_content_delivered = True + elif ( + current_update_visible + and ( + not self._adapter_requires_finalize + or self._last_edit_overflowed + ) ): # Mid-stream edit above already delivered the # final accumulated content. Skip the redundant @@ -729,6 +750,9 @@ class GatewayStreamConsumer: return reply_to_id try: meta = dict(self.metadata) if self.metadata else {} + # This chunk becomes the next edit target — adapters that support + # rich final sends (Telegram) must keep it on the editable path. 
+ meta["expect_edits"] = True result = await self.adapter.send( chat_id=self.chat_id, content=text, @@ -737,6 +761,7 @@ class GatewayStreamConsumer: ) if result.success and result.message_id: self._message_id = str(result.message_id) + self._track_preview_ids_from_result(result) self._already_sent = True self._last_sent_text = text # Fresh content bubble — close off any stale tool bubble @@ -1114,6 +1139,76 @@ class GatewayStreamConsumer: age = time.monotonic() - self._message_created_ts return age >= threshold + def _raw_message_limit(self) -> int: + """Per-message length budget (in the adapter's ``message_len_fn`` units) + before the consumer splits an overflowing reply. + + Adapters with a richer send/draft path (e.g. Telegram rich messages) + can raise this above ``MAX_MESSAGE_LENGTH`` via + ``streaming_overflow_limit`` so a reply that fits one rich message isn't + fragmented at the legacy edit limit. Falls back to + ``MAX_MESSAGE_LENGTH`` (4096 default) for everyone else. + """ + base = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) + # isinstance gate: MagicMock adapters return mock objects (truthy, not + # ints) for arbitrary attribute access — keep them on the base limit. 
+ if isinstance(self.adapter, _BasePlatformAdapter): + try: + cap = self.adapter.streaming_overflow_limit() + except Exception as e: + logger.debug("streaming_overflow_limit check failed: %s", e) + cap = None + if isinstance(cap, int) and cap > base: + return cap + return base + + def _track_preview_id(self, message_id: Optional[str]) -> None: + """Record a real preview message id for fresh-final cleanup.""" + if message_id and message_id != "__no_edit__": + self._preview_message_ids.add(str(message_id)) + + def _track_preview_ids_from_result(self, result: Any) -> None: + """Record every message id a send/edit result exposes: the primary id + plus any continuation ids from an oversized split + (``continuation_message_ids`` or ``raw_response['message_ids']``).""" + self._track_preview_id(getattr(result, "message_id", None)) + for mid in (getattr(result, "continuation_message_ids", None) or ()): + self._track_preview_id(mid) + raw = getattr(result, "raw_response", None) or {} + if isinstance(raw, dict): + for mid in (raw.get("message_ids") or ()): + self._track_preview_id(mid) + + def _adapter_prefers_fresh_final(self, text: str) -> bool: + """Return True when the adapter would rather finalize a streamed reply + by sending a fresh message and deleting the preview than by editing the + preview in place — e.g. Telegram, whose ``sendRichMessage`` send path + currently renders richer markdown than Hermes' MarkdownV2 edit path. + + Returns False when there is no real preview to replace (no message id, + or the ``__no_edit__`` sentinel), when the adapter doesn't expose the + hook, or on any error (the consumer then keeps the edit-in-place path). 
+ """ + if not self._message_id or self._message_id == "__no_edit__": + return False + fn = getattr(self.adapter, "prefers_fresh_final_streaming", None) + if fn is None: + return False + try: + try: + result = fn(text, metadata=self.metadata) + except TypeError: + # Adapter / test double whose hook doesn't accept the metadata + # keyword — fall back to the positional-only form. + result = fn(text) + except Exception as e: + logger.debug("prefers_fresh_final_streaming check failed: %s", e) + return False + # ``is True`` (not ``bool(...)``) so a MagicMock adapter's auto-child + # method — truthy by default in tests — does not wrongly enable the + # fresh-final path. Mirrors the REQUIRES_EDIT_FINALIZE gate in __init__. + return result is True + async def _try_fresh_final(self, text: str, *, is_turn_final: bool = True) -> bool: """Send ``text`` as a brand-new message (best-effort delete the old preview) so the platform's visible timestamp reflects completion @@ -1127,7 +1222,13 @@ class GatewayStreamConsumer: Ported from openclaw/openclaw#72038. """ - old_message_id = self._message_id + # Every preview message the user has seen for this response: the + # current one plus any continuation fragments tracked while streaming + # (an oversized reply split across the platform's edit limit). All of + # them are replaced by the single fresh message below. + stale_ids = set(self._preview_message_ids) + if self._message_id and self._message_id != "__no_edit__": + stale_ids.add(self._message_id) try: result = await self.adapter.send( chat_id=self.chat_id, @@ -1139,25 +1240,29 @@ class GatewayStreamConsumer: return False if not getattr(result, "success", False): return False - # Successful fresh send — try to delete the stale preview so the - # user doesn't see the old edit-stuck message underneath. 
Cleanup - # is best-effort; platforms that don't implement ``delete_message`` - # just leave the preview behind (still an acceptable outcome — - # the visible final timestamp is the important part). - if old_message_id and old_message_id != "__no_edit__": - delete_fn = getattr(self.adapter, "delete_message", None) - if delete_fn is not None: - try: - await delete_fn(self.chat_id, old_message_id) - except Exception as e: - logger.debug( - "Fresh-final preview cleanup failed (%s): %s", - old_message_id, e, - ) # Adopt the new message id as the current message so subsequent # callers (e.g. overflow split loops, finalize retries) see a # consistent state. new_message_id = getattr(result, "message_id", None) + # Successful fresh send — try to delete the stale preview(s) so the + # user doesn't see the old edit-stuck message(s) underneath. Cleanup + # is best-effort; platforms that don't implement ``delete_message`` + # just leave the preview behind (still an acceptable outcome — the + # visible final timestamp is the important part). Never delete the + # message we just sent. + delete_fn = getattr(self.adapter, "delete_message", None) + if delete_fn is not None: + for stale_id in stale_ids: + if not stale_id or stale_id == "__no_edit__" or stale_id == new_message_id: + continue + try: + await delete_fn(self.chat_id, stale_id) + except Exception as e: + logger.debug( + "Fresh-final preview cleanup failed (%s): %s", + stale_id, e, + ) + self._preview_message_ids = set() if new_message_id: self._message_id = new_message_id self._message_created_ts = time.monotonic() @@ -1267,9 +1372,19 @@ class GatewayStreamConsumer: # old preview follows. Ported from # openclaw/openclaw#72038. Gated by config so the # legacy edit-in-place path stays the default. + # + # Adapters can also opt in regardless of the time threshold + # via prefers_fresh_final_streaming (e.g. 
Telegram, whose + # send path renders richer markdown than its edit path): + # finalizing through edit would visibly downgrade a rich + # preview, so re-deliver as a fresh message + delete the + # preview instead. if ( finalize - and self._should_send_fresh_final() + and ( + self._should_send_fresh_final() + or self._adapter_prefers_fresh_final(text) + ) and await self._try_fresh_final( text, is_turn_final=is_turn_final, ) @@ -1283,6 +1398,9 @@ class GatewayStreamConsumer: ) if result.success: self._already_sent = True + # Record any continuation fragments an oversized edit + # split off, so fresh-final can clean them all up. + self._track_preview_ids_from_result(result) # Adapter may have split-and-delivered an oversized # edit across the original message + N continuations. # When that happens, ``message_id`` is the LAST visible @@ -1405,7 +1523,7 @@ class GatewayStreamConsumer: chat_id=self.chat_id, content=text, reply_to=self._initial_reply_to_id, - metadata=self.metadata, + metadata={**(self.metadata or {}), "expect_edits": True}, ) if result.success: if result.message_id: @@ -1414,6 +1532,9 @@ class GatewayStreamConsumer: # the user so fresh-final logic can detect stale # preview timestamps on long-running responses. self._message_created_ts = time.monotonic() + # Record this (and any continuation fragments from an + # oversized first send) for fresh-final cleanup. + self._track_preview_ids_from_result(result) else: self._edit_supported = False self._already_sent = True diff --git a/tests/agent/test_prompt_builder.py b/tests/agent/test_prompt_builder.py index 998f9ddbac8..e6c302fdb92 100644 --- a/tests/agent/test_prompt_builder.py +++ b/tests/agent/test_prompt_builder.py @@ -887,6 +887,11 @@ class TestPromptBuilderConstants: assert "table" in lowered assert "task list" in lowered assert "math" in lowered + # Hint should proactively steer toward structured formatting, not just + # permit it: bullet + numbered lists for scannable, structured output. 
+ assert "bullet" in lowered + assert "numbered" in lowered + # Local media delivery guidance must remain intact. assert "include MEDIA:" in hint def test_platform_hints_mattermost(self): diff --git a/tests/gateway/test_stream_consumer_draft.py b/tests/gateway/test_stream_consumer_draft.py index 23d12b03913..ef09bd75db0 100644 --- a/tests/gateway/test_stream_consumer_draft.py +++ b/tests/gateway/test_stream_consumer_draft.py @@ -321,3 +321,189 @@ class TestAlreadySentInDraftMode: # After the regular sendMessage finalize, _already_sent is True. assert consumer._already_sent is True + + +def _make_fresh_final_adapter(): + """Build a non-draft adapter that prefers a fresh final send. + + Mirrors Telegram's rich-message contract: REQUIRES_EDIT_FINALIZE so the + final tick is routed through even when the text is unchanged, and + prefers_fresh_final_streaming() True so the consumer delivers the final + answer via a *fresh* send + preview delete instead of an edit. + + ``send`` returns two distinct ids so the test can tell the preview + (first send) from the fresh final (second send) and assert the preview + is the one deleted. + """ + from gateway.platforms.base import BasePlatformAdapter, SendResult + + FreshFinalAdapter = type( + "FreshFinalAdapter", + (BasePlatformAdapter,), + {"MAX_MESSAGE_LENGTH": 4096, "REQUIRES_EDIT_FINALIZE": True}, + ) + FreshFinalAdapter.__abstractmethods__ = frozenset() + adapter = FreshFinalAdapter.__new__(FreshFinalAdapter) + adapter._typing_paused = set() + adapter._fatal_error_message = None + + # Edit-based path only — no native drafts. + adapter.supports_draft_streaming = lambda chat_type=None, metadata=None: False + # Accepts the metadata kwarg the consumer passes; ignores it (like Telegram). 
+ adapter.prefers_fresh_final_streaming = lambda content, metadata=None: True + + adapter.send = AsyncMock(side_effect=[ + SendResult(success=True, message_id="preview1"), + SendResult(success=True, message_id="final1"), + ]) + adapter.edit_message = AsyncMock(return_value=SendResult(success=True)) + adapter.delete_message = AsyncMock(return_value=True) + return adapter + + +class TestAdapterPrefersFreshFinal: + """An adapter whose send path is richer than its edit path (e.g. Telegram + rich messages) finalizes a streamed reply by sending a fresh final message + and deleting the preview, instead of final-editing the preview.""" + + @pytest.mark.asyncio + async def test_edit_stream_finalizes_with_fresh_send_and_deletes_preview(self): + adapter = _make_fresh_final_adapter() + cfg = StreamConsumerConfig( + transport="auto", chat_type="dm", + edit_interval=0.01, buffer_threshold=5, cursor="", + fresh_final_after_seconds=0.0, # only the adapter hook drives fresh-final + ) + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + + consumer.on_delta("Full answer here") + task = asyncio.create_task(consumer.run()) + # Let the first send land so a real preview message_id exists before + # finalization — the fresh-final path only engages with a live preview. + await asyncio.sleep(0.05) + consumer.finish() + await task + + # Two sends: the streaming preview, then the fresh final. + assert adapter.send.await_count == 2 + first_content = adapter.send.call_args_list[0].kwargs.get("content") + second_content = adapter.send.call_args_list[1].kwargs.get("content") + # First update delivered the preview via adapter.send. + assert first_content == "Full answer here" + # Finalization re-sent the same completed content as a fresh message. + assert second_content == "Full answer here" + + # The edit path must NOT be used to finalize a rich preview. + adapter.edit_message.assert_not_called() + + # The stale preview is best-effort deleted (by its id, not the final's). 
+ adapter.delete_message.assert_awaited_once_with("12345", "preview1") + + assert consumer.final_response_sent is True + + +def _make_rich_capable_adapter(*, overflow_limit=32768, send_results=None): + """Non-draft adapter that mimics Telegram rich messages: REQUIRES_EDIT_FINALIZE, + prefers a fresh (rich) final send, and reports a 32,768 streaming overflow + limit so the consumer doesn't pre-split a reply that fits one rich message. + """ + from gateway.platforms.base import BasePlatformAdapter, SendResult + + RichAdapter = type( + "RichCapableAdapter", + (BasePlatformAdapter,), + {"MAX_MESSAGE_LENGTH": 4096, "REQUIRES_EDIT_FINALIZE": True}, + ) + RichAdapter.__abstractmethods__ = frozenset() + adapter = RichAdapter.__new__(RichAdapter) + adapter._typing_paused = set() + adapter._fatal_error_message = None + adapter.supports_draft_streaming = lambda chat_type=None, metadata=None: False + adapter.prefers_fresh_final_streaming = lambda content, metadata=None: True + adapter.streaming_overflow_limit = lambda: overflow_limit + adapter.send = AsyncMock(side_effect=send_results) if send_results else AsyncMock( + return_value=SendResult(success=True, message_id="m1"), + ) + adapter.edit_message = AsyncMock(return_value=SendResult(success=True)) + adapter.delete_message = AsyncMock(return_value=True) + return adapter + + +class TestRichAwareOverflow: + """Rich-capable adapters raise the consumer's overflow limit so a reply that + fits one rich message isn't fragmented at the legacy 4,096 edit limit.""" + + def test_raw_message_limit_uses_adapter_rich_cap(self): + adapter = _make_rich_capable_adapter(overflow_limit=32768) + consumer = GatewayStreamConsumer(adapter, "12345", StreamConsumerConfig()) + assert consumer._raw_message_limit() == 32768 + + def test_raw_message_limit_falls_back_to_max_length(self): + # Adapter whose hook returns None (default) keeps the legacy limit. 
+ adapter = _make_rich_capable_adapter() + adapter.streaming_overflow_limit = lambda: None + consumer = GatewayStreamConsumer(adapter, "12345", StreamConsumerConfig()) + assert consumer._raw_message_limit() == 4096 + + def test_raw_message_limit_mock_adapter_is_safe(self): + # MagicMock adapters (many existing tests) must not crash or wrongly + # inflate the limit from a truthy auto-attribute. + adapter = MagicMock() + adapter.MAX_MESSAGE_LENGTH = 4096 + consumer = GatewayStreamConsumer(adapter, "12345", StreamConsumerConfig()) + assert consumer._raw_message_limit() == 4096 + + @pytest.mark.asyncio + async def test_long_rich_reply_not_split_and_final_is_whole(self): + from gateway.platforms.base import SendResult + + long_text = "x" * 5000 # > 4096 legacy limit, < 32768 rich limit + adapter = _make_rich_capable_adapter(send_results=[ + SendResult(success=True, message_id="preview1"), + SendResult(success=True, message_id="final1"), + ]) + cfg = StreamConsumerConfig( + transport="auto", chat_type="dm", + edit_interval=0.01, buffer_threshold=5, cursor="", + fresh_final_after_seconds=0.0, + ) + consumer = GatewayStreamConsumer(adapter, "12345", cfg) + + consumer.on_delta(long_text) + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.finish() + await task + + # Exactly two whole sends: the preview and the fresh final — NOT split + # into ~4096 chunks. Both carry the full 5,000-char reply. 
+ assert adapter.send.await_count == 2 + assert adapter.send.call_args_list[0].kwargs.get("content") == long_text + assert adapter.send.call_args_list[1].kwargs.get("content") == long_text + adapter.edit_message.assert_not_called() + adapter.delete_message.assert_awaited_once_with("12345", "preview1") + assert consumer.final_response_sent is True + + @pytest.mark.asyncio + async def test_fresh_final_deletes_all_preview_fragments(self): + from gateway.platforms.base import SendResult + + adapter = _make_rich_capable_adapter(send_results=[ + SendResult(success=True, message_id="final1"), + ]) + consumer = GatewayStreamConsumer(adapter, "12345", StreamConsumerConfig()) + # Simulate a reply that was split across the edit limit while streaming: + # three preview fragments, the last of which is the current message. + consumer._message_id = "frag3" + consumer._preview_message_ids = {"frag1", "frag2", "frag3"} + + ok = await consumer._try_fresh_final("the whole completed answer") + + assert ok is True + # All three stale fragments deleted; the fresh final never deleted. 
+ deleted = {c.args[1] for c in adapter.delete_message.await_args_list} + assert deleted == {"frag1", "frag2", "frag3"} + assert "final1" not in deleted + assert consumer._message_id == "final1" + assert consumer._preview_message_ids == set() + assert consumer.final_response_sent is True diff --git a/tests/gateway/test_stream_consumer_thread_routing.py b/tests/gateway/test_stream_consumer_thread_routing.py index ec4611abfa3..b2b7f22ffe5 100644 --- a/tests/gateway/test_stream_consumer_thread_routing.py +++ b/tests/gateway/test_stream_consumer_thread_routing.py @@ -103,7 +103,8 @@ class TestInitialReplyToId: await consumer._send_or_edit("Test") call_kwargs = adapter.send.call_args[1] - assert call_kwargs["metadata"] == metadata + assert call_kwargs["metadata"] == {**metadata, "expect_edits": True} + assert metadata == {"thread_id": "omt_topic789"} class TestOverflowFirstMessage: diff --git a/tests/gateway/test_telegram_rich_messages.py b/tests/gateway/test_telegram_rich_messages.py index db971cd6d5f..70893bbd7a4 100644 --- a/tests/gateway/test_telegram_rich_messages.py +++ b/tests/gateway/test_telegram_rich_messages.py @@ -79,6 +79,25 @@ async def test_legacy_rich_messages_config_is_ignored(): adapter._bot.send_message.assert_not_called() +@pytest.mark.asyncio +async def test_expect_edits_metadata_keeps_preview_on_legacy_path(): + adapter = _make_adapter() + + result = await adapter.send( + "12345", + RICH_CONTENT, + metadata={"expect_edits": True}, + ) + + assert result.success is True + # Streaming preview sends will be edited later, so they must not be born as + # rich messages until Hermes wires rich_message edits directly. 
+ bot = adapter._bot + assert bot is not None + bot.do_api_request.assert_not_called() + bot.send_message.assert_awaited() + + @pytest.mark.asyncio async def test_oversized_content_skips_rich_and_chunks(): adapter = _make_adapter() @@ -326,3 +345,40 @@ async def test_rich_draft_oversized_uses_legacy(): assert result.success is True adapter._bot.do_api_request.assert_not_called() adapter._bot.send_message_draft.assert_awaited_once() + + +# ---------------------------------------------------------------------- +# prefers_fresh_final_streaming: the stream consumer asks the adapter whether +# to finalize a streamed reply by sending a fresh (rich) message + deleting the +# preview, instead of final-editing the preview through the non-rich edit path. +# Telegram opts in exactly when the content is rich-eligible. +# ---------------------------------------------------------------------- +def test_prefers_fresh_final_streaming_when_rich_enabled(): + adapter = _make_adapter() + assert adapter.prefers_fresh_final_streaming(RICH_CONTENT) is True + + +def test_prefers_fresh_final_streaming_ignores_legacy_toggle(): + adapter = _make_adapter(extra={"rich_messages": False}) + assert adapter.prefers_fresh_final_streaming(RICH_CONTENT) is True + + +# ---------------------------------------------------------------------- +# streaming_overflow_limit: with rich on, the stream consumer may accumulate up +# to the 32,768-char rich cap before splitting, so a reply that fits one +# sendRichMessage / sendRichMessageDraft isn't fragmented at the 4,096 limit. 
+# ---------------------------------------------------------------------- +def test_streaming_overflow_limit_is_rich_cap_when_enabled(): + adapter = _make_adapter() + assert adapter.streaming_overflow_limit() == TelegramAdapter.RICH_MESSAGE_MAX_CHARS + + +def test_streaming_overflow_limit_ignores_legacy_toggle(): + adapter = _make_adapter(extra={"rich_messages": False}) + assert adapter.streaming_overflow_limit() == TelegramAdapter.RICH_MESSAGE_MAX_CHARS + + +def test_streaming_overflow_limit_none_when_rich_latched_off(): + adapter = _make_adapter() + adapter._rich_send_disabled = True + assert adapter.streaming_overflow_limit() is None