fix(gateway): finalize best-effort delivery when stream consumer is cancelled

This commit is contained in:
GodsBoy 2026-06-10 11:10:17 +02:00 committed by Teknium
parent 590b3c0d7e
commit da818510ec
2 changed files with 131 additions and 2 deletions

View file

@ -652,11 +652,21 @@ class GatewayStreamConsumer:
await asyncio.sleep(0.05) # Small yield to not busy-loop
except asyncio.CancelledError:
# Best-effort final edit on cancellation
# Best-effort final edit on cancellation. finalize=True so
# REQUIRES_EDIT_FINALIZE platforms (Telegram) apply final
# formatting — a plain edit here would leave the entire reply
# rendered as a raw streaming preview while the success flags
# below suppress the gateway's formatted re-send.
# is_turn_final=False keeps _try_fresh_final from setting
# _final_response_sent itself; this handler owns the flags.
_best_effort_ok = False
if self._accumulated and self._message_id:
try:
_best_effort_ok = bool(await self._send_or_edit(self._accumulated))
_best_effort_ok = bool(
await self._send_or_edit(
self._accumulated, finalize=True, is_turn_final=False,
)
)
except Exception:
pass
# Only confirm final delivery if the best-effort send above

View file

@ -347,6 +347,125 @@ class TestSegmentBreakDoesNotMarkFinalSent:
assert any("answer is 42" in t for t in self._delivered_texts(adapter))
class TestCancelledBestEffortDeliveryFinalizes:
"""Cancel-path best-effort delivery must go through the finalize path.
The gateway cancels the consumer shortly after finish(). The
CancelledError handler re-delivers the accumulated text; previously it
did so with finalize=False, so REQUIRES_EDIT_FINALIZE platforms
(Telegram) kept the plain streaming preview the whole final reply
rendered with raw markdown markers while the success flags still
suppressed the gateway's formatted re-send.
"""
@pytest.mark.asyncio
async def test_cancel_best_effort_edit_is_finalized(self):
adapter = _make_adapter()
adapter.REQUIRES_EDIT_FINALIZE = True
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(
edit_interval=0.01, buffer_threshold=5, cursor="",
),
)
consumer.on_delta("Reply with **bold** and `code` markers.")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05) # preview lands; message_id set
task.cancel()
await asyncio.gather(task, return_exceptions=True)
finalize_edits = [
c for c in adapter.edit_message.call_args_list
if c.kwargs.get("finalize")
]
assert finalize_edits, (
"cancel best-effort delivery must use finalize=True so "
"REQUIRES_EDIT_FINALIZE platforms apply final formatting"
)
assert consumer.final_response_sent is True
assert consumer.final_content_delivered is True
@pytest.mark.asyncio
async def test_cancel_best_effort_failure_keeps_gateway_resend_possible(self):
adapter = _make_adapter()
adapter.REQUIRES_EDIT_FINALIZE = True
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(
edit_interval=0.01, buffer_threshold=5, cursor="",
),
)
consumer.on_delta("Reply with **bold** and `code` markers.")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
# Best-effort delivery at cancel time fails.
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(
success=False, error="boom",
))
task.cancel()
await asyncio.gather(task, return_exceptions=True)
assert consumer.final_response_sent is False
assert consumer.final_content_delivered is False
@pytest.mark.asyncio
async def test_cancel_without_preview_makes_no_delivery_attempt(self):
adapter = _make_adapter()
adapter.REQUIRES_EDIT_FINALIZE = True
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(
edit_interval=0.01, buffer_threshold=5, cursor="",
),
)
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.02)
task.cancel()
await asyncio.gather(task, return_exceptions=True)
adapter.edit_message.assert_not_called()
assert consumer.final_response_sent is False
assert consumer.final_content_delivered is False
@pytest.mark.asyncio
async def test_cancel_with_fresh_final_enabled_delivers_and_flags_via_handler(self):
"""With fresh_final_after_seconds enabled and an aged preview, the
finalized cancel-path delivery is eligible for fresh-final
(delete + fresh send). is_turn_final=False keeps _try_fresh_final
from setting the flags itself; the cancel handler sets them after
the successful delivery."""
adapter = _make_adapter()
adapter.REQUIRES_EDIT_FINALIZE = True
adapter.send.side_effect = [
SimpleNamespace(success=True, message_id="initial_preview"),
SimpleNamespace(success=True, message_id="fresh_final"),
]
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(
edit_interval=0.01, buffer_threshold=5, cursor="",
fresh_final_after_seconds=0.001,
),
)
consumer.on_delta("Reply with **bold** and `code` markers.")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer._message_created_ts = 0.0 # force the preview stale
task.cancel()
await asyncio.gather(task, return_exceptions=True)
# Fresh-final engaged: a second send replaced the stale preview.
assert adapter.send.call_count == 2
adapter.delete_message.assert_awaited_once_with("chat", "initial_preview")
# Flags were set by the cancel handler after successful delivery.
assert consumer.final_response_sent is True
assert consumer.final_content_delivered is True
class TestStreamConsumerConfigFreshFinalField:
"""The dataclass field must exist and default to 0 (disabled)."""