fix(stream-consumer): only set _final_content_delivered when final response confirmed delivered

In GatewayStreamConsumer._run(), _final_content_delivered was set to True
based on the success of a mid-stream finalize edit, before the final
finalize edit was attempted. When the final edit later failed (Telegram
flood control, retry-after), _final_response_sent stayed False but
_final_content_delivered was already True, so gateway/run.py suppressed
its normal final send and the user saw a partial / fallback message
instead of the real answer.

Changes in gateway/stream_consumer.py:
- Remove the premature _final_content_delivered = True at the top of
  the got_done block.
- Set _final_content_delivered = True only when the actual final send /
  edit succeeds, in each finalize branch (no-finalize adapter,
  _message_id finalize, no-_already_sent send).
- _send_fallback_final: don't set _final_response_sent = True when only
  some chunks were delivered; the gateway should still attempt a
  complete final send. Set _final_content_delivered = True alongside
  _final_response_sent on the success path and short-text path.
- Cancellation handler: set _final_content_delivered = True alongside
  _final_response_sent when the best-effort final edit succeeds.

Adds TestFinalContentDeliveredGuard with 3 regression tests covering
the core bug scenario, the happy path, and partial fallback.

Closes #33708
Closes #25010
Refs #29200

Co-authored-by: Teknium <127238744+teknium1@users.noreply.github.com>
This commit is contained in:
Indigo Karasu 2026-05-28 02:58:29 -07:00 committed by Teknium
parent a91b1c8b31
commit 9179396cb7
2 changed files with 141 additions and 9 deletions

View file

@ -552,11 +552,6 @@ class GatewayStreamConsumer:
self._last_edit_time = time.monotonic()
if got_done:
# Record that the final content reached the user even
# if the cosmetic final edit below fails.
if current_update_visible and self._accumulated:
self._final_content_delivered = True
# Final edit without cursor. If progressive editing failed
# mid-stream, send a single continuation/fallback message
# here instead of letting the base gateway path send the
@ -573,6 +568,7 @@ class GatewayStreamConsumer:
# final edit — but only for adapters that don't
# need an explicit finalize signal.
self._final_response_sent = True
self._final_content_delivered = True
elif self._message_id:
# Either the mid-stream edit didn't run (no
# visible update this tick) OR the adapter needs
@ -580,8 +576,12 @@ class GatewayStreamConsumer:
self._final_response_sent = await self._send_or_edit(
self._accumulated, finalize=True,
)
if self._final_response_sent:
self._final_content_delivered = True
elif not self._already_sent:
self._final_response_sent = await self._send_or_edit(self._accumulated)
if self._final_response_sent:
self._final_content_delivered = True
return
if commentary_text is not None:
@ -641,6 +641,7 @@ class GatewayStreamConsumer:
# "Let me search…") had been delivered, not the real answer.
if _best_effort_ok and not self._final_response_sent:
self._final_response_sent = True
self._final_content_delivered = True
except Exception as e:
logger.error("Stream consumer error: %s", e)
@ -778,6 +779,7 @@ class GatewayStreamConsumer:
pass
self._already_sent = True
self._final_response_sent = True
self._final_content_delivered = True
return
raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
@ -814,11 +816,13 @@ class GatewayStreamConsumer:
if not result or not result.success:
if sent_any_chunk:
# Some continuation text already reached the user. Suppress
# the base gateway final-send path so we don't resend the
# full response and create another duplicate.
# Some continuation text already reached the user, but not
# the full response. Do NOT set _final_response_sent — the
# base gateway final-send path should still deliver the
# complete response so the user gets the full answer.
# Suppress only _already_sent to avoid a duplicate send
# of the same partial content.
self._already_sent = True
self._final_response_sent = True
self._message_id = last_message_id
self._last_sent_text = last_successful_chunk
self._fallback_prefix = ""
@ -856,6 +860,7 @@ class GatewayStreamConsumer:
self._message_id = last_message_id
self._already_sent = True
self._final_response_sent = True
self._final_content_delivered = True
self._last_sent_text = chunks[-1]
self._fallback_prefix = ""

View file

