diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py
index a0366037496..918c49ce85a 100644
--- a/gateway/stream_consumer.py
+++ b/gateway/stream_consumer.py
@@ -362,13 +362,21 @@ class GatewayStreamConsumer:
                         chunks = self.adapter.truncate_message(
                             self._accumulated, _safe_limit
                         )
+                        chunks_delivered = False
+                        reply_to = self._message_id
                         for chunk in chunks:
-                            await self._send_new_chunk(chunk, self._message_id)
+                            new_id = await self._send_new_chunk(chunk, reply_to)
+                            if new_id is not None and new_id != reply_to:
+                                chunks_delivered = True
                         self._accumulated = ""
                         self._last_sent_text = ""
                         self._last_edit_time = time.monotonic()
                         if got_done:
-                            self._final_response_sent = self._already_sent
+                            # Only claim final delivery if THESE chunks actually
+                            # landed. ``_already_sent`` may be True from prior
+                            # tool-progress edits or fallback-mode promotion (#10748)
+                            # — that doesn't mean the final answer reached the user.
+                            self._final_response_sent = chunks_delivered
                             return
                         if got_segment_break:
                             self._message_id = None
diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py
index bc8df59191f..b5e423f9605 100644
--- a/tests/gateway/test_stream_consumer.py
+++ b/tests/gateway/test_stream_consumer.py
@@ -867,6 +867,78 @@ class TestSegmentBreakOnToolBoundary:
         assert consumer._final_response_sent is True
 
 
+class TestFinalResponseDeliveryGuard:
+    """Regression coverage for #10748 — _final_response_sent must reflect
+    actual delivery of the *current* chunked send, not the cumulative
+    `_already_sent` flag (which earlier tool-progress edits or fallback-mode
+    promotion can taint)."""
+
+    @pytest.mark.asyncio
+    async def test_split_overflow_failed_send_does_not_mark_final_sent(self):
+        """Split-overflow path: if every chunk send fails on done frame,
+        _final_response_sent must stay False so the gateway falls back."""
+        adapter = MagicMock()
+        # Every send fails — _send_new_chunk returns the passed-in reply_to.
+        adapter.send = AsyncMock(
+            return_value=SimpleNamespace(success=False, error="network down"),
+        )
+        adapter.edit_message = AsyncMock(
+            return_value=SimpleNamespace(success=True),
+        )
+        adapter.MAX_MESSAGE_LENGTH = 100
+        adapter.truncate_message = MagicMock(
+            side_effect=lambda text, limit: [text[:limit], text[limit:]],
+        )
+
+        config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
+        consumer = GatewayStreamConsumer(adapter, "chat_123", config)
+
+        # Simulate prior tool-progress edits that set _already_sent
+        consumer._already_sent = True
+
+        # Long text > MAX_MESSAGE_LENGTH, no existing message id (fresh send path)
+        long_text = "x" * 200
+        consumer.on_delta(long_text)
+        task = asyncio.create_task(consumer.run())
+        await asyncio.sleep(0.05)
+        consumer.finish()
+        await task
+
+        assert consumer._final_response_sent is False, (
+            "_already_sent leaked into _final_response_sent — gateway will "
+            "wrongly suppress its fallback delivery (#10748)"
+        )
+
+    @pytest.mark.asyncio
+    async def test_split_overflow_partial_send_marks_final_sent(self):
+        """Split-overflow path: if at least one chunk lands on done frame,
+        we did deliver the final answer — _final_response_sent must be True."""
+        adapter = MagicMock()
+        adapter.send = AsyncMock(side_effect=[
+            SimpleNamespace(success=True, message_id="msg_1"),
+            SimpleNamespace(success=True, message_id="msg_2"),
+        ])
+        adapter.edit_message = AsyncMock(
+            return_value=SimpleNamespace(success=True),
+        )
+        adapter.MAX_MESSAGE_LENGTH = 100
+        adapter.truncate_message = MagicMock(
+            side_effect=lambda text, limit: [text[:limit], text[limit:]],
+        )
+
+        config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
+        consumer = GatewayStreamConsumer(adapter, "chat_123", config)
+
+        long_text = "x" * 200
+        consumer.on_delta(long_text)
+        task = asyncio.create_task(consumer.run())
+        await asyncio.sleep(0.05)
+        consumer.finish()
+        await task
+
+        assert consumer._final_response_sent is True
+
+
 class TestInterimCommentaryMessages:
     @pytest.mark.asyncio
     async def test_commentary_message_stays_separate_from_final_stream(self):