From 9179396cb72a934d03b7f9917e3778dc6eeef99c Mon Sep 17 00:00:00 2001 From: Indigo Karasu Date: Thu, 28 May 2026 02:58:29 -0700 Subject: [PATCH] fix(stream-consumer): only set _final_content_delivered when final response confirmed delivered In GatewayStreamConsumer._run(), _final_content_delivered was set to True based on the success of a mid-stream finalize edit, before the final finalize edit was attempted. When the final edit later failed (Telegram flood control, retry-after), _final_response_sent stayed False but _final_content_delivered was already True, so gateway/run.py suppressed its normal final send and the user saw a partial / fallback message instead of the real answer. Changes in gateway/stream_consumer.py: - Remove the premature _final_content_delivered = True at the top of the got_done block. - Set _final_content_delivered = True only when the actual final send / edit succeeds, in each finalize branch (no-finalize adapter, _message_id finalize, no-_already_sent send). - _send_fallback_final: don't set _final_response_sent = True when only some chunks were delivered; the gateway should still attempt a complete final send. Set _final_content_delivered = True alongside _final_response_sent on the success path and short-text path. - Cancellation handler: set _final_content_delivered = True alongside _final_response_sent when the best-effort final edit succeeds. Adds TestFinalContentDeliveredGuard with 3 regression tests covering the core bug scenario, the happy path, and partial fallback. 
Closes #33708 Closes #25010 Refs #29200 Co-authored-by: Teknium <127238744+teknium1@users.noreply.github.com> --- gateway/stream_consumer.py | 23 +++-- tests/gateway/test_stream_consumer.py | 127 ++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 9 deletions(-) diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 4ba65ddf4c5..18ab819eee9 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -552,11 +552,6 @@ class GatewayStreamConsumer: self._last_edit_time = time.monotonic() if got_done: - # Record that the final content reached the user even - # if the cosmetic final edit below fails. - if current_update_visible and self._accumulated: - self._final_content_delivered = True - # Final edit without cursor. If progressive editing failed # mid-stream, send a single continuation/fallback message # here instead of letting the base gateway path send the @@ -573,6 +568,7 @@ class GatewayStreamConsumer: # final edit — but only for adapters that don't # need an explicit finalize signal. self._final_response_sent = True + self._final_content_delivered = True elif self._message_id: # Either the mid-stream edit didn't run (no # visible update this tick) OR the adapter needs @@ -580,8 +576,12 @@ class GatewayStreamConsumer: self._final_response_sent = await self._send_or_edit( self._accumulated, finalize=True, ) + if self._final_response_sent: + self._final_content_delivered = True elif not self._already_sent: self._final_response_sent = await self._send_or_edit(self._accumulated) + if self._final_response_sent: + self._final_content_delivered = True return if commentary_text is not None: @@ -641,6 +641,7 @@ class GatewayStreamConsumer: # "Let me search…") had been delivered, not the real answer. 
if _best_effort_ok and not self._final_response_sent: self._final_response_sent = True + self._final_content_delivered = True except Exception as e: logger.error("Stream consumer error: %s", e) @@ -778,6 +779,7 @@ class GatewayStreamConsumer: pass self._already_sent = True self._final_response_sent = True + self._final_content_delivered = True return raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) @@ -814,11 +816,13 @@ class GatewayStreamConsumer: if not result or not result.success: if sent_any_chunk: - # Some continuation text already reached the user. Suppress - # the base gateway final-send path so we don't resend the - # full response and create another duplicate. + # Some continuation text already reached the user, but not + # the full response. Do NOT set _final_response_sent — the + # base gateway final-send path should still deliver the + # complete response so the user gets the full answer. + # Set only _already_sent to avoid a duplicate send + # of the same partial content. 
self._already_sent = True - self._final_response_sent = True self._message_id = last_message_id self._last_sent_text = last_successful_chunk self._fallback_prefix = "" @@ -856,6 +860,7 @@ class GatewayStreamConsumer: self._message_id = last_message_id self._already_sent = True self._final_response_sent = True + self._final_content_delivered = True self._last_sent_text = chunks[-1] self._fallback_prefix = "" diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index 3a6baa65b05..9a445532d0d 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -939,6 +939,133 @@ class TestFinalResponseDeliveryGuard: assert consumer._final_response_sent is True +class TestFinalContentDeliveredGuard: + """Regression coverage for #25010 — _final_content_delivered must only be + set when the final response is actually confirmed delivered to the user, + not when a mid-stream edit happened to show partial content. Prematurely + setting this flag causes the gateway to suppress the normal final send, + leaving the user with an incomplete partial message.""" + + @pytest.mark.asyncio + async def test_mid_stream_edit_success_does_not_mark_content_delivered(self): + """When the mid-stream edit with finalize=True succeeds but the + subsequent finalize edit fails, _final_content_delivered must stay + False so the gateway does not suppress its fallback send (#25010). + + Simulates TelegramAdapter which sets REQUIRES_EDIT_FINALIZE=True, + requiring a second finalize edit even when content is unchanged.""" + adapter = MagicMock() + adapter.REQUIRES_EDIT_FINALIZE = True # Telegram adapter behavior + # First send (initial streaming message) succeeds + # Mid-stream finalize edit succeeds + # Final finalize edit FAILS (e.g. 
flood control on Telegram) + adapter.edit_message = AsyncMock(side_effect=[ + SimpleNamespace(success=True), # mid-stream edit + SimpleNamespace(success=True), # mid-stream finalize edit + SimpleNamespace(success=False), # final finalize edit (FAILS) + ]) + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=True, message_id="msg_1"), + ) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + # Simulate streaming: send initial text, then more text, then done + consumer.on_delta("Part one of the response...\n") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + + consumer.on_delta("Part two, the complete final answer.\n") + await asyncio.sleep(0.05) + + consumer.finish() + await task + + # The key assertion: _final_content_delivered must NOT be True, + # because the final edit failed and the complete response was never + # confirmed delivered. 
+ assert consumer._final_content_delivered is False, ( + "_final_content_delivered was prematurely set to True — gateway " + "will wrongly suppress its fallback send, leaving the user with " + "an incomplete partial message (#25010)" + ) + # The gateway must still be allowed to send the complete response + assert consumer._final_response_sent is False, ( + "_final_response_sent must also be False when the final edit failed" + ) + + @pytest.mark.asyncio + async def test_final_edit_success_does_mark_content_delivered(self): + """When the final finalize edit succeeds, _final_content_delivered + must be True — the normal happy path should still work.""" + adapter = MagicMock() + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=True, message_id="msg_1"), + ) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + consumer.on_delta("The complete response.\n") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + + consumer.finish() + await task + + assert consumer._final_content_delivered is True, ( + "_final_content_delivered must be True when the final edit succeeds" + ) + assert consumer._final_response_sent is True + + @pytest.mark.asyncio + async def test_fallback_partial_send_does_not_mark_final_sent(self): + """When fallback final send delivers only some chunks before failing, + _final_response_sent must stay False so the gateway can still attempt + a complete final send (#25010).""" + call_count = 0 + + async def fake_send(*, chat_id, content, **kwargs): + nonlocal call_count + call_count += 1 + if call_count <= 2: + return SimpleNamespace(success=True, message_id="msg_1") + # Third chunk (fallback continuation) FAILS + return SimpleNamespace(success=False, error="flood_control:13.0") + + adapter = MagicMock() + adapter.send = 
AsyncMock(side_effect=fake_send) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=False, error="flood_control:13.0"), + ) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + # Trigger enough delta to enter fallback mode + consumer.on_delta("Initial streaming text...\n") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + + # Send a very long text that will trigger overflow/fallback + long_text = ("x" * 3000 + "\n") + ("y" * 3000 + "\n") + "Final answer.\n" + consumer.on_delta(long_text) + await asyncio.sleep(0.1) + + consumer.finish() + await task + + assert consumer._final_response_sent is False, ( + "Partial fallback send must not set _final_response_sent — gateway " + "must still be able to deliver the complete response (#25010)" + ) + + class TestEditOverflowSplitAndDeliver: """When edit_message split-and-delivers an oversized payload across the original message + N continuations (Telegram >4096 UTF-16), the consumer