@ -939,6 +939,133 @@ class TestFinalResponseDeliveryGuard:
assert consumer._final_response_sent is True
class TestFinalContentDeliveredGuard:
"""Regression coverage for #25010 — _final_content_delivered must only be
set when the final response is actually confirmed delivered to the user,
not when a mid-stream edit happened to show partial content. Prematurely
setting this flag causes the gateway to suppress the normal final send,
leaving the user with an incomplete partial message."""
@pytest.mark.asyncio
async def test_mid_stream_edit_success_does_not_mark_content_delivered(self):
"""When the mid-stream edit with finalize=True succeeds but the
subsequent finalize edit fails, _final_content_delivered must stay
False so the gateway does not suppress its fallback send (#25010).
Simulates TelegramAdapter which sets REQUIRES_EDIT_FINALIZE=True,
requiring a second finalize edit even when content is unchanged."""
adapter = MagicMock()
adapter.REQUIRES_EDIT_FINALIZE = True # Telegram adapter behavior
# First send (initial streaming message) succeeds
# Mid-stream finalize edit succeeds
# Final finalize edit FAILS (e.g. flood control on Telegram)
adapter.edit_message = AsyncMock(side_effect=[
SimpleNamespace(success=True), # mid-stream edit
SimpleNamespace(success=True), # finalize edit on line 548
SimpleNamespace(success=False), # final finalize on line 580 (FAILS)
])
adapter.send = AsyncMock(
return_value=SimpleNamespace(success=True, message_id="msg_1"),
)
adapter.MAX_MESSAGE_LENGTH = 4096
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
# Simulate streaming: send initial text, then more text, then done
consumer.on_delta("Part one of the response...\n")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.on_delta("Part two, the complete final answer.\n")
await asyncio.sleep(0.05)
consumer.finish()
await task
# The key assertion: _final_content_delivered must NOT be True,
# because the final edit failed and the complete response was never
# confirmed delivered.
assert consumer._final_content_delivered is False, (
"_final_content_delivered was prematurely set to True — gateway "
"will wrongly suppress its fallback send, leaving the user with "
"an incomplete partial message (#25010)"
)
# The gateway must still be allowed to send the complete response
assert consumer._final_response_sent is False, (
"_final_response_sent must also be False when the final edit failed"
)
@pytest.mark.asyncio
async def test_final_edit_success_does_mark_content_delivered(self):
"""When the final finalize edit succeeds, _final_content_delivered
must be True the normal happy path should still work."""
adapter = MagicMock()
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.send = AsyncMock(
return_value=SimpleNamespace(success=True, message_id="msg_1"),
)
adapter.MAX_MESSAGE_LENGTH = 4096
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
consumer.on_delta("The complete response.\n")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.finish()
await task
assert consumer._final_content_delivered is True, (
"_final_content_delivered must be True when the final edit succeeds"
)
assert consumer._final_response_sent is True
@pytest.mark.asyncio
async def test_fallback_partial_send_does_not_mark_final_sent(self):
"""When fallback final send delivers only some chunks before failing,
_final_response_sent must stay False so the gateway can still attempt
a complete final send (#25010)."""
call_count = 0
async def fake_send(*, chat_id, content, **kwargs):
nonlocal call_count
call_count += 1
if call_count <= 2:
return SimpleNamespace(success=True, message_id="msg_1")
# Third chunk (fallback continuation) FAILS
return SimpleNamespace(success=False, error="flood_control:13.0")
adapter = MagicMock()
adapter.send = AsyncMock(side_effect=fake_send)
adapter.edit_message = AsyncMock(
return_value=SimpleNamespace(success=False, error="flood_control:13.0"),
)
adapter.MAX_MESSAGE_LENGTH = 4096
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
# Trigger enough delta to enter fallback mode
consumer.on_delta("Initial streaming text...\n")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
# Send a very long text that will trigger overflow/fallback
long_text = ("x" * 3000 + "\n") + ("y" * 3000 + "\n") + "Final answer.\n"
consumer.on_delta(long_text)
await asyncio.sleep(0.1)
consumer.finish()
await task
assert consumer._final_response_sent is False, (
"Partial fallback send must not set _final_response_sent — gateway "
"must still be able to deliver the complete response (#25010)"
)
class TestEditOverflowSplitAndDeliver:
"""When edit_message split-and-delivers an oversized payload across the
original message + N continuations (Telegram >4096 UTF-16), the consumer