fix(gateway): confirm final delivery before suppressing send

Fixes #14238. During a compression/session split at the response
boundary, the interim callback delivered unrelated commentary, setting
response_previewed=True. The suppression logic treated that as proof the
final reply had been delivered and skipped the normal send — the response
was persisted to the child session but never sent to chat.

Only suppress the normal final send when the stream consumer confirms
final delivery (final_response_sent / final_content_delivered) or the
exact final response text was delivered as a preview.
This commit is contained in:
sgaofen 2026-06-29 02:14:08 -07:00 committed by Teknium
parent fa3dba4b30
commit 194bff0687
3 changed files with 89 additions and 10 deletions

View file

@ -17426,6 +17426,26 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
_notify_task = asyncio.create_task(_notify_long_running())
def _stream_confirmed_final_delivery(
consumer,
final_text: str,
*,
previewed: bool = False,
) -> bool:
"""Return True only when the actual final reply reached the user."""
if consumer is None:
return False
if getattr(consumer, "final_response_sent", False):
return True
if previewed:
has_delivered_text = getattr(consumer, "has_delivered_text", None)
if callable(has_delivered_text):
try:
return bool(has_delivered_text(final_text))
except Exception:
return False
return False
try:
# Run in thread pool to not block. Use an *inactivity*-based
# timeout instead of a wall-clock limit: the agent can run for
@ -17777,12 +17797,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception as e:
logger.debug("Stream consumer wait before queued message failed: %s", e)
_previewed = bool(result.get("response_previewed"))
_already_streamed = bool(
(_sc and getattr(_sc, "final_response_sent", False))
or _previewed
or (_sc and getattr(_sc, "final_content_delivered", False))
)
first_response = result.get("final_response", "")
_already_streamed = _stream_confirmed_final_delivery(
_sc,
first_response,
previewed=_previewed,
)
if first_response and not _already_streamed:
try:
logger.info(
@ -17953,11 +17973,10 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
if isinstance(response, dict) and not response.get("failed"):
_final = response.get("final_response") or ""
_is_empty_sentinel = not _final or _final == "(empty)"
_streamed = bool(
_sc and getattr(_sc, "final_response_sent", False)
)
# response_previewed means the interim_assistant_callback already
# sent the final text via the adapter (non-streaming path).
# saw the final text, but only suppress the normal send if that
# exact final text was delivered. Unrelated commentary/progress
# must not be mistaken for the final response (#14238).
_previewed = bool(response.get("response_previewed"))
_content_delivered = bool(
_sc and getattr(_sc, "final_content_delivered", False)
@ -17966,7 +17985,18 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# after streaming finished — when the response was transformed, always
# send the final version so the appended content reaches the client.
_transformed = bool(response.get("response_transformed"))
if not _is_empty_sentinel and not _transformed and (_streamed or _previewed or _content_delivered):
# Only suppress the normal send when the actual final reply reached
# the user: the stream consumer streamed it (final_response_sent /
# final_content_delivered), or the interim preview delivered that
# *exact* final text. Unrelated commentary/progress shown during a
# compression/session split must not be mistaken for the final
# response (#14238).
_streamed = _stream_confirmed_final_delivery(
_sc,
_final,
previewed=_previewed,
)
if not _is_empty_sentinel and not _transformed and (_streamed or _content_delivered):
logger.info(
"Suppressing normal final send for session %s: final delivery already confirmed (streamed=%s previewed=%s content_delivered=%s).",
session_key or "?",

View file

@ -175,6 +175,7 @@ class GatewayStreamConsumer:
# streaming, even if the final edit (cursor removal etc.)
# subsequently failed.
self._final_content_delivered = False
self._delivered_commentary_texts: list[str] = []
# Cache adapter lifecycle capability: only platforms that need an
# explicit finalize call (e.g. DingTalk AI Cards) force us to make
# a redundant final edit. Everyone else keeps the fast path.
@ -291,6 +292,16 @@ class GatewayStreamConsumer:
pass
return await self.adapter.edit_message(**kwargs)
def has_delivered_text(self, text: str) -> bool:
"""Return True if *text* was already delivered as visible chat content."""
target = self._clean_for_display(text or "").strip()
if not target:
return False
visible_prefix = self._visible_prefix().strip()
if visible_prefix == target:
return True
return any(sent.strip() == target for sent in self._delivered_commentary_texts)
def on_segment_break(self) -> None:
"""Finalize the current stream segment and start a fresh message."""
self._queue.put(_NEW_SEGMENT)
@ -1173,6 +1184,10 @@ class GatewayStreamConsumer:
# stale tool bubble above it so the next tool starts a
# new bubble below.
self._notify_new_message()
# Record the exact delivered text so run.py can confirm whether
# an interim "preview" actually carried the final response, vs.
# unrelated commentary delivered during a session split (#14238).
self._delivered_commentary_texts.append(text)
return result.success
except Exception as e:
logger.error("Commentary send error: %s", e)

View file

@ -260,6 +260,7 @@ def _make_runner(adapter):
runner._session_db = None
runner._running_agents = {}
runner._session_run_generation = {}
runner.session_store = SimpleNamespace(_entries={}, _save=lambda: None)
runner.hooks = SimpleNamespace(loaded_hooks=False)
runner.config = SimpleNamespace(
thread_sessions_per_user=False,
@ -625,6 +626,24 @@ class PreviewedResponseAgent:
}
class PreviewedSplitAfterCommentaryAgent:
def __init__(self, **kwargs):
self.interim_assistant_callback = kwargs.get("interim_assistant_callback")
self.session_id = kwargs.get("session_id")
self.tools = []
def run_conversation(self, message, conversation_history=None, task_id=None):
if self.interim_assistant_callback:
self.interim_assistant_callback("I'll inspect the repo first.", already_streamed=False)
self.session_id = f"{self.session_id}-child"
return {
"final_response": "Final answer after compression.",
"response_previewed": True,
"messages": [],
"api_calls": 1,
}
class StreamingRefineAgent:
def __init__(self, **kwargs):
self.stream_delta_callback = kwargs.get("stream_delta_callback")
@ -942,6 +961,21 @@ async def test_run_agent_previewed_final_marks_already_sent(monkeypatch, tmp_pat
assert [call["content"] for call in adapter.sent] == ["You're welcome."]
@pytest.mark.asyncio
async def test_run_agent_previewed_split_keeps_final_delivery_pending(monkeypatch, tmp_path):
adapter, result = await _run_with_agent(
monkeypatch,
tmp_path,
PreviewedSplitAfterCommentaryAgent,
session_id="sess-split",
config_data={"display": {"interim_assistant_messages": True}},
)
assert result["session_id"] == "sess-split-child"
assert result.get("already_sent") is not True
assert [call["content"] for call in adapter.sent] == ["I'll inspect the repo first."]
@pytest.mark.asyncio
async def test_run_agent_matrix_streaming_omits_cursor(monkeypatch, tmp_path):
adapter, result = await _run_with_agent(