From 6636fecd473b13a99f69fc2f742d7b2ce788c324 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 10 May 2026 15:13:54 -0700 Subject: [PATCH] fix(gateway): only mark final response sent when split-overflow chunks actually land (#23420) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The split-overflow path in _send_or_edit (gateway/stream_consumer.py) was copying the cumulative _already_sent flag into _final_response_sent on the done frame. _already_sent goes True on any successful prior edit (tool progress) or on fallback-mode promotion when an edit fails — neither proves the *current* chunked send delivered the final answer. When the chunked send actually fails (network error, flood control), the consumer would wrongly claim 'final delivered' and the gateway's independent fallback delivery in run.py would be suppressed. User saw only tool-progress bubbles and never got the answer. Now we track per-chunk success locally: _send_new_chunk returns the new message_id on success or returns the passed-in reply_to unchanged on failure. If at least one returned id differs, chunks_delivered = True; otherwise stays False, gateway fallback runs. Adds two regression tests: - test_split_overflow_failed_send_does_not_mark_final_sent — primes _already_sent=True, then makes every send fail; asserts _final_response_sent stays False. - test_split_overflow_partial_send_marks_final_sent — happy path, asserts _final_response_sent goes True. Note: the companion bug at the CancelledError handler (issue cited lines 417-418) was already fixed by 3b5572ded on 2026-04-16. Closes #10748 --- gateway/stream_consumer.py | 12 ++++- tests/gateway/test_stream_consumer.py | 72 +++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index a0366037496..918c49ce85a 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -362,13 +362,21 @@ class GatewayStreamConsumer: chunks = self.adapter.truncate_message( self._accumulated, _safe_limit ) + chunks_delivered = False + reply_to = self._message_id for chunk in chunks: - await self._send_new_chunk(chunk, self._message_id) + new_id = await self._send_new_chunk(chunk, reply_to) + if new_id is not None and new_id != reply_to: + chunks_delivered = True self._accumulated = "" self._last_sent_text = "" self._last_edit_time = time.monotonic() if got_done: - self._final_response_sent = self._already_sent + # Only claim final delivery if THESE chunks actually + # landed. ``_already_sent`` may be True from prior + # tool-progress edits or fallback-mode promotion (#10748) + # — that doesn't mean the final answer reached the user. + self._final_response_sent = chunks_delivered return if got_segment_break: self._message_id = None diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index bc8df59191f..b5e423f9605 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -867,6 +867,78 @@ class TestSegmentBreakOnToolBoundary: assert consumer._final_response_sent is True +class TestFinalResponseDeliveryGuard: + """Regression coverage for #10748 — _final_response_sent must reflect + actual delivery of the *current* chunked send, not the cumulative + `_already_sent` flag (which earlier tool-progress edits or fallback-mode + promotion can taint).""" + + @pytest.mark.asyncio + async def test_split_overflow_failed_send_does_not_mark_final_sent(self): + """Split-overflow path: if every chunk send fails on done frame, + _final_response_sent must stay False so the gateway falls back.""" + adapter = MagicMock() + # Every send fails — _send_new_chunk returns the passed-in reply_to. + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=False, error="network down"), + ) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=True), + ) + adapter.MAX_MESSAGE_LENGTH = 100 + adapter.truncate_message = MagicMock( + side_effect=lambda text, limit: [text[:limit], text[limit:]], + ) + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + # Simulate prior tool-progress edits that set _already_sent + consumer._already_sent = True + + # Long text > MAX_MESSAGE_LENGTH, no existing message id (fresh send path) + long_text = "x" * 200 + consumer.on_delta(long_text) + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.finish() + await task + + assert consumer._final_response_sent is False, ( + "_already_sent leaked into _final_response_sent — gateway will " + "wrongly suppress its fallback delivery (#10748)" + ) + + @pytest.mark.asyncio + async def test_split_overflow_partial_send_marks_final_sent(self): + """Split-overflow path: if at least one chunk lands on done frame, + we did deliver the final answer — _final_response_sent must be True.""" + adapter = MagicMock() + adapter.send = AsyncMock(side_effect=[ + SimpleNamespace(success=True, message_id="msg_1"), + SimpleNamespace(success=True, message_id="msg_2"), + ]) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=True), + ) + adapter.MAX_MESSAGE_LENGTH = 100 + adapter.truncate_message = MagicMock( + side_effect=lambda text, limit: [text[:limit], text[limit:]], + ) + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + long_text = "x" * 200 + consumer.on_delta(long_text) + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.finish() + await task + + assert consumer._final_response_sent is True + + class TestInterimCommentaryMessages: @pytest.mark.asyncio async def test_commentary_message_stays_separate_from_final_stream(self):