From 194bff06876396f441fd40c12379066f52661832 Mon Sep 17 00:00:00 2001 From: sgaofen <135070653+sgaofen@users.noreply.github.com> Date: Mon, 29 Jun 2026 02:14:08 -0700 Subject: [PATCH] fix(gateway): confirm final delivery before suppressing send MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #14238. During a compression/session split at the response boundary, the interim callback delivered unrelated commentary, setting response_previewed=True. The suppression logic treated that as proof the final reply had been delivered and skipped the normal send — the response was persisted to the child session but never sent to chat. Only suppress the normal final send when the stream consumer confirms final delivery (final_response_sent / final_content_delivered) or the exact final response text was delivered as a preview. --- gateway/run.py | 50 ++++++++++++++++++----- gateway/stream_consumer.py | 15 +++++++ tests/gateway/test_run_progress_topics.py | 34 +++++++++++++++ 3 files changed, 89 insertions(+), 10 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index e67f9e6e00f..ceaefd9a830 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -17426,6 +17426,26 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _notify_task = asyncio.create_task(_notify_long_running()) + def _stream_confirmed_final_delivery( + consumer, + final_text: str, + *, + previewed: bool = False, + ) -> bool: + """Return True only when the actual final reply reached the user.""" + if consumer is None: + return False + if getattr(consumer, "final_response_sent", False): + return True + if previewed: + has_delivered_text = getattr(consumer, "has_delivered_text", None) + if callable(has_delivered_text): + try: + return bool(has_delivered_text(final_text)) + except Exception: + return False + return False + try: # Run in thread pool to not block. Use an *inactivity*-based # timeout instead of a wall-clock limit: the agent can run for @@ -17777,12 +17797,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception as e: logger.debug("Stream consumer wait before queued message failed: %s", e) _previewed = bool(result.get("response_previewed")) - _already_streamed = bool( - (_sc and getattr(_sc, "final_response_sent", False)) - or _previewed - or (_sc and getattr(_sc, "final_content_delivered", False)) - ) first_response = result.get("final_response", "") + _already_streamed = _stream_confirmed_final_delivery( + _sc, + first_response, + previewed=_previewed, + ) if first_response and not _already_streamed: try: logger.info( @@ -17953,11 +17973,10 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if isinstance(response, dict) and not response.get("failed"): _final = response.get("final_response") or "" _is_empty_sentinel = not _final or _final == "(empty)" - _streamed = bool( - _sc and getattr(_sc, "final_response_sent", False) - ) # response_previewed means the interim_assistant_callback already - # sent the final text via the adapter (non-streaming path). + # saw the final text, but only suppress the normal send if that + # exact final text was delivered. Unrelated commentary/progress + # must not be mistaken for the final response (#14238). _previewed = bool(response.get("response_previewed")) _content_delivered = bool( _sc and getattr(_sc, "final_content_delivered", False) @@ -17966,7 +17985,18 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # after streaming finished — when the response was transformed, always # send the final version so the appended content reaches the client. _transformed = bool(response.get("response_transformed")) - if not _is_empty_sentinel and not _transformed and (_streamed or _previewed or _content_delivered): + # Only suppress the normal send when the actual final reply reached + # the user: the stream consumer streamed it (final_response_sent / + # final_content_delivered), or the interim preview delivered that + # *exact* final text. Unrelated commentary/progress shown during a + # compression/session split must not be mistaken for the final + # response (#14238). + _streamed = _stream_confirmed_final_delivery( + _sc, + _final, + previewed=_previewed, + ) + if not _is_empty_sentinel and not _transformed and (_streamed or _content_delivered): logger.info( "Suppressing normal final send for session %s: final delivery already confirmed (streamed=%s previewed=%s content_delivered=%s).", session_key or "?", diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 6c115e715e7..d5641b66409 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -175,6 +175,7 @@ class GatewayStreamConsumer: # streaming, even if the final edit (cursor removal etc.) # subsequently failed. self._final_content_delivered = False + self._delivered_commentary_texts: list[str] = [] # Cache adapter lifecycle capability: only platforms that need an # explicit finalize call (e.g. DingTalk AI Cards) force us to make # a redundant final edit. Everyone else keeps the fast path. @@ -291,6 +292,16 @@ class GatewayStreamConsumer: pass return await self.adapter.edit_message(**kwargs) + def has_delivered_text(self, text: str) -> bool: + """Return True if *text* was already delivered as visible chat content.""" + target = self._clean_for_display(text or "").strip() + if not target: + return False + visible_prefix = self._visible_prefix().strip() + if visible_prefix == target: + return True + return any(sent.strip() == target for sent in self._delivered_commentary_texts) + def on_segment_break(self) -> None: """Finalize the current stream segment and start a fresh message.""" self._queue.put(_NEW_SEGMENT) @@ -1173,6 +1184,10 @@ class GatewayStreamConsumer: # stale tool bubble above it so the next tool starts a # new bubble below. self._notify_new_message() + # Record the exact delivered text so run.py can confirm whether + # an interim "preview" actually carried the final response, vs. + # unrelated commentary delivered during a session split (#14238). + self._delivered_commentary_texts.append(text) return result.success except Exception as e: logger.error("Commentary send error: %s", e) diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index ba97e570c26..00c6cce014f 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -260,6 +260,7 @@ def _make_runner(adapter): runner._session_db = None runner._running_agents = {} runner._session_run_generation = {} + runner.session_store = SimpleNamespace(_entries={}, _save=lambda: None) runner.hooks = SimpleNamespace(loaded_hooks=False) runner.config = SimpleNamespace( thread_sessions_per_user=False, @@ -625,6 +626,24 @@ class PreviewedResponseAgent: } +class PreviewedSplitAfterCommentaryAgent: + def __init__(self, **kwargs): + self.interim_assistant_callback = kwargs.get("interim_assistant_callback") + self.session_id = kwargs.get("session_id") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + if self.interim_assistant_callback: + self.interim_assistant_callback("I'll inspect the repo first.", already_streamed=False) + self.session_id = f"{self.session_id}-child" + return { + "final_response": "Final answer after compression.", + "response_previewed": True, + "messages": [], + "api_calls": 1, + } + + class StreamingRefineAgent: def __init__(self, **kwargs): self.stream_delta_callback = kwargs.get("stream_delta_callback") @@ -942,6 +961,21 @@ async def test_run_agent_previewed_final_marks_already_sent(monkeypatch, tmp_pat assert [call["content"] for call in adapter.sent] == ["You're welcome."] +@pytest.mark.asyncio +async def test_run_agent_previewed_split_keeps_final_delivery_pending(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + PreviewedSplitAfterCommentaryAgent, + session_id="sess-split", + config_data={"display": {"interim_assistant_messages": True}}, + ) + + assert result["session_id"] == "sess-split-child" + assert result.get("already_sent") is not True + assert [call["content"] for call in adapter.sent] == ["I'll inspect the repo first."] + + @pytest.mark.asyncio async def test_run_agent_matrix_streaming_omits_cursor(monkeypatch, tmp_path): adapter, result = await _run_with_agent(