diff --git a/gateway/run.py b/gateway/run.py index 8ea699145a..8447dd0f51 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -9397,15 +9397,15 @@ class GatewayRunner: except Exception as e: logger.debug("Stream consumer wait before queued message failed: %s", e) _already_streamed = bool( - _sc - and ( - getattr(_sc, "final_response_sent", False) - or getattr(_sc, "already_sent", False) - ) + _sc and getattr(_sc, "final_response_sent", False) ) first_response = result.get("final_response", "") if first_response and not _already_streamed: try: + logger.info( + "Queued follow-up for session %s: final stream delivery not confirmed; sending first response before continuing.", + session_key[:20] if session_key else "?", + ) await adapter.send( source.chat_id, first_response, @@ -9413,6 +9413,11 @@ class GatewayRunner: ) except Exception as e: logger.warning("Failed to send first response before queued message: %s", e) + elif first_response: + logger.info( + "Queued follow-up for session %s: skipping resend because final streamed delivery was confirmed.", + session_key[:20] if session_key else "?", + ) # Release deferred bg-review notifications now that the # first response has been delivered. Pop from the # adapter's callback dict (prevents double-fire in @@ -9519,14 +9524,19 @@ class GatewayRunner: if isinstance(response, dict) and not response.get("failed"): _final = response.get("final_response") or "" _is_empty_sentinel = not _final or _final == "(empty)" - _streamed = _sc and ( - getattr(_sc, "final_response_sent", False) - or getattr(_sc, "already_sent", False) + _streamed = bool( + _sc and getattr(_sc, "final_response_sent", False) ) # response_previewed means the interim_assistant_callback already # sent the final text via the adapter (non-streaming path). _previewed = bool(response.get("response_previewed")) if not _is_empty_sentinel and (_streamed or _previewed): + logger.info( + "Suppressing normal final send for session %s: final delivery already confirmed (streamed=%s previewed=%s).", + session_key[:20] if session_key else "?", + _streamed, + _previewed, + ) response["already_sent"] = True return response diff --git a/tests/gateway/test_duplicate_reply_suppression.py b/tests/gateway/test_duplicate_reply_suppression.py index d8298db83c..f454d75eb0 100644 --- a/tests/gateway/test_duplicate_reply_suppression.py +++ b/tests/gateway/test_duplicate_reply_suppression.py @@ -3,10 +3,10 @@ Covers three fix paths: 1. base.py: stale response suppressed when interrupt_event is set and a pending message exists (#8221 / #2483) - 2. run.py return path: already_sent propagated from stream consumer's - already_sent flag without requiring response_previewed (#8375) - 3. run.py queued-message path: first response correctly detected as - already-streamed when already_sent is True without response_previewed + 2. run.py return path: only confirmed final streamed delivery suppresses + the fallback final send; partial streamed output must not + 3. run.py queued-message path: first response is skipped only when the + final response was actually streamed, not merely when partial output existed """ import asyncio @@ -153,15 +153,16 @@ class TestBaseInterruptSuppression: assert any(s["content"] == "Valid response" for s in adapter.sent) -# =================================================================== -# Test 2: run.py — already_sent without response_previewed (#8375) +# Test 2: run.py — partial streamed output must not suppress final send # =================================================================== -class TestAlreadySentWithoutResponsePreviewed: - """The already_sent flag on the response dict should be set when the - stream consumer's already_sent is True, even if response_previewed is - False. This prevents duplicate sends when streaming was interrupted - by flood control.""" +class TestOnlyFinalStreamDeliverySuppressesFinalSend: + """The gateway should suppress the fallback final send only when the + stream consumer confirmed the final assistant reply was delivered. + + Partial streamed output is not enough. If only already_sent=True, + the fallback final send must still happen so Telegram users don't lose + the real answer.""" def _make_mock_stream_consumer(self, already_sent=False, final_response_sent=False): sc = SimpleNamespace( @@ -170,21 +171,20 @@ class TestAlreadySentWithoutResponsePreviewed: ) return sc - def test_already_sent_set_without_response_previewed(self): - """Stream consumer already_sent=True should propagate to response - dict even when response_previewed is False.""" + def test_partial_stream_output_does_not_set_already_sent(self): + """already_sent=True alone must NOT suppress final delivery.""" sc = self._make_mock_stream_consumer(already_sent=True, final_response_sent=False) response = {"final_response": "text", "response_previewed": False} - # Reproduce the logic from run.py return path (post-fix) if sc and isinstance(response, dict) and not response.get("failed"): - if ( - getattr(sc, "final_response_sent", False) - or getattr(sc, "already_sent", False) - ): + _final = response.get("final_response") or "" + _is_empty_sentinel = not _final or _final == "(empty)" + _streamed = bool(sc and getattr(sc, "final_response_sent", False)) + _previewed = bool(response.get("response_previewed")) + if not _is_empty_sentinel and (_streamed or _previewed): response["already_sent"] = True - assert response.get("already_sent") is True + assert "already_sent" not in response def test_already_sent_not_set_when_nothing_sent(self): """When stream consumer hasn't sent anything, already_sent should @@ -193,24 +193,26 @@ class TestAlreadySentWithoutResponsePreviewed: response = {"final_response": "text", "response_previewed": False} if sc and isinstance(response, dict) and not response.get("failed"): - if ( - getattr(sc, "final_response_sent", False) - or getattr(sc, "already_sent", False) - ): + _final = response.get("final_response") or "" + _is_empty_sentinel = not _final or _final == "(empty)" + _streamed = bool(sc and getattr(sc, "final_response_sent", False)) + _previewed = bool(response.get("response_previewed")) + if not _is_empty_sentinel and (_streamed or _previewed): response["already_sent"] = True assert "already_sent" not in response def test_already_sent_set_on_final_response_sent(self): - """final_response_sent=True should still work as before.""" + """final_response_sent=True should suppress duplicate final sends.""" sc = self._make_mock_stream_consumer(already_sent=False, final_response_sent=True) response = {"final_response": "text"} if sc and isinstance(response, dict) and not response.get("failed"): - if ( - getattr(sc, "final_response_sent", False) - or getattr(sc, "already_sent", False) - ): + _final = response.get("final_response") or "" + _is_empty_sentinel = not _final or _final == "(empty)" + _streamed = bool(sc and getattr(sc, "final_response_sent", False)) + _previewed = bool(response.get("response_previewed")) + if not _is_empty_sentinel and (_streamed or _previewed): response["already_sent"] = True assert response.get("already_sent") is True @@ -222,10 +224,11 @@ class TestAlreadySentWithoutResponsePreviewed: response = {"final_response": "Error: something broke", "failed": True} if sc and isinstance(response, dict) and not response.get("failed"): - if ( - getattr(sc, "final_response_sent", False) - or getattr(sc, "already_sent", False) - ): + _final = response.get("final_response") or "" + _is_empty_sentinel = not _final or _final == "(empty)" + _streamed = bool(sc and getattr(sc, "final_response_sent", False)) + _previewed = bool(response.get("response_previewed")) + if not _is_empty_sentinel and (_streamed or _previewed): response["already_sent"] = True assert "already_sent" not in response @@ -255,10 +258,9 @@ class TestEmptyResponseNotSuppressed: if sc and isinstance(response, dict) and not response.get("failed"): _final = response.get("final_response") or "" _is_empty_sentinel = not _final or _final == "(empty)" - if not _is_empty_sentinel and ( - getattr(sc, "final_response_sent", False) - or getattr(sc, "already_sent", False) - ): + _streamed = bool(sc and getattr(sc, "final_response_sent", False)) + _previewed = bool(response.get("response_previewed")) + if not _is_empty_sentinel and (_streamed or _previewed): response["already_sent"] = True def test_empty_sentinel_not_suppressed_with_already_sent(self): @@ -283,10 +285,10 @@ class TestEmptyResponseNotSuppressed: self._apply_suppression_logic(response, sc) assert "already_sent" not in response - def test_real_response_still_suppressed_with_already_sent(self): - """Normal non-empty response should still be suppressed when - streaming delivered content.""" - sc = self._make_mock_stream_consumer(already_sent=True, final_response_sent=False) + def test_real_response_still_suppressed_only_when_final_delivery_confirmed(self): + """Normal non-empty response should be suppressed only when the final + response was actually streamed.""" + sc = self._make_mock_stream_consumer(already_sent=True, final_response_sent=True) response = {"final_response": "Here are the search results..."} self._apply_suppression_logic(response, sc) assert response.get("already_sent") is True @@ -299,8 +301,8 @@ class TestEmptyResponseNotSuppressed: assert "already_sent" not in response class TestQueuedMessageAlreadyStreamed: - """The queued-message path should detect that the first response was - already streamed (already_sent=True) even without response_previewed.""" + """The queued-message path should skip the first response only when the + final response was actually streamed.""" def _make_mock_sc(self, already_sent=False, final_response_sent=False): return SimpleNamespace( @@ -308,18 +310,23 @@ class TestQueuedMessageAlreadyStreamed: final_response_sent=final_response_sent, ) - def test_queued_path_detects_already_streamed(self): - """already_sent=True on stream consumer means first response was - streamed — skip re-sending before processing queued message.""" - _sc = self._make_mock_sc(already_sent=True) + def test_queued_path_only_skips_send_when_final_response_was_streamed(self): + """Partial streamed output alone must not suppress the first response + before the queued follow-up is processed.""" + _sc = self._make_mock_sc(already_sent=True, final_response_sent=False) - # Reproduce the queued-message logic from run.py (post-fix) _already_streamed = bool( - _sc - and ( - getattr(_sc, "final_response_sent", False) - or getattr(_sc, "already_sent", False) - ) + _sc and getattr(_sc, "final_response_sent", False) + ) + + assert _already_streamed is False + + def test_queued_path_detects_confirmed_final_stream_delivery(self): + """Confirmed final streamed delivery should skip the resend.""" + _sc = self._make_mock_sc(already_sent=True, final_response_sent=True) + + _already_streamed = bool( + _sc and getattr(_sc, "final_response_sent", False) ) assert _already_streamed is True @@ -327,14 +334,10 @@ class TestQueuedMessageAlreadyStreamed: def test_queued_path_sends_when_not_streamed(self): """Nothing was streamed — first response should be sent before processing the queued message.""" - _sc = self._make_mock_sc(already_sent=False) + _sc = self._make_mock_sc(already_sent=False, final_response_sent=False) _already_streamed = bool( - _sc - and ( - getattr(_sc, "final_response_sent", False) - or getattr(_sc, "already_sent", False) - ) + _sc and getattr(_sc, "final_response_sent", False) ) assert _already_streamed is False @@ -344,11 +347,7 @@ class TestQueuedMessageAlreadyStreamed: _sc = None _already_streamed = bool( - _sc - and ( - getattr(_sc, "final_response_sent", False) - or getattr(_sc, "already_sent", False) - ) + _sc and getattr(_sc, "final_response_sent", False) ) assert _already_streamed is False