mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
fix(gateway): prevent duplicate final send when only cosmetic edit failed
When the stream consumer's got_done handler successfully delivers the final response content via _send_or_edit but the subsequent edit (e.g. cursor removal) fails, final_response_sent remains False even though the user has already received the final answer. The gateway's fallback send path then re-delivers the same content, causing the user to see the response twice on Telegram.

Introduce a new _final_content_delivered flag on the stream consumer, set by the got_done handler when the final content has reached the user. The _run_agent suppression logic now treats this flag as an additional signal (alongside final_response_sent and response_previewed) that final delivery is already complete.

This preserves the existing behavior for intermediate-text-only streams (where already_sent=True but no final content has been delivered) — those still receive the gateway's fallback send, matching the test expectation in test_partial_stream_output_does_not_set_already_sent.

Adds TestFinalContentDeliveredSuppression with two cases covering both the suppression (content delivered + edit failed) and the non-suppression (intermediate text only) branches.
This commit is contained in:
parent
b4b8509fe8
commit
bc42e62b17
3 changed files with 80 additions and 2 deletions
|
|
@ -16131,6 +16131,7 @@ class GatewayRunner:
|
||||||
_already_streamed = bool(
|
_already_streamed = bool(
|
||||||
(_sc and getattr(_sc, "final_response_sent", False))
|
(_sc and getattr(_sc, "final_response_sent", False))
|
||||||
or _previewed
|
or _previewed
|
||||||
|
or (_sc and getattr(_sc, "final_content_delivered", False))
|
||||||
)
|
)
|
||||||
first_response = result.get("final_response", "")
|
first_response = result.get("final_response", "")
|
||||||
if first_response and not _already_streamed:
|
if first_response and not _already_streamed:
|
||||||
|
|
@ -16292,12 +16293,16 @@ class GatewayRunner:
|
||||||
# response_previewed means the interim_assistant_callback already
|
# response_previewed means the interim_assistant_callback already
|
||||||
# sent the final text via the adapter (non-streaming path).
|
# sent the final text via the adapter (non-streaming path).
|
||||||
_previewed = bool(response.get("response_previewed"))
|
_previewed = bool(response.get("response_previewed"))
|
||||||
if not _is_empty_sentinel and (_streamed or _previewed):
|
_content_delivered = bool(
|
||||||
|
_sc and getattr(_sc, "final_content_delivered", False)
|
||||||
|
)
|
||||||
|
if not _is_empty_sentinel and (_streamed or _previewed or _content_delivered):
|
||||||
logger.info(
|
logger.info(
|
||||||
"Suppressing normal final send for session %s: final delivery already confirmed (streamed=%s previewed=%s).",
|
"Suppressing normal final send for session %s: final delivery already confirmed (streamed=%s previewed=%s content_delivered=%s).",
|
||||||
session_key or "?",
|
session_key or "?",
|
||||||
_streamed,
|
_streamed,
|
||||||
_previewed,
|
_previewed,
|
||||||
|
_content_delivered,
|
||||||
)
|
)
|
||||||
response["already_sent"] = True
|
response["already_sent"] = True
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -150,6 +150,10 @@ class GatewayStreamConsumer:
|
||||||
self._flood_strikes = 0 # Consecutive flood-control edit failures
|
self._flood_strikes = 0 # Consecutive flood-control edit failures
|
||||||
self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff
|
self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff
|
||||||
self._final_response_sent = False
|
self._final_response_sent = False
|
||||||
|
# Set when the final response content was sent to the user via
|
||||||
|
# streaming, even if the final edit (cursor removal etc.)
|
||||||
|
# subsequently failed.
|
||||||
|
self._final_content_delivered = False
|
||||||
# Cache adapter lifecycle capability: only platforms that need an
|
# Cache adapter lifecycle capability: only platforms that need an
|
||||||
# explicit finalize call (e.g. DingTalk AI Cards) force us to make
|
# explicit finalize call (e.g. DingTalk AI Cards) force us to make
|
||||||
# a redundant final edit. Everyone else keeps the fast path.
|
# a redundant final edit. Everyone else keeps the fast path.
|
||||||
|
|
@ -187,6 +191,12 @@ class GatewayStreamConsumer:
|
||||||
"""True when the stream consumer delivered the final assistant reply."""
|
"""True when the stream consumer delivered the final assistant reply."""
|
||||||
return self._final_response_sent
|
return self._final_response_sent
|
||||||
|
|
||||||
|
@property
|
||||||
|
def final_content_delivered(self) -> bool:
|
||||||
|
"""True when the final response content reached the user, even if
|
||||||
|
the subsequent cosmetic edit (cursor removal) failed."""
|
||||||
|
return self._final_content_delivered
|
||||||
|
|
||||||
def on_segment_break(self) -> None:
|
def on_segment_break(self) -> None:
|
||||||
"""Finalize the current stream segment and start a fresh message."""
|
"""Finalize the current stream segment and start a fresh message."""
|
||||||
self._queue.put(_NEW_SEGMENT)
|
self._queue.put(_NEW_SEGMENT)
|
||||||
|
|
@ -455,6 +465,8 @@ class GatewayStreamConsumer:
|
||||||
# tool-progress edits or fallback-mode promotion (#10748)
|
# tool-progress edits or fallback-mode promotion (#10748)
|
||||||
# — that doesn't mean the final answer reached the user.
|
# — that doesn't mean the final answer reached the user.
|
||||||
self._final_response_sent = chunks_delivered
|
self._final_response_sent = chunks_delivered
|
||||||
|
if chunks_delivered:
|
||||||
|
self._final_content_delivered = True
|
||||||
return
|
return
|
||||||
if got_segment_break:
|
if got_segment_break:
|
||||||
self._message_id = None
|
self._message_id = None
|
||||||
|
|
@ -505,6 +517,11 @@ class GatewayStreamConsumer:
|
||||||
self._last_edit_time = time.monotonic()
|
self._last_edit_time = time.monotonic()
|
||||||
|
|
||||||
if got_done:
|
if got_done:
|
||||||
|
# Record that the final content reached the user even
|
||||||
|
# if the cosmetic final edit below fails.
|
||||||
|
if current_update_visible and self._accumulated:
|
||||||
|
self._final_content_delivered = True
|
||||||
|
|
||||||
# Final edit without cursor. If progressive editing failed
|
# Final edit without cursor. If progressive editing failed
|
||||||
# mid-stream, send a single continuation/fallback message
|
# mid-stream, send a single continuation/fallback message
|
||||||
# here instead of letting the base gateway path send the
|
# here instead of letting the base gateway path send the
|
||||||
|
|
|
||||||
|
|
@ -467,3 +467,59 @@ class TestCancellationHandlerDeliveryConfirmation:
|
||||||
final_response_sent = True
|
final_response_sent = True
|
||||||
|
|
||||||
assert final_response_sent is True # the bug: partial promoted to final
|
assert final_response_sent is True # the bug: partial promoted to final
|
||||||
|
|
||||||
|
|
||||||
|
class TestFinalContentDeliveredSuppression:
|
||||||
|
"""When stream consumer delivered the final content but the cosmetic
|
||||||
|
final edit (cursor removal) failed, the gateway must suppress the
|
||||||
|
fallback send to prevent duplicate messages.
|
||||||
|
|
||||||
|
Covers the scenario not handled by final_response_sent alone:
|
||||||
|
content reached the user via _send_or_edit, but the subsequent edit
|
||||||
|
that clears a typing cursor or streaming marker failed, leaving
|
||||||
|
final_response_sent=False even though the user already saw the text.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_content_delivered_but_final_edit_failed_suppresses(self):
|
||||||
|
"""final_content_delivered=True + final_response_sent=False
|
||||||
|
must suppress (content already visible to user)."""
|
||||||
|
sc = SimpleNamespace(
|
||||||
|
already_sent=True,
|
||||||
|
final_response_sent=False,
|
||||||
|
final_content_delivered=True,
|
||||||
|
)
|
||||||
|
response = {"final_response": "Hello!", "response_previewed": False}
|
||||||
|
|
||||||
|
_streamed = bool(getattr(sc, "final_response_sent", False))
|
||||||
|
_previewed = bool(response.get("response_previewed"))
|
||||||
|
_content_delivered = bool(getattr(sc, "final_content_delivered", False))
|
||||||
|
_is_empty_sentinel = (
|
||||||
|
not response.get("final_response")
|
||||||
|
or response.get("final_response") == "(empty)"
|
||||||
|
)
|
||||||
|
if not _is_empty_sentinel and (_streamed or _previewed or _content_delivered):
|
||||||
|
response["already_sent"] = True
|
||||||
|
|
||||||
|
assert response.get("already_sent") is True
|
||||||
|
|
||||||
|
def test_intermediate_text_only_does_not_suppress(self):
|
||||||
|
"""already_sent=True from intermediate text + final_content_delivered=False
|
||||||
|
must NOT suppress (user still needs the real final answer)."""
|
||||||
|
sc = SimpleNamespace(
|
||||||
|
already_sent=True,
|
||||||
|
final_response_sent=False,
|
||||||
|
final_content_delivered=False,
|
||||||
|
)
|
||||||
|
response = {"final_response": "Real answer", "response_previewed": False}
|
||||||
|
|
||||||
|
_streamed = bool(getattr(sc, "final_response_sent", False))
|
||||||
|
_previewed = bool(response.get("response_previewed"))
|
||||||
|
_content_delivered = bool(getattr(sc, "final_content_delivered", False))
|
||||||
|
_is_empty_sentinel = (
|
||||||
|
not response.get("final_response")
|
||||||
|
or response.get("final_response") == "(empty)"
|
||||||
|
)
|
||||||
|
if not _is_empty_sentinel and (_streamed or _previewed or _content_delivered):
|
||||||
|
response["already_sent"] = True
|
||||||
|
|
||||||
|
assert "already_sent" not in response
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue