diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 9e005754aa3..6c115e715e7 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -1440,11 +1440,37 @@ class GatewayStreamConsumer: # finalizing through edit would visibly downgrade a rich # preview, so re-deliver as a fresh message + delete the # preview instead. + # + # When the adapter exposes prefers_fresh_final_streaming + # and explicitly returns False, the time-based threshold + # must NOT override that decision. On Telegram the + # fresh-final path sends a Rich Message (sendRichMessage) + # that overlaps with the legacy MarkdownV2 preview already + # visible from streaming — both remain on screen because + # the old message is only best-effort deleted. Adapters + # without the hook still get the time-based fresh-final. + # (#47048) + # Check the *class* for the hook so MagicMock adapters + # (which auto-create attributes on access) are not + # falsely detected as having it. Also check instance + # __dict__ for test doubles that explicitly assign the + # attribute (e.g. adapter.prefers_fresh_final_streaming + # = MagicMock(return_value=False)). + _has_prefers_hook = ( + hasattr(type(self.adapter), + "prefers_fresh_final_streaming") + or "prefers_fresh_final_streaming" + in getattr(self.adapter, "__dict__", {}) + ) + _prefers_fresh = self._adapter_prefers_fresh_final(text) if ( finalize and ( - self._should_send_fresh_final() - or self._adapter_prefers_fresh_final(text) + _prefers_fresh + or ( + not _has_prefers_hook + and self._should_send_fresh_final() + ) ) and await self._try_fresh_final( text, is_turn_final=is_turn_final, diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index 9dca1f9bedd..d564f6b1dce 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -2009,3 +2009,106 @@ class TestUtf16OverflowDetection: # this file passing — they all use MagicMock adapters. assert consumer is not None + +class TestFreshFinalRespectsAdapterDecline: + """Regression: when an adapter explicitly declines fresh-final via + ``prefers_fresh_final_streaming = False``, the time-based + ``_should_send_fresh_final()`` must NOT override that decision. + (#47048 — Telegram rich-message overlap with legacy MarkdownV2 preview) + """ + + @pytest.mark.asyncio + async def test_adapter_decline_fresh_final_overrides_time_threshold(self): + """Adapter with prefers_fresh_final_streaming=False must NOT take + the fresh-final path even when fresh_final_after_seconds is large.""" + adapter = MagicMock() + adapter.MAX_MESSAGE_LENGTH = 4096 + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=True, message_id="rich_msg"), + ) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=True, message_id="edit_msg"), + ) + adapter.delete_message = AsyncMock(return_value=True) + # Adapter explicitly declines fresh-final (like Telegram) + adapter.prefers_fresh_final_streaming = MagicMock(return_value=False) + + config = StreamConsumerConfig( + edit_interval=0.01, + buffer_threshold=5, + fresh_final_after_seconds=1.0, # time threshold would trigger + cursor=" ▉", + ) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + # Simulate: first message sent during streaming + consumer.on_delta("Hello world") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + # First message should have been sent + assert consumer._message_id is not None + # Simulate time passing (beyond threshold) + consumer._message_created_ts -= 10.0 + + # Finalize + consumer.on_delta("Hello world final") + consumer.finish() + await task + + # The adapter declined fresh-final, so send() should NOT have been + # called for the final message — only edit_message(finalize=True). + adapter.send.assert_called_once() # Only the initial send + adapter.edit_message.assert_called() # Finalize edit + # Verify edit was called with finalize=True + edit_calls = [ + c for c in adapter.edit_message.call_args_list + if c.kwargs.get("finalize") or (len(c.args) > 3 and c.args[3]) + ] + assert len(edit_calls) >= 1, ( + "Expected finalize=True edit call, got none" + ) + + @pytest.mark.asyncio + async def test_no_hook_adapter_uses_time_threshold(self): + """Adapter WITHOUT prefers_fresh_final_streaming must still use + the time-based fresh-final path (backward compat).""" + adapter = MagicMock() + adapter.MAX_MESSAGE_LENGTH = 4096 + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=True, message_id="msg_1"), + ) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=True, message_id="edit_msg"), + ) + adapter.delete_message = AsyncMock(return_value=True) + # No prefers_fresh_final_streaming attribute + if hasattr(adapter, "prefers_fresh_final_streaming"): + del adapter.prefers_fresh_final_streaming + + config = StreamConsumerConfig( + edit_interval=0.01, + buffer_threshold=5, + fresh_final_after_seconds=1.0, + cursor=" ▉", + ) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + # Simulate: first message sent during streaming + consumer.on_delta("Hello world") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + assert consumer._message_id is not None + # Simulate time passing + consumer._message_created_ts -= 10.0 + + # Finalize + consumer.on_delta("Hello world final") + consumer.finish() + await task + + # Without the hook, time-based fresh-final should trigger: + # send() called twice (initial + fresh-final) + assert adapter.send.call_count == 2, ( + f"Expected 2 send calls (initial + fresh-final), got {adapter.send.call_count}" + ) +