fix(telegram): preserve rich formatting on stream final

This commit is contained in:
ITheEqualizer 2026-06-13 10:53:28 -07:00 committed by Teknium
parent 819def44c7
commit 7c0605bf22
8 changed files with 499 additions and 38 deletions

View file

@ -511,13 +511,19 @@ PLATFORM_HINTS = {
"Standard Markdown is automatically converted to Telegram formatting. "
"Supported: **bold**, *italic*, ~~strikethrough~~, ||spoiler||, "
"`inline code`, ```code blocks```, [links](url), and ## headers. "
"Telegram supports rich Markdown, so when it improves clarity you may "
"use headings, tables (pipe `| col | col |` syntax), task lists "
"(`- [ ]` / `- [x]`), nested blockquotes, collapsible details, "
"footnotes/references, math/formulas (`$...$`, `$$...$$`), underline, "
"subscript/superscript, marked (highlighted) text, and anchors. Prefer "
"real Markdown tables and task lists over hand-built bullet substitutes "
"when presenting structured data. "
"Telegram now supports rich Markdown, so lean into it: whenever it "
"makes the answer clearer or easier to scan, actively reach for real "
"Markdown tables (pipe `| col | col |` syntax), bullet and numbered "
"lists, task lists (`- [ ]` / `- [x]`), headings, nested blockquotes, "
"collapsible details, footnotes/references, math/formulas (`$...$`, "
"`$$...$$`), underline, subscript/superscript, marked (highlighted) "
"text, and anchors. Default to structured formatting over dense "
"paragraphs for any comparison, set of steps, key/value summary, or "
"tabular data. Prefer real Markdown tables and task lists over "
"hand-built bullet substitutes when presenting structured data; these "
"degrade gracefully (tables become readable bullet groups) when rich "
"rendering is unavailable, but advanced constructs like math and "
"collapsible details may render as plain source text in that case. "
"You can send media files natively: to deliver a file to the user, "
"include MEDIA:/absolute/path/to/file in your response. Images "
"(.png, .jpg, .webp) appear as photos, audio (.ogg) sends as voice "

View file

@ -1953,6 +1953,46 @@ class BasePlatformAdapter(ABC):
"""
return False
def prefers_fresh_final_streaming(
self,
content: str,
metadata: Optional[Dict[str, Any]] = None,
) -> bool:
"""Whether the stream consumer should finalize a streamed reply by
sending a *fresh* final message (and deleting the preview) instead of
final-editing the preview.
Some adapters can send richer final messages than their current edit
implementation supports. Telegram is the motivating case: Hermes sends
final replies through ``sendRichMessage`` but still finalizes streamed
previews through its existing MarkdownV2 edit path until Bot API 10.1's
``rich_message`` edit parameter is wired directly. Such adapters
override this to ask the consumer to re-deliver the completed answer as
a new rich message and best-effort delete the stale preview, so the
final rendering matches the rich send path.
Default implementation returns False legacy platforms keep the
edit-in-place finalization path.
"""
return False
def streaming_overflow_limit(self) -> Optional[int]:
"""Max single-message length (in this adapter's ``message_len_fn``
units) the stream consumer may accumulate before it splits, when the
adapter can deliver a larger message than its legacy per-message limit.
Telegram Bot API 10.1 Rich Messages accept up to 32,768 chars in a
single ``sendRichMessage`` / ``sendRichMessageDraft``, far above the
4,096 MarkdownV2 limit. Adapters with such a richer send/draft path
override this so the consumer doesn't fragment a reply that fits one
rich message; the live edit preview is still bound by the platform's
edit limit, but the finalized reply (and DM draft preview) is delivered
whole.
Return ``None`` (default) to use ``MAX_MESSAGE_LENGTH``.
"""
return None
async def send_draft(
self,
chat_id: str,

View file

@ -349,8 +349,11 @@ class TelegramAdapter(BasePlatformAdapter):
MAX_MESSAGE_LENGTH = 4096
supports_code_blocks = True # Telegram MarkdownV2 renders fenced code blocks
# Bot API 10.1 Rich Messages cap the raw markdown/html text at 32,768
# UTF-8 bytes. Content above this is sent via the legacy chunking path.
RICH_MESSAGE_MAX_BYTES = 32768
# UTF-8 characters. Content above this is sent via the legacy chunking path.
RICH_MESSAGE_MAX_CHARS = 32768
# Backwards-compatible alias for tests/external callers that referenced the
# initial implementation name. The API limit is character-based, not bytes.
RICH_MESSAGE_MAX_BYTES = RICH_MESSAGE_MAX_CHARS
# Threshold for detecting Telegram client-side message splits.
# When a chunk is near this limit, a continuation is almost certain.
_SPLIT_THRESHOLD = 4000
@ -920,19 +923,20 @@ class TelegramAdapter(BasePlatformAdapter):
# the RAW agent markdown so richer constructs (tables, task lists,
# collapsible details, math, ...) render natively. The legacy MarkdownV2
# send() path stays as the fallback for unsupported/oversized content and
# older PTB/clients. Streaming edits/drafts are intentionally untouched —
# Telegram exposes no rich-edit method.
# older PTB/clients. Streaming edits stay on Hermes' existing MarkdownV2
# edit path for now; finalization can re-send as rich and delete the stale
# preview until rich_message edit support is wired directly.
# ------------------------------------------------------------------
def _content_fits_rich_limits(self, content: str) -> bool:
"""Cheap pre-check for the one hard rich limit we can count locally.
Only the 32,768 UTF-8 byte text cap is enforced here. Other Bot API
Only the 32,768 UTF-8 character text cap is enforced here. Other Bot API
rich limits (500 blocks, 16 nesting levels, 20 table columns, ...) are
not pre-counted; if exceeded Telegram returns a BadRequest, which
:meth:`_is_rich_fallback_error` classifies as permanent so the send
degrades to the legacy chunking path.
"""
return len(content.encode("utf-8")) <= self.RICH_MESSAGE_MAX_BYTES
return len(content) <= self.RICH_MESSAGE_MAX_CHARS
def _bot_supports_rich(self) -> bool:
"""True when the bound bot can issue raw ``sendRichMessage`` calls.
@ -946,15 +950,57 @@ class TelegramAdapter(BasePlatformAdapter):
"""
return inspect.iscoroutinefunction(getattr(self._bot, "do_api_request", None))
def _should_attempt_rich(self, content: str) -> bool:
def _should_attempt_rich(
self, content: str, metadata: Optional[Dict[str, Any]] = None
) -> bool:
return bool(
not getattr(self, "_rich_send_disabled", False)
and not (metadata or {}).get("expect_edits")
and content
and content.strip()
and self._content_fits_rich_limits(content)
and self._bot_supports_rich()
)
def prefers_fresh_final_streaming(
self, content: str, metadata: Optional[Dict[str, Any]] = None
) -> bool:
"""Finalize rich-eligible streamed replies with a fresh sendRichMessage
instead of Hermes' current MarkdownV2 edit path.
The final edit path has not yet been upgraded to Bot API 10.1's
``rich_message`` edit parameter, so finalizing through edit would lose
rich constructs such as tables/task lists. When the completed content
is rich-eligible, re-send it via ``sendRichMessage`` and delete the
preview (see ``gateway.stream_consumer._try_fresh_final``).
``metadata`` is intentionally ignored: the preview was sent with
``expect_edits=True`` (to stay on the editable path mid-stream), but the
FINAL answer is a brand-new message that should render rich. Gating
otherwise matches :meth:`_should_attempt_rich`: rich not latched off,
content present and within the rich character limit, and the bot exposes
an async ``do_api_request``.
"""
return self._should_attempt_rich(content)
def streaming_overflow_limit(self) -> Optional[int]:
"""Allow the stream consumer to accumulate up to the rich-message cap
before splitting, so a reply that fits one ``sendRichMessage`` /
``sendRichMessageDraft`` isn't fragmented at the 4,096 MarkdownV2 limit.
Gated on the same rich capability as the send path (minus the
content-length check raising that cap is the whole point): rich not
latched off and the bot exposes an async ``do_api_request``. Returns
``None`` ( legacy 4,096 limit) when rich isn't available, so non-rich
streams split exactly as before.
"""
if (
not getattr(self, "_rich_send_disabled", False)
and self._bot_supports_rich()
):
return self.RICH_MESSAGE_MAX_CHARS
return None
def _rich_message_payload(
self, content: str, *, skip_entity_detection: bool = False
) -> Dict[str, Any]:
@ -2146,7 +2192,7 @@ class TelegramAdapter(BasePlatformAdapter):
# through to the legacy MarkdownV2 path on permanent/capability
# errors or DM-topic routing skips; returns directly on success or
# on a transient failure (which must NOT be legacy-resent).
if self._should_attempt_rich(content):
if self._should_attempt_rich(content, metadata=metadata):
rich_result = await self._try_send_rich(chat_id, content, reply_to, metadata)
if rich_result is not None:
if rich_result.success:

View file

@ -143,6 +143,13 @@ class GatewayStreamConsumer:
# timestamps would be stale by completion time. Ported from
# openclaw/openclaw#72038.
self._message_created_ts: Optional[float] = None
# Every real preview message id the consumer has put on screen during
# this response (first send + any continuation messages from oversized
# edits/sends). The fresh-final path deletes all of them when it
# re-delivers the completed answer as a single (rich) message, so a
# reply that was split across the platform's edit limit while streaming
# doesn't leave stale fragments above the final message.
self._preview_message_ids: "set[str]" = set()
self._already_sent = False
self._edit_supported = True # Disabled when progressive edits are no longer usable
self._last_edit_time = 0.0
@ -420,7 +427,10 @@ class GatewayStreamConsumer:
if isinstance(self.adapter, _BasePlatformAdapter)
else len
)
_raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
# Rich-capable adapters (Telegram rich messages) raise this above the
# legacy per-message limit so a reply that fits one rich send/draft
# isn't fragmented at 4096 while streaming. See _raw_message_limit.
_raw_limit = self._raw_message_limit()
_safe_limit = max(500, _raw_limit - _len_fn(self.cfg.cursor) - 100)
# Resolve native draft streaming once per run. When enabled the
@ -589,9 +599,20 @@ class GatewayStreamConsumer:
if self._accumulated:
if self._fallback_final_send:
await self._send_fallback_final(self._accumulated)
elif current_update_visible and (
not self._adapter_requires_finalize
or self._last_edit_overflowed
elif self._final_response_sent:
# A finalize=True tick above already delivered the
# final answer via the adapter's fresh-final path
# (_try_fresh_final sent a fresh rich message and
# deleted the preview). Running a second finalize
# edit here would duplicate the message / re-delete,
# so just record delivery and stop.
self._final_content_delivered = True
elif (
current_update_visible
and (
not self._adapter_requires_finalize
or self._last_edit_overflowed
)
):
# Mid-stream edit above already delivered the
# final accumulated content. Skip the redundant
@ -729,6 +750,9 @@ class GatewayStreamConsumer:
return reply_to_id
try:
meta = dict(self.metadata) if self.metadata else {}
# This chunk becomes the next edit target — adapters that support
# rich final sends (Telegram) must keep it on the editable path.
meta["expect_edits"] = True
result = await self.adapter.send(
chat_id=self.chat_id,
content=text,
@ -737,6 +761,7 @@ class GatewayStreamConsumer:
)
if result.success and result.message_id:
self._message_id = str(result.message_id)
self._track_preview_ids_from_result(result)
self._already_sent = True
self._last_sent_text = text
# Fresh content bubble — close off any stale tool bubble
@ -1114,6 +1139,76 @@ class GatewayStreamConsumer:
age = time.monotonic() - self._message_created_ts
return age >= threshold
def _raw_message_limit(self) -> int:
"""Per-message length budget (in the adapter's ``message_len_fn`` units)
before the consumer splits an overflowing reply.
Adapters with a richer send/draft path (e.g. Telegram rich messages)
can raise this above ``MAX_MESSAGE_LENGTH`` via
``streaming_overflow_limit`` so a reply that fits one rich message isn't
fragmented at the legacy edit limit. Falls back to
``MAX_MESSAGE_LENGTH`` (4096 default) for everyone else.
"""
base = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
# isinstance gate: MagicMock adapters return mock objects (truthy, not
# ints) for arbitrary attribute access — keep them on the base limit.
if isinstance(self.adapter, _BasePlatformAdapter):
try:
cap = self.adapter.streaming_overflow_limit()
except Exception as e:
logger.debug("streaming_overflow_limit check failed: %s", e)
cap = None
if isinstance(cap, int) and cap > base:
return cap
return base
def _track_preview_id(self, message_id: Optional[str]) -> None:
"""Record a real preview message id for fresh-final cleanup."""
if message_id and message_id != "__no_edit__":
self._preview_message_ids.add(str(message_id))
def _track_preview_ids_from_result(self, result: Any) -> None:
"""Record every message id a send/edit result exposes: the primary id
plus any continuation ids from an oversized split
(``continuation_message_ids`` or ``raw_response['message_ids']``)."""
self._track_preview_id(getattr(result, "message_id", None))
for mid in (getattr(result, "continuation_message_ids", None) or ()):
self._track_preview_id(mid)
raw = getattr(result, "raw_response", None) or {}
if isinstance(raw, dict):
for mid in (raw.get("message_ids") or ()):
self._track_preview_id(mid)
def _adapter_prefers_fresh_final(self, text: str) -> bool:
"""Return True when the adapter would rather finalize a streamed reply
by sending a fresh message and deleting the preview than by editing the
preview in place e.g. Telegram, whose ``sendRichMessage`` send path
currently renders richer markdown than Hermes' MarkdownV2 edit path.
Returns False when there is no real preview to replace (no message id,
or the ``__no_edit__`` sentinel), when the adapter doesn't expose the
hook, or on any error (the consumer then keeps the edit-in-place path).
"""
if not self._message_id or self._message_id == "__no_edit__":
return False
fn = getattr(self.adapter, "prefers_fresh_final_streaming", None)
if fn is None:
return False
try:
try:
result = fn(text, metadata=self.metadata)
except TypeError:
# Adapter / test double whose hook doesn't accept the metadata
# keyword — fall back to the positional-only form.
result = fn(text)
except Exception as e:
logger.debug("prefers_fresh_final_streaming check failed: %s", e)
return False
# ``is True`` (not ``bool(...)``) so a MagicMock adapter's auto-child
# method — truthy by default in tests — does not wrongly enable the
# fresh-final path. Mirrors the REQUIRES_EDIT_FINALIZE gate in __init__.
return result is True
async def _try_fresh_final(self, text: str, *, is_turn_final: bool = True) -> bool:
"""Send ``text`` as a brand-new message (best-effort delete the old
preview) so the platform's visible timestamp reflects completion
@ -1127,7 +1222,13 @@ class GatewayStreamConsumer:
Ported from openclaw/openclaw#72038.
"""
old_message_id = self._message_id
# Every preview message the user has seen for this response: the
# current one plus any continuation fragments tracked while streaming
# (an oversized reply split across the platform's edit limit). All of
# them are replaced by the single fresh message below.
stale_ids = set(self._preview_message_ids)
if self._message_id and self._message_id != "__no_edit__":
stale_ids.add(self._message_id)
try:
result = await self.adapter.send(
chat_id=self.chat_id,
@ -1139,25 +1240,29 @@ class GatewayStreamConsumer:
return False
if not getattr(result, "success", False):
return False
# Successful fresh send — try to delete the stale preview so the
# user doesn't see the old edit-stuck message underneath. Cleanup
# is best-effort; platforms that don't implement ``delete_message``
# just leave the preview behind (still an acceptable outcome —
# the visible final timestamp is the important part).
if old_message_id and old_message_id != "__no_edit__":
delete_fn = getattr(self.adapter, "delete_message", None)
if delete_fn is not None:
try:
await delete_fn(self.chat_id, old_message_id)
except Exception as e:
logger.debug(
"Fresh-final preview cleanup failed (%s): %s",
old_message_id, e,
)
# Adopt the new message id as the current message so subsequent
# callers (e.g. overflow split loops, finalize retries) see a
# consistent state.
new_message_id = getattr(result, "message_id", None)
# Successful fresh send — try to delete the stale preview(s) so the
# user doesn't see the old edit-stuck message(s) underneath. Cleanup
# is best-effort; platforms that don't implement ``delete_message``
# just leave the preview behind (still an acceptable outcome — the
# visible final timestamp is the important part). Never delete the
# message we just sent.
delete_fn = getattr(self.adapter, "delete_message", None)
if delete_fn is not None:
for stale_id in stale_ids:
if not stale_id or stale_id == "__no_edit__" or stale_id == new_message_id:
continue
try:
await delete_fn(self.chat_id, stale_id)
except Exception as e:
logger.debug(
"Fresh-final preview cleanup failed (%s): %s",
stale_id, e,
)
self._preview_message_ids = set()
if new_message_id:
self._message_id = new_message_id
self._message_created_ts = time.monotonic()
@ -1267,9 +1372,19 @@ class GatewayStreamConsumer:
# old preview follows. Ported from
# openclaw/openclaw#72038. Gated by config so the
# legacy edit-in-place path stays the default.
#
# Adapters can also opt in regardless of the time threshold
# via prefers_fresh_final_streaming (e.g. Telegram, whose
# send path renders richer markdown than its edit path):
# finalizing through edit would visibly downgrade a rich
# preview, so re-deliver as a fresh message + delete the
# preview instead.
if (
finalize
and self._should_send_fresh_final()
and (
self._should_send_fresh_final()
or self._adapter_prefers_fresh_final(text)
)
and await self._try_fresh_final(
text, is_turn_final=is_turn_final,
)
@ -1283,6 +1398,9 @@ class GatewayStreamConsumer:
)
if result.success:
self._already_sent = True
# Record any continuation fragments an oversized edit
# split off, so fresh-final can clean them all up.
self._track_preview_ids_from_result(result)
# Adapter may have split-and-delivered an oversized
# edit across the original message + N continuations.
# When that happens, ``message_id`` is the LAST visible
@ -1405,7 +1523,7 @@ class GatewayStreamConsumer:
chat_id=self.chat_id,
content=text,
reply_to=self._initial_reply_to_id,
metadata=self.metadata,
metadata={**(self.metadata or {}), "expect_edits": True},
)
if result.success:
if result.message_id:
@ -1414,6 +1532,9 @@ class GatewayStreamConsumer:
# the user so fresh-final logic can detect stale
# preview timestamps on long-running responses.
self._message_created_ts = time.monotonic()
# Record this (and any continuation fragments from an
# oversized first send) for fresh-final cleanup.
self._track_preview_ids_from_result(result)
else:
self._edit_supported = False
self._already_sent = True

View file

@ -887,6 +887,11 @@ class TestPromptBuilderConstants:
assert "table" in lowered
assert "task list" in lowered
assert "math" in lowered
# Hint should proactively steer toward structured formatting, not just
# permit it: bullet + numbered lists for scannable, structured output.
assert "bullet" in lowered
assert "numbered" in lowered
# Local media delivery guidance must remain intact.
assert "include MEDIA:" in hint
def test_platform_hints_mattermost(self):

View file

@ -321,3 +321,189 @@ class TestAlreadySentInDraftMode:
# After the regular sendMessage finalize, _already_sent is True.
assert consumer._already_sent is True
def _make_fresh_final_adapter():
"""Build a non-draft adapter that prefers a fresh final send.
Mirrors Telegram's rich-message contract: REQUIRES_EDIT_FINALIZE so the
final tick is routed through even when the text is unchanged, and
prefers_fresh_final_streaming() True so the consumer delivers the final
answer via a *fresh* send + preview delete instead of an edit.
``send`` returns two distinct ids so the test can tell the preview
(first send) from the fresh final (second send) and assert the preview
is the one deleted.
"""
from gateway.platforms.base import BasePlatformAdapter, SendResult
FreshFinalAdapter = type(
"FreshFinalAdapter",
(BasePlatformAdapter,),
{"MAX_MESSAGE_LENGTH": 4096, "REQUIRES_EDIT_FINALIZE": True},
)
FreshFinalAdapter.__abstractmethods__ = frozenset()
adapter = FreshFinalAdapter.__new__(FreshFinalAdapter)
adapter._typing_paused = set()
adapter._fatal_error_message = None
# Edit-based path only — no native drafts.
adapter.supports_draft_streaming = lambda chat_type=None, metadata=None: False
# Accepts the metadata kwarg the consumer passes; ignores it (like Telegram).
adapter.prefers_fresh_final_streaming = lambda content, metadata=None: True
adapter.send = AsyncMock(side_effect=[
SendResult(success=True, message_id="preview1"),
SendResult(success=True, message_id="final1"),
])
adapter.edit_message = AsyncMock(return_value=SendResult(success=True))
adapter.delete_message = AsyncMock(return_value=True)
return adapter
class TestAdapterPrefersFreshFinal:
"""An adapter whose send path is richer than its edit path (e.g. Telegram
rich messages) finalizes a streamed reply by sending a fresh final message
and deleting the preview, instead of final-editing the preview."""
@pytest.mark.asyncio
async def test_edit_stream_finalizes_with_fresh_send_and_deletes_preview(self):
adapter = _make_fresh_final_adapter()
cfg = StreamConsumerConfig(
transport="auto", chat_type="dm",
edit_interval=0.01, buffer_threshold=5, cursor="",
fresh_final_after_seconds=0.0, # only the adapter hook drives fresh-final
)
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
consumer.on_delta("Full answer here")
task = asyncio.create_task(consumer.run())
# Let the first send land so a real preview message_id exists before
# finalization — the fresh-final path only engages with a live preview.
await asyncio.sleep(0.05)
consumer.finish()
await task
# Two sends: the streaming preview, then the fresh final.
assert adapter.send.await_count == 2
first_content = adapter.send.call_args_list[0].kwargs.get("content")
second_content = adapter.send.call_args_list[1].kwargs.get("content")
# First update delivered the preview via adapter.send.
assert first_content == "Full answer here"
# Finalization re-sent the same completed content as a fresh message.
assert second_content == "Full answer here"
# The edit path must NOT be used to finalize a rich preview.
adapter.edit_message.assert_not_called()
# The stale preview is best-effort deleted (by its id, not the final's).
adapter.delete_message.assert_awaited_once_with("12345", "preview1")
assert consumer.final_response_sent is True
def _make_rich_capable_adapter(*, overflow_limit=32768, send_results=None):
"""Non-draft adapter that mimics Telegram rich messages: REQUIRES_EDIT_FINALIZE,
prefers a fresh (rich) final send, and reports a 32,768 streaming overflow
limit so the consumer doesn't pre-split a reply that fits one rich message.
"""
from gateway.platforms.base import BasePlatformAdapter, SendResult
RichAdapter = type(
"RichCapableAdapter",
(BasePlatformAdapter,),
{"MAX_MESSAGE_LENGTH": 4096, "REQUIRES_EDIT_FINALIZE": True},
)
RichAdapter.__abstractmethods__ = frozenset()
adapter = RichAdapter.__new__(RichAdapter)
adapter._typing_paused = set()
adapter._fatal_error_message = None
adapter.supports_draft_streaming = lambda chat_type=None, metadata=None: False
adapter.prefers_fresh_final_streaming = lambda content, metadata=None: True
adapter.streaming_overflow_limit = lambda: overflow_limit
adapter.send = AsyncMock(side_effect=send_results) if send_results else AsyncMock(
return_value=SendResult(success=True, message_id="m1"),
)
adapter.edit_message = AsyncMock(return_value=SendResult(success=True))
adapter.delete_message = AsyncMock(return_value=True)
return adapter
class TestRichAwareOverflow:
"""Rich-capable adapters raise the consumer's overflow limit so a reply that
fits one rich message isn't fragmented at the legacy 4,096 edit limit."""
def test_raw_message_limit_uses_adapter_rich_cap(self):
adapter = _make_rich_capable_adapter(overflow_limit=32768)
consumer = GatewayStreamConsumer(adapter, "12345", StreamConsumerConfig())
assert consumer._raw_message_limit() == 32768
def test_raw_message_limit_falls_back_to_max_length(self):
# Adapter whose hook returns None (default) keeps the legacy limit.
adapter = _make_rich_capable_adapter()
adapter.streaming_overflow_limit = lambda: None
consumer = GatewayStreamConsumer(adapter, "12345", StreamConsumerConfig())
assert consumer._raw_message_limit() == 4096
def test_raw_message_limit_mock_adapter_is_safe(self):
# MagicMock adapters (many existing tests) must not crash or wrongly
# inflate the limit from a truthy auto-attribute.
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
consumer = GatewayStreamConsumer(adapter, "12345", StreamConsumerConfig())
assert consumer._raw_message_limit() == 4096
@pytest.mark.asyncio
async def test_long_rich_reply_not_split_and_final_is_whole(self):
from gateway.platforms.base import SendResult
long_text = "x" * 5000 # > 4096 legacy limit, < 32768 rich limit
adapter = _make_rich_capable_adapter(send_results=[
SendResult(success=True, message_id="preview1"),
SendResult(success=True, message_id="final1"),
])
cfg = StreamConsumerConfig(
transport="auto", chat_type="dm",
edit_interval=0.01, buffer_threshold=5, cursor="",
fresh_final_after_seconds=0.0,
)
consumer = GatewayStreamConsumer(adapter, "12345", cfg)
consumer.on_delta(long_text)
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.finish()
await task
# Exactly two whole sends: the preview and the fresh final — NOT split
# into ~4096 chunks. Both carry the full 5,000-char reply.
assert adapter.send.await_count == 2
assert adapter.send.call_args_list[0].kwargs.get("content") == long_text
assert adapter.send.call_args_list[1].kwargs.get("content") == long_text
adapter.edit_message.assert_not_called()
adapter.delete_message.assert_awaited_once_with("12345", "preview1")
assert consumer.final_response_sent is True
@pytest.mark.asyncio
async def test_fresh_final_deletes_all_preview_fragments(self):
from gateway.platforms.base import SendResult
adapter = _make_rich_capable_adapter(send_results=[
SendResult(success=True, message_id="final1"),
])
consumer = GatewayStreamConsumer(adapter, "12345", StreamConsumerConfig())
# Simulate a reply that was split across the edit limit while streaming:
# three preview fragments, the last of which is the current message.
consumer._message_id = "frag3"
consumer._preview_message_ids = {"frag1", "frag2", "frag3"}
ok = await consumer._try_fresh_final("the whole completed answer")
assert ok is True
# All three stale fragments deleted; the fresh final never deleted.
deleted = {c.args[1] for c in adapter.delete_message.await_args_list}
assert deleted == {"frag1", "frag2", "frag3"}
assert "final1" not in deleted
assert consumer._message_id == "final1"
assert consumer._preview_message_ids == set()
assert consumer.final_response_sent is True

View file

@ -103,7 +103,8 @@ class TestInitialReplyToId:
await consumer._send_or_edit("Test")
call_kwargs = adapter.send.call_args[1]
assert call_kwargs["metadata"] == metadata
assert call_kwargs["metadata"] == {**metadata, "expect_edits": True}
assert metadata == {"thread_id": "omt_topic789"}
class TestOverflowFirstMessage:

View file

@ -79,6 +79,25 @@ async def test_legacy_rich_messages_config_is_ignored():
adapter._bot.send_message.assert_not_called()
@pytest.mark.asyncio
async def test_expect_edits_metadata_keeps_preview_on_legacy_path():
adapter = _make_adapter()
result = await adapter.send(
"12345",
RICH_CONTENT,
metadata={"expect_edits": True},
)
assert result.success is True
# Streaming preview sends will be edited later, so they must not be born as
# rich messages until Hermes wires rich_message edits directly.
bot = adapter._bot
assert bot is not None
bot.do_api_request.assert_not_called()
bot.send_message.assert_awaited()
@pytest.mark.asyncio
async def test_oversized_content_skips_rich_and_chunks():
adapter = _make_adapter()
@ -326,3 +345,40 @@ async def test_rich_draft_oversized_uses_legacy():
assert result.success is True
adapter._bot.do_api_request.assert_not_called()
adapter._bot.send_message_draft.assert_awaited_once()
# ----------------------------------------------------------------------
# prefers_fresh_final_streaming: the stream consumer asks the adapter whether
# to finalize a streamed reply by sending a fresh (rich) message + deleting the
# preview, instead of final-editing the preview through the non-rich edit path.
# Telegram opts in exactly when the content is rich-eligible.
# ----------------------------------------------------------------------
def test_prefers_fresh_final_streaming_when_rich_enabled():
adapter = _make_adapter()
assert adapter.prefers_fresh_final_streaming(RICH_CONTENT) is True
def test_prefers_fresh_final_streaming_ignores_legacy_toggle():
adapter = _make_adapter(extra={"rich_messages": False})
assert adapter.prefers_fresh_final_streaming(RICH_CONTENT) is True
# ----------------------------------------------------------------------
# streaming_overflow_limit: with rich on, the stream consumer may accumulate up
# to the 32,768-char rich cap before splitting, so a reply that fits one
# sendRichMessage / sendRichMessageDraft isn't fragmented at the 4,096 limit.
# ----------------------------------------------------------------------
def test_streaming_overflow_limit_is_rich_cap_when_enabled():
adapter = _make_adapter()
assert adapter.streaming_overflow_limit() == TelegramAdapter.RICH_MESSAGE_MAX_CHARS
def test_streaming_overflow_limit_ignores_legacy_toggle():
adapter = _make_adapter(extra={"rich_messages": False})
assert adapter.streaming_overflow_limit() == TelegramAdapter.RICH_MESSAGE_MAX_CHARS
def test_streaming_overflow_limit_none_when_rich_latched_off():
adapter = _make_adapter()
adapter._rich_send_disabled = True
assert adapter.streaming_overflow_limit() is None