mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): suppress duplicate replies on interrupt and streaming flood control
Three fixes for the duplicate reply bug affecting all gateway platforms: 1. base.py: Suppress stale response when the session was interrupted by a new message that hasn't been consumed yet. Checks both interrupt_event and _pending_messages to avoid false positives. (#8221, #2483) 2. run.py (return path): Remove response_previewed guard from already_sent check. Stream consumer's already_sent alone is authoritative — if content was delivered via streaming, the duplicate send must be suppressed regardless of the agent's response_previewed flag. (#8375) 3. run.py (queued-message path): Same fix — already_sent without response_previewed now correctly marks the first response as already streamed, preventing re-send before processing the queued message. The response_previewed field is still produced by the agent (run_agent.py) but is no longer required as a gate for duplicate suppression. The stream consumer's already_sent flag is the delivery-level truth about what the user actually saw. Concepts from PR #8380 (konsisumer). Closes #8375, #8221, #2483.
This commit is contained in:
parent
7b2700c9af
commit
2546b7acea
3 changed files with 308 additions and 10 deletions
|
|
@ -1624,6 +1624,21 @@ class BasePlatformAdapter(ABC):
|
|||
# streaming already delivered the text (already_sent=True) or
|
||||
# when the message was queued behind an active agent. Log at
|
||||
# DEBUG to avoid noisy warnings for expected behavior.
|
||||
#
|
||||
# Suppress stale response when the session was interrupted by a
|
||||
# new message that hasn't been consumed yet. The pending message
|
||||
# is processed by the pending-message handler below (#8221/#2483).
|
||||
if (
|
||||
response
|
||||
and interrupt_event.is_set()
|
||||
and session_key in self._pending_messages
|
||||
):
|
||||
logger.info(
|
||||
"[%s] Suppressing stale response for interrupted session %s",
|
||||
self.name,
|
||||
session_key,
|
||||
)
|
||||
response = None
|
||||
if not response:
|
||||
logger.debug("[%s] Handler returned empty/None response for %s", self.name, event.source.chat_id)
|
||||
if response:
|
||||
|
|
|
|||
|
|
@ -9231,15 +9231,11 @@ class GatewayRunner:
|
|||
pass
|
||||
except Exception as e:
|
||||
logger.debug("Stream consumer wait before queued message failed: %s", e)
|
||||
_response_previewed = bool(result.get("response_previewed"))
|
||||
_already_streamed = bool(
|
||||
_sc
|
||||
and (
|
||||
getattr(_sc, "final_response_sent", False)
|
||||
or (
|
||||
_response_previewed
|
||||
and getattr(_sc, "already_sent", False)
|
||||
)
|
||||
or getattr(_sc, "already_sent", False)
|
||||
)
|
||||
)
|
||||
first_response = result.get("final_response", "")
|
||||
|
|
@ -9323,13 +9319,9 @@ class GatewayRunner:
|
|||
# them even if streaming had sent earlier partial output.
|
||||
_sc = stream_consumer_holder[0]
|
||||
if _sc and isinstance(response, dict) and not response.get("failed"):
|
||||
_response_previewed = bool(response.get("response_previewed"))
|
||||
if (
|
||||
getattr(_sc, "final_response_sent", False)
|
||||
or (
|
||||
_response_previewed
|
||||
and getattr(_sc, "already_sent", False)
|
||||
)
|
||||
or getattr(_sc, "already_sent", False)
|
||||
):
|
||||
response["already_sent"] = True
|
||||
|
||||
|
|
|
|||
291
tests/gateway/test_duplicate_reply_suppression.py
Normal file
291
tests/gateway/test_duplicate_reply_suppression.py
Normal file
|
|
@ -0,0 +1,291 @@
|
|||
"""Tests for duplicate reply suppression across the gateway stack.
|
||||
|
||||
Covers three fix paths:
|
||||
1. base.py: stale response suppressed when interrupt_event is set and a
|
||||
pending message exists (#8221 / #2483)
|
||||
2. run.py return path: already_sent propagated from stream consumer's
|
||||
already_sent flag without requiring response_previewed (#8375)
|
||||
3. run.py queued-message path: first response correctly detected as
|
||||
already-streamed when already_sent is True without response_previewed
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import (
|
||||
BasePlatformAdapter,
|
||||
MessageEvent,
|
||||
MessageType,
|
||||
ProcessingOutcome,
|
||||
SendResult,
|
||||
)
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class StubAdapter(BasePlatformAdapter):
|
||||
"""Minimal concrete adapter for testing."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(PlatformConfig(enabled=True, token="fake"), Platform.DISCORD)
|
||||
self.sent = []
|
||||
|
||||
async def connect(self):
|
||||
return True
|
||||
|
||||
async def disconnect(self):
|
||||
pass
|
||||
|
||||
async def send(self, chat_id, content, reply_to=None, metadata=None):
|
||||
self.sent.append({"chat_id": chat_id, "content": content})
|
||||
return SendResult(success=True, message_id="msg1")
|
||||
|
||||
async def send_typing(self, chat_id, metadata=None):
|
||||
pass
|
||||
|
||||
async def get_chat_info(self, chat_id):
|
||||
return {"id": chat_id}
|
||||
|
||||
|
||||
def _make_event(text="hello", chat_id="c1", user_id="u1"):
|
||||
return MessageEvent(
|
||||
text=text,
|
||||
source=SessionSource(
|
||||
platform=Platform.DISCORD,
|
||||
chat_id=chat_id,
|
||||
chat_type="dm",
|
||||
user_id=user_id,
|
||||
),
|
||||
message_id="m1",
|
||||
)
|
||||
|
||||
|
||||
# ===================================================================
|
||||
# Test 1: base.py — stale response suppressed on interrupt (#8221)
|
||||
# ===================================================================
|
||||
|
||||
class TestBaseInterruptSuppression:
|
||||
@pytest.mark.asyncio
|
||||
async def test_stale_response_suppressed_when_interrupted(self):
|
||||
"""When interrupt_event is set AND a pending message exists,
|
||||
base.py should suppress the stale response instead of sending it."""
|
||||
adapter = StubAdapter()
|
||||
|
||||
stale_response = "This is the stale answer to the first question."
|
||||
pending_response = "This is the answer to the second question."
|
||||
call_count = 0
|
||||
|
||||
async def fake_handler(event):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 1:
|
||||
return stale_response
|
||||
return pending_response
|
||||
|
||||
adapter.set_message_handler(fake_handler)
|
||||
|
||||
event_a = _make_event(text="first question")
|
||||
session_key = build_session_key(event_a.source)
|
||||
|
||||
# Simulate: message A is being processed, message B arrives
|
||||
# The interrupt event is set and B is in pending_messages
|
||||
interrupt_event = asyncio.Event()
|
||||
interrupt_event.set()
|
||||
adapter._active_sessions[session_key] = interrupt_event
|
||||
|
||||
event_b = _make_event(text="second question")
|
||||
adapter._pending_messages[session_key] = event_b
|
||||
|
||||
await adapter._process_message_background(event_a, session_key)
|
||||
|
||||
# The stale response should NOT have been sent.
|
||||
stale_sends = [s for s in adapter.sent if s["content"] == stale_response]
|
||||
assert len(stale_sends) == 0, (
|
||||
f"Stale response was sent {len(stale_sends)} time(s) — should be suppressed"
|
||||
)
|
||||
# The pending message's response SHOULD have been sent.
|
||||
pending_sends = [s for s in adapter.sent if s["content"] == pending_response]
|
||||
assert len(pending_sends) == 1, "Pending message response should be sent"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_response_not_suppressed_without_interrupt(self):
|
||||
"""Normal case: no interrupt, response should be sent."""
|
||||
adapter = StubAdapter()
|
||||
|
||||
async def fake_handler(event):
|
||||
return "Normal response"
|
||||
|
||||
adapter.set_message_handler(fake_handler)
|
||||
event = _make_event()
|
||||
session_key = build_session_key(event.source)
|
||||
|
||||
await adapter._process_message_background(event, session_key)
|
||||
|
||||
assert any(s["content"] == "Normal response" for s in adapter.sent)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_response_not_suppressed_with_interrupt_but_no_pending(self):
|
||||
"""Interrupt event set but no pending message (race already resolved) —
|
||||
response should still be sent."""
|
||||
adapter = StubAdapter()
|
||||
|
||||
async def fake_handler(event):
|
||||
return "Valid response"
|
||||
|
||||
adapter.set_message_handler(fake_handler)
|
||||
event = _make_event()
|
||||
session_key = build_session_key(event.source)
|
||||
|
||||
# Set interrupt but no pending message
|
||||
interrupt_event = asyncio.Event()
|
||||
interrupt_event.set()
|
||||
adapter._active_sessions[session_key] = interrupt_event
|
||||
|
||||
await adapter._process_message_background(event, session_key)
|
||||
|
||||
assert any(s["content"] == "Valid response" for s in adapter.sent)
|
||||
|
||||
|
||||
# ===================================================================
|
||||
# Test 2: run.py — already_sent without response_previewed (#8375)
|
||||
# ===================================================================
|
||||
|
||||
class TestAlreadySentWithoutResponsePreviewed:
|
||||
"""The already_sent flag on the response dict should be set when the
|
||||
stream consumer's already_sent is True, even if response_previewed is
|
||||
False. This prevents duplicate sends when streaming was interrupted
|
||||
by flood control."""
|
||||
|
||||
def _make_mock_stream_consumer(self, already_sent=False, final_response_sent=False):
|
||||
sc = SimpleNamespace(
|
||||
already_sent=already_sent,
|
||||
final_response_sent=final_response_sent,
|
||||
)
|
||||
return sc
|
||||
|
||||
def test_already_sent_set_without_response_previewed(self):
|
||||
"""Stream consumer already_sent=True should propagate to response
|
||||
dict even when response_previewed is False."""
|
||||
sc = self._make_mock_stream_consumer(already_sent=True, final_response_sent=False)
|
||||
response = {"final_response": "text", "response_previewed": False}
|
||||
|
||||
# Reproduce the logic from run.py return path (post-fix)
|
||||
if sc and isinstance(response, dict) and not response.get("failed"):
|
||||
if (
|
||||
getattr(sc, "final_response_sent", False)
|
||||
or getattr(sc, "already_sent", False)
|
||||
):
|
||||
response["already_sent"] = True
|
||||
|
||||
assert response.get("already_sent") is True
|
||||
|
||||
def test_already_sent_not_set_when_nothing_sent(self):
|
||||
"""When stream consumer hasn't sent anything, already_sent should
|
||||
not be set on the response."""
|
||||
sc = self._make_mock_stream_consumer(already_sent=False, final_response_sent=False)
|
||||
response = {"final_response": "text", "response_previewed": False}
|
||||
|
||||
if sc and isinstance(response, dict) and not response.get("failed"):
|
||||
if (
|
||||
getattr(sc, "final_response_sent", False)
|
||||
or getattr(sc, "already_sent", False)
|
||||
):
|
||||
response["already_sent"] = True
|
||||
|
||||
assert "already_sent" not in response
|
||||
|
||||
def test_already_sent_set_on_final_response_sent(self):
|
||||
"""final_response_sent=True should still work as before."""
|
||||
sc = self._make_mock_stream_consumer(already_sent=False, final_response_sent=True)
|
||||
response = {"final_response": "text"}
|
||||
|
||||
if sc and isinstance(response, dict) and not response.get("failed"):
|
||||
if (
|
||||
getattr(sc, "final_response_sent", False)
|
||||
or getattr(sc, "already_sent", False)
|
||||
):
|
||||
response["already_sent"] = True
|
||||
|
||||
assert response.get("already_sent") is True
|
||||
|
||||
def test_already_sent_not_set_on_failed_response(self):
|
||||
"""Failed responses should never be suppressed — user needs to see
|
||||
the error message even if streaming sent earlier partial output."""
|
||||
sc = self._make_mock_stream_consumer(already_sent=True, final_response_sent=False)
|
||||
response = {"final_response": "Error: something broke", "failed": True}
|
||||
|
||||
if sc and isinstance(response, dict) and not response.get("failed"):
|
||||
if (
|
||||
getattr(sc, "final_response_sent", False)
|
||||
or getattr(sc, "already_sent", False)
|
||||
):
|
||||
response["already_sent"] = True
|
||||
|
||||
assert "already_sent" not in response
|
||||
|
||||
|
||||
# ===================================================================
|
||||
# Test 3: run.py queued-message path — _already_streamed detection
|
||||
# ===================================================================
|
||||
|
||||
class TestQueuedMessageAlreadyStreamed:
|
||||
"""The queued-message path should detect that the first response was
|
||||
already streamed (already_sent=True) even without response_previewed."""
|
||||
|
||||
def _make_mock_sc(self, already_sent=False, final_response_sent=False):
|
||||
return SimpleNamespace(
|
||||
already_sent=already_sent,
|
||||
final_response_sent=final_response_sent,
|
||||
)
|
||||
|
||||
def test_queued_path_detects_already_streamed(self):
|
||||
"""already_sent=True on stream consumer means first response was
|
||||
streamed — skip re-sending before processing queued message."""
|
||||
_sc = self._make_mock_sc(already_sent=True)
|
||||
|
||||
# Reproduce the queued-message logic from run.py (post-fix)
|
||||
_already_streamed = bool(
|
||||
_sc
|
||||
and (
|
||||
getattr(_sc, "final_response_sent", False)
|
||||
or getattr(_sc, "already_sent", False)
|
||||
)
|
||||
)
|
||||
|
||||
assert _already_streamed is True
|
||||
|
||||
def test_queued_path_sends_when_not_streamed(self):
|
||||
"""Nothing was streamed — first response should be sent before
|
||||
processing the queued message."""
|
||||
_sc = self._make_mock_sc(already_sent=False)
|
||||
|
||||
_already_streamed = bool(
|
||||
_sc
|
||||
and (
|
||||
getattr(_sc, "final_response_sent", False)
|
||||
or getattr(_sc, "already_sent", False)
|
||||
)
|
||||
)
|
||||
|
||||
assert _already_streamed is False
|
||||
|
||||
def test_queued_path_with_no_stream_consumer(self):
|
||||
"""No stream consumer at all (streaming disabled) — not streamed."""
|
||||
_sc = None
|
||||
|
||||
_already_streamed = bool(
|
||||
_sc
|
||||
and (
|
||||
getattr(_sc, "final_response_sent", False)
|
||||
or getattr(_sc, "already_sent", False)
|
||||
)
|
||||
)
|
||||
|
||||
assert _already_streamed is False
|
||||
Loading…
Add table
Add a link
Reference in a new issue