From da818510ec753f1c7def777eeca51bb1e4d17d1e Mon Sep 17 00:00:00 2001 From: GodsBoy Date: Wed, 10 Jun 2026 11:10:17 +0200 Subject: [PATCH] fix(gateway): finalize best-effort delivery when stream consumer is cancelled --- gateway/stream_consumer.py | 14 ++- .../test_stream_consumer_fresh_final.py | 119 ++++++++++++++++++ 2 files changed, 131 insertions(+), 2 deletions(-) diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 8ebde02411b..2bd77d8dac6 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -652,11 +652,21 @@ class GatewayStreamConsumer: await asyncio.sleep(0.05) # Small yield to not busy-loop except asyncio.CancelledError: - # Best-effort final edit on cancellation + # Best-effort final edit on cancellation. finalize=True so + # REQUIRES_EDIT_FINALIZE platforms (Telegram) apply final + # formatting — a plain edit here would leave the entire reply + # rendered as a raw streaming preview while the success flags + # below suppress the gateway's formatted re-send. + # is_turn_final=False keeps _try_fresh_final from setting + # _final_response_sent itself; this handler owns the flags. _best_effort_ok = False if self._accumulated and self._message_id: try: - _best_effort_ok = bool(await self._send_or_edit(self._accumulated)) + _best_effort_ok = bool( + await self._send_or_edit( + self._accumulated, finalize=True, is_turn_final=False, + ) + ) except Exception: pass # Only confirm final delivery if the best-effort send above diff --git a/tests/gateway/test_stream_consumer_fresh_final.py b/tests/gateway/test_stream_consumer_fresh_final.py index 2ecef4a488b..bf467781638 100644 --- a/tests/gateway/test_stream_consumer_fresh_final.py +++ b/tests/gateway/test_stream_consumer_fresh_final.py @@ -347,6 +347,125 @@ class TestSegmentBreakDoesNotMarkFinalSent: assert any("answer is 42" in t for t in self._delivered_texts(adapter)) +class TestCancelledBestEffortDeliveryFinalizes: + """Cancel-path best-effort delivery must go through the finalize path. + + The gateway cancels the consumer shortly after finish(). The + CancelledError handler re-delivers the accumulated text; previously it + did so with finalize=False, so REQUIRES_EDIT_FINALIZE platforms + (Telegram) kept the plain streaming preview — the whole final reply + rendered with raw markdown markers — while the success flags still + suppressed the gateway's formatted re-send. + """ + + @pytest.mark.asyncio + async def test_cancel_best_effort_edit_is_finalized(self): + adapter = _make_adapter() + adapter.REQUIRES_EDIT_FINALIZE = True + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig( + edit_interval=0.01, buffer_threshold=5, cursor=" ▉", + ), + ) + consumer.on_delta("Reply with **bold** and `code` markers.") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) # preview lands; message_id set + task.cancel() + await asyncio.gather(task, return_exceptions=True) + + finalize_edits = [ + c for c in adapter.edit_message.call_args_list + if c.kwargs.get("finalize") + ] + assert finalize_edits, ( + "cancel best-effort delivery must use finalize=True so " + "REQUIRES_EDIT_FINALIZE platforms apply final formatting" + ) + assert consumer.final_response_sent is True + assert consumer.final_content_delivered is True + + @pytest.mark.asyncio + async def test_cancel_best_effort_failure_keeps_gateway_resend_possible(self): + adapter = _make_adapter() + adapter.REQUIRES_EDIT_FINALIZE = True + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig( + edit_interval=0.01, buffer_threshold=5, cursor=" ▉", + ), + ) + consumer.on_delta("Reply with **bold** and `code` markers.") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + # Best-effort delivery at cancel time fails. + adapter.edit_message = AsyncMock(return_value=SimpleNamespace( + success=False, error="boom", + )) + task.cancel() + await asyncio.gather(task, return_exceptions=True) + + assert consumer.final_response_sent is False + assert consumer.final_content_delivered is False + + @pytest.mark.asyncio + async def test_cancel_without_preview_makes_no_delivery_attempt(self): + adapter = _make_adapter() + adapter.REQUIRES_EDIT_FINALIZE = True + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig( + edit_interval=0.01, buffer_threshold=5, cursor=" ▉", + ), + ) + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.02) + task.cancel() + await asyncio.gather(task, return_exceptions=True) + + adapter.edit_message.assert_not_called() + assert consumer.final_response_sent is False + assert consumer.final_content_delivered is False + + @pytest.mark.asyncio + async def test_cancel_with_fresh_final_enabled_delivers_and_flags_via_handler(self): + """With fresh_final_after_seconds enabled and an aged preview, the + finalized cancel-path delivery is eligible for fresh-final + (delete + fresh send). is_turn_final=False keeps _try_fresh_final + from setting the flags itself; the cancel handler sets them after + the successful delivery.""" + adapter = _make_adapter() + adapter.REQUIRES_EDIT_FINALIZE = True + adapter.send.side_effect = [ + SimpleNamespace(success=True, message_id="initial_preview"), + SimpleNamespace(success=True, message_id="fresh_final"), + ] + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig( + edit_interval=0.01, buffer_threshold=5, cursor=" ▉", + fresh_final_after_seconds=0.001, + ), + ) + consumer.on_delta("Reply with **bold** and `code` markers.") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer._message_created_ts = 0.0 # force the preview stale + task.cancel() + await asyncio.gather(task, return_exceptions=True) + + # Fresh-final engaged: a second send replaced the stale preview. + assert adapter.send.call_count == 2 + adapter.delete_message.assert_awaited_once_with("chat", "initial_preview") + # Flags were set by the cancel handler after successful delivery. + assert consumer.final_response_sent is True + assert consumer.final_content_delivered is True + + class TestStreamConsumerConfigFreshFinalField: """The dataclass field must exist and default to 0 (disabled)."""