mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(streaming): prevent duplicate Telegram replies when stream task is cancelled (#9319)
When the 5-second stream_task timeout in gateway/run.py expires (due to slow Telegram API calls from rate limiting after several messages), the stream consumer is cancelled via asyncio.CancelledError. The CancelledError handler did a best-effort final edit but never set final_response_sent, so the gateway fell through to the normal send path and delivered the full response again as a reply — causing a duplicate. The fix: in the CancelledError handler, set final_response_sent = True when already_sent is True (i.e., the stream consumer had already delivered content to the user). This tells the gateway's already_sent check that the response was delivered, preventing the duplicate send. Adds two tests verifying the cancellation behavior: - Cancelled with already_sent=True → final_response_sent=True (no dup) - Cancelled with already_sent=False → final_response_sent=False (normal send path proceeds) Reported by community user hume on Discord.
This commit is contained in:
parent
d15efc9c1b
commit
0cc7f79016
2 changed files with 89 additions and 0 deletions
|
|
@ -280,6 +280,14 @@ class GatewayStreamConsumer:
|
|||
await self._send_or_edit(self._accumulated)
|
||||
except Exception:
|
||||
pass
|
||||
# If we delivered any content before being cancelled, mark the
|
||||
# final response as sent so the gateway's already_sent check
|
||||
# doesn't trigger a duplicate message. The 5-second
|
||||
# stream_task timeout (gateway/run.py) can cancel us while
|
||||
# waiting on a slow Telegram API call — without this flag the
|
||||
# gateway falls through to the normal send path.
|
||||
if self._already_sent:
|
||||
self._final_response_sent = True
|
||||
except Exception as e:
|
||||
logger.error("Stream consumer error: %s", e)
|
||||
|
||||
|
|
|
|||
|
|
@ -599,3 +599,84 @@ class TestInterimCommentaryMessages:
|
|||
assert sent_texts == ["Hello ▉", "world"]
|
||||
assert consumer.already_sent is True
|
||||
assert consumer.final_response_sent is True
|
||||
|
||||
|
||||
class TestCancelledConsumerSetsFlags:
|
||||
"""Cancellation must set final_response_sent when already_sent is True.
|
||||
|
||||
The 5-second stream_task timeout in gateway/run.py can cancel the
|
||||
consumer while it's still processing. If final_response_sent stays
|
||||
False, the gateway falls through to the normal send path and the
|
||||
user sees a duplicate message.
|
||||
"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancelled_with_already_sent_marks_final_response_sent(self):
|
||||
"""Cancelling after content was sent should set final_response_sent."""
|
||||
adapter = MagicMock()
|
||||
adapter.send = AsyncMock(
|
||||
return_value=SimpleNamespace(success=True, message_id="msg_1")
|
||||
)
|
||||
adapter.edit_message = AsyncMock(
|
||||
return_value=SimpleNamespace(success=True)
|
||||
)
|
||||
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||
|
||||
consumer = GatewayStreamConsumer(
|
||||
adapter,
|
||||
"chat_123",
|
||||
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
|
||||
)
|
||||
|
||||
# Stream some text — the consumer sends it and sets already_sent
|
||||
consumer.on_delta("Hello world")
|
||||
task = asyncio.create_task(consumer.run())
|
||||
await asyncio.sleep(0.08)
|
||||
|
||||
assert consumer.already_sent is True
|
||||
|
||||
# Cancel the task (simulates the 5-second timeout in gateway)
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# The fix: final_response_sent should be True even though _DONE
|
||||
# was never processed, preventing a duplicate message.
|
||||
assert consumer.final_response_sent is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancelled_without_any_sends_does_not_mark_final(self):
|
||||
"""Cancelling before anything was sent should NOT set final_response_sent."""
|
||||
adapter = MagicMock()
|
||||
adapter.send = AsyncMock(
|
||||
return_value=SimpleNamespace(success=False, message_id=None)
|
||||
)
|
||||
adapter.edit_message = AsyncMock(
|
||||
return_value=SimpleNamespace(success=True)
|
||||
)
|
||||
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||
|
||||
consumer = GatewayStreamConsumer(
|
||||
adapter,
|
||||
"chat_123",
|
||||
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
|
||||
)
|
||||
|
||||
# Send fails — already_sent stays False
|
||||
consumer.on_delta("x")
|
||||
task = asyncio.create_task(consumer.run())
|
||||
await asyncio.sleep(0.08)
|
||||
|
||||
assert consumer.already_sent is False
|
||||
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# Without a successful send, final_response_sent should stay False
|
||||
# so the normal gateway send path can deliver the response.
|
||||
assert consumer.final_response_sent is False
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue