diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index e743df8d59..240084e9b1 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -280,6 +280,14 @@ class GatewayStreamConsumer: await self._send_or_edit(self._accumulated) except Exception: pass + # If we delivered any content before being cancelled, mark the + # final response as sent so the gateway's already_sent check + # doesn't trigger a duplicate message. The 5-second + # stream_task timeout (gateway/run.py) can cancel us while + # waiting on a slow Telegram API call — without this flag the + # gateway falls through to the normal send path. + if self._already_sent: + self._final_response_sent = True except Exception as e: logger.error("Stream consumer error: %s", e) diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index d66306722f..d8a1be2d2d 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -599,3 +599,84 @@ class TestInterimCommentaryMessages: assert sent_texts == ["Hello ▉", "world"] assert consumer.already_sent is True assert consumer.final_response_sent is True + + +class TestCancelledConsumerSetsFlags: + """Cancellation must set final_response_sent when already_sent is True. + + The 5-second stream_task timeout in gateway/run.py can cancel the + consumer while it's still processing. If final_response_sent stays + False, the gateway falls through to the normal send path and the + user sees a duplicate message. + """ + + @pytest.mark.asyncio + async def test_cancelled_with_already_sent_marks_final_response_sent(self): + """Cancelling after content was sent should set final_response_sent.""" + adapter = MagicMock() + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=True, message_id="msg_1") + ) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=True) + ) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + ) + + # Stream some text — the consumer sends it and sets already_sent + consumer.on_delta("Hello world") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.08) + + assert consumer.already_sent is True + + # Cancel the task (simulates the 5-second timeout in gateway) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # The fix: final_response_sent should be True even though _DONE + # was never processed, preventing a duplicate message. + assert consumer.final_response_sent is True + + @pytest.mark.asyncio + async def test_cancelled_without_any_sends_does_not_mark_final(self): + """Cancelling before anything was sent should NOT set final_response_sent.""" + adapter = MagicMock() + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=False, message_id=None) + ) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=True) + ) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + ) + + # Send fails — already_sent stays False + consumer.on_delta("x") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.08) + + assert consumer.already_sent is False + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Without a successful send, final_response_sent should stay False + # so the normal gateway send path can deliver the response. + assert consumer.final_response_sent is False