diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index ae00aee39..146715b16 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -430,6 +430,21 @@ class GatewayStreamConsumer: # a real string like "msg_1", not "__no_edit__", so that case # still resets and creates a fresh segment as intended.) if got_segment_break: + # If the segment-break edit failed to deliver the + # accumulated content (flood control that has not yet + # promoted to fallback mode, or fallback mode itself), + # _accumulated still holds pre-boundary text the user + # never saw. Flush that tail as a continuation message + # before the reset below wipes _accumulated — otherwise + # text generated before the tool boundary is silently + # dropped (issue #8124). + if ( + self._accumulated + and not current_update_visible + and self._message_id + and self._message_id != "__no_edit__" + ): + await self._flush_segment_tail_on_edit_failure() self._reset_segment_state(preserve_no_edit=True) await asyncio.sleep(0.05) # Small yield to not busy-loop @@ -620,6 +635,39 @@ class GatewayStreamConsumer: err_lower = err.lower() return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower + async def _flush_segment_tail_on_edit_failure(self) -> None: + """Deliver un-sent tail content before a segment-break reset. + + When an edit fails (flood control, transport error) and a tool + boundary arrives before the next retry, ``_accumulated`` holds text + that was generated but never shown to the user. Without this flush, + the segment reset would discard that tail and leave a frozen cursor + in the partial message. + + Sends the tail that sits after the last successfully-delivered + prefix as a new message, and best-effort strips the stuck cursor + from the previous partial message. + """ + if not self._fallback_final_send: + await self._try_strip_cursor() + visible = self._fallback_prefix or self._visible_prefix() + tail = self._accumulated + if visible and tail.startswith(visible): + tail = tail[len(visible):].lstrip() + tail = self._clean_for_display(tail) + if not tail.strip(): + return + try: + result = await self.adapter.send( + chat_id=self.chat_id, + content=tail, + metadata=self.metadata, + ) + if result.success: + self._already_sent = True + except Exception as e: + logger.error("Segment-break tail flush error: %s", e) + async def _try_strip_cursor(self) -> None: """Best-effort edit to remove the cursor from the last visible message. diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index 99ac4dc18..3063196f4 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -502,11 +502,13 @@ class TestSegmentBreakOnToolBoundary: @pytest.mark.asyncio async def test_segment_break_clears_failed_edit_fallback_state(self): - """A tool boundary after edit failure must not duplicate the next segment.""" + """A tool boundary after edit failure must flush the undelivered tail + without duplicating the prefix the user already saw (#8124).""" adapter = MagicMock() send_results = [ SimpleNamespace(success=True, message_id="msg_1"), SimpleNamespace(success=True, message_id="msg_2"), + SimpleNamespace(success=True, message_id="msg_3"), ] adapter.send = AsyncMock(side_effect=send_results) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6")) @@ -526,7 +528,60 @@ class TestSegmentBreakOnToolBoundary: await task sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] - assert sent_texts == ["Hello ▉", "Next segment"] + # The undelivered "world" tail must reach the user, and the next + # segment must not duplicate "Hello" that was already visible. + assert sent_texts == ["Hello ▉", "world", "Next segment"] + + @pytest.mark.asyncio + async def test_segment_break_after_mid_stream_edit_failure_preserves_tail(self): + """Regression for #8124: when an earlier edit succeeded but later edits + fail (persistent flood control) and a tool boundary arrives before the + fallback threshold is reached, the pre-boundary tail must still be + delivered — not silently dropped by the segment reset.""" + adapter = MagicMock() + # msg_1 for the initial partial, msg_2 for the flushed tail, + # msg_3 for the post-boundary segment. + send_results = [ + SimpleNamespace(success=True, message_id="msg_1"), + SimpleNamespace(success=True, message_id="msg_2"), + SimpleNamespace(success=True, message_id="msg_3"), + ] + adapter.send = AsyncMock(side_effect=send_results) + + # First two edits succeed, everything after fails with flood control + # — simulating Telegram's "edit once then get rate-limited" pattern. + edit_results = [ + SimpleNamespace(success=True), # "Hello world ▉" — succeeds + SimpleNamespace(success=False, error="flood_control:6.0"), # "Hello world more ▉" — flood triggered + SimpleNamespace(success=False, error="flood_control:6.0"), # finalize edit at segment break + SimpleNamespace(success=False, error="flood_control:6.0"), # cursor-strip attempt + ] + adapter.edit_message = AsyncMock(side_effect=edit_results + [edit_results[-1]] * 10) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + consumer.on_delta("Hello") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.08) + consumer.on_delta(" world") + await asyncio.sleep(0.08) + consumer.on_delta(" more") + await asyncio.sleep(0.08) + consumer.on_delta(None) # tool boundary + consumer.on_delta("Here is the tool result.") + consumer.finish() + await task + + sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] + # "more" must have been delivered, not dropped. + all_text = " ".join(sent_texts) + assert "more" in all_text, ( + f"Pre-boundary tail 'more' was silently dropped: sends={sent_texts}" + ) + # Post-boundary text must also reach the user. + assert "Here is the tool result." in all_text @pytest.mark.asyncio async def test_no_message_id_enters_fallback_mode(self):