mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): retry transient send failures and notify user on exhaustion (#3288)
When send() failed due to a network error (ConnectError, ReadTimeout, etc.), the failure was silently logged and the user received no feedback — appearing as a hang. In one reported case, a user waited 1+ hour for a response that had already been generated but failed to deliver (#2910).

Adds _send_with_retry() to BasePlatformAdapter:
- Transient errors: retry up to 2x with exponential backoff + jitter
- On exhaustion: send delivery-failure notice so user knows to retry
- Permanent errors: fall back to plain-text version (preserves existing behavior)
- SendResult.retryable flag for platform-specific transient errors

All adapters benefit automatically via BasePlatformAdapter inheritance.

Cherry-picked from PR #3108 by Mibayy.

Co-authored-by: Mibayy <mibayy@users.noreply.github.com>
This commit is contained in:
parent
716e616d28
commit
bde45f5a2a
2 changed files with 336 additions and 14 deletions
|
|
@ -8,6 +8,7 @@ and implement the required methods.
|
|||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import uuid
|
||||
from abc import ABC, abstractmethod
|
||||
|
|
@ -329,6 +330,24 @@ class SendResult:
|
|||
message_id: Optional[str] = None
|
||||
error: Optional[str] = None
|
||||
raw_response: Any = None
|
||||
retryable: bool = False # True for transient errors (network, timeout) — base will retry automatically
|
||||
|
||||
|
||||
# Substrings that mark an error message as a transient network failure worth
# retrying. They are compared against the lowercased error text, so every
# entry here must itself be lowercase.
_RETRYABLE_ERROR_PATTERNS = (
    "connecterror", "connectionerror", "connectionreset", "connectionrefused",
    "timeout", "timed out", "network", "broken pipe",
    "remotedisconnected", "eoferror", "readtimeout", "writetimeout",
)
|
||||
|
||||
|
||||
# Type for message handlers
|
||||
|
|
@ -833,6 +852,91 @@ class BasePlatformAdapter(ABC):
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
@staticmethod
def _is_retryable_error(error: Optional[str]) -> bool:
    """Report whether *error* reads like a transient network failure.

    The text is lowercased and searched for any substring listed in
    ``_RETRYABLE_ERROR_PATTERNS``. ``None`` and the empty string are never
    considered retryable.
    """
    if error:
        haystack = error.lower()
        for needle in _RETRYABLE_ERROR_PATTERNS:
            if needle in haystack:
                return True
    return False
|
||||
|
||||
async def _send_with_retry(
    self,
    chat_id: str,
    content: str,
    reply_to: Optional[str] = None,
    metadata: Any = None,
    max_retries: int = 2,
    base_delay: float = 2.0,
) -> "SendResult":
    """
    Send a message with automatic retry for transient network errors.

    On permanent failures (e.g. formatting / permission errors) falls back
    to a plain-text version before giving up. If all attempts fail due to
    network errors, sends the user a brief delivery-failure notice so they
    know to retry rather than waiting indefinitely.

    Args:
        chat_id: Destination chat, forwarded unchanged to ``self.send``.
        content: Message body to deliver.
        reply_to: Optional message id to reply to, forwarded to ``self.send``.
        metadata: Opaque value forwarded to ``self.send``.
        max_retries: Number of additional attempts made after the initial
            send when the failure looks transient.
        base_delay: Seconds to wait before the first retry; the wait doubles
            on each further retry, plus up to one second of random jitter.

    Returns:
        The first successful result; otherwise the last failed result when
        retries were exhausted, or the plain-text fallback's result when the
        failure was (or became) non-transient.
    """
    result = await self.send(
        chat_id=chat_id,
        content=content,
        reply_to=reply_to,
        metadata=metadata,
    )

    if result.success:
        return result

    error_str = result.error or ""
    # An adapter may flag the failure explicitly; otherwise sniff the text.
    is_network = result.retryable or self._is_retryable_error(error_str)

    if is_network:
        # Retry with exponential backoff for transient errors
        for attempt in range(1, max_retries + 1):
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 1)
            logger.warning(
                "[%s] Send failed (attempt %d/%d, retrying in %.1fs): %s",
                self.name, attempt, max_retries, delay, error_str,
            )
            await asyncio.sleep(delay)
            result = await self.send(
                chat_id=chat_id,
                content=content,
                reply_to=reply_to,
                metadata=metadata,
            )
            if result.success:
                logger.info("[%s] Send succeeded on retry %d", self.name, attempt)
                return result
            error_str = result.error or ""
            if not (result.retryable or self._is_retryable_error(error_str)):
                break  # error switched to non-transient — fall through to plain-text fallback
        else:
            # All retries exhausted (loop completed without break) — notify user
            logger.error("[%s] Failed to deliver response after %d retries: %s", self.name, max_retries, error_str)
            notice = (
                "\u26a0\ufe0f Message delivery failed after multiple attempts. "
                "Please try again \u2014 your request was processed but the response could not be sent."
            )
            try:
                notice_result = await self.send(chat_id=chat_id, content=notice, reply_to=reply_to, metadata=metadata)
            except Exception as notify_err:
                # Best effort only: the network is presumably still down.
                logger.debug("[%s] Could not send delivery-failure notice: %s", self.name, notify_err)
            else:
                # send() can also report failure through its result instead
                # of raising; record that too so the outcome is traceable.
                if notice_result is not None and not notice_result.success:
                    logger.debug(
                        "[%s] Could not send delivery-failure notice: %s",
                        self.name, notice_result.error,
                    )
            # Report the real delivery failure, not the notice's outcome.
            return result

    # Non-network / post-retry formatting failure: try plain text as fallback
    logger.warning("[%s] Send failed: %s — trying plain-text fallback", self.name, error_str)
    fallback_result = await self.send(
        chat_id=chat_id,
        content=f"(Response formatting failed, plain text:)\n\n{content[:3500]}",
        reply_to=reply_to,
        metadata=metadata,
    )
    if not fallback_result.success:
        logger.error("[%s] Fallback send also failed: %s", self.name, fallback_result.error)
    return fallback_result
|
||||
|
||||
async def handle_message(self, event: MessageEvent) -> None:
|
||||
"""
|
||||
Process an incoming message.
|
||||
|
|
@ -982,26 +1086,13 @@ class BasePlatformAdapter(ABC):
|
|||
# Send the text portion
|
||||
if text_content:
|
||||
logger.info("[%s] Sending response (%d chars) to %s", self.name, len(text_content), event.source.chat_id)
|
||||
result = await self.send(
|
||||
result = await self._send_with_retry(
|
||||
chat_id=event.source.chat_id,
|
||||
content=text_content,
|
||||
reply_to=event.message_id,
|
||||
metadata=_thread_metadata,
|
||||
)
|
||||
|
||||
# Log send failures (don't raise - user already saw tool progress)
|
||||
if not result.success:
|
||||
print(f"[{self.name}] Failed to send response: {result.error}")
|
||||
# Try sending without markdown as fallback
|
||||
fallback_result = await self.send(
|
||||
chat_id=event.source.chat_id,
|
||||
content=f"(Response formatting failed, plain text:)\n\n{text_content[:3500]}",
|
||||
reply_to=event.message_id,
|
||||
metadata=_thread_metadata,
|
||||
)
|
||||
if not fallback_result.success:
|
||||
print(f"[{self.name}] Fallback send also failed: {fallback_result.error}")
|
||||
|
||||
# Human-like pacing delay between text and media
|
||||
human_delay = self._get_human_delay()
|
||||
|
||||
|
|
|
|||
231
tests/gateway/test_send_retry.py
Normal file
231
tests/gateway/test_send_retry.py
Normal file
|
|
@ -0,0 +1,231 @@
|
|||
"""
|
||||
Tests for BasePlatformAdapter._send_with_retry and _is_retryable_error.
|
||||
|
||||
Verifies that:
|
||||
- Transient network errors trigger retry with backoff
|
||||
- Permanent errors fall back to plain-text immediately (no retry)
|
||||
- User receives a delivery-failure notice when all retries are exhausted
|
||||
- Successful sends on retry return success
|
||||
- SendResult.retryable flag is respected
|
||||
"""
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from gateway.platforms.base import BasePlatformAdapter, SendResult, _RETRYABLE_ERROR_PATTERNS
|
||||
from gateway.platforms.base import Platform, PlatformConfig
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Minimal concrete adapter for testing (no real network)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _StubAdapter(BasePlatformAdapter):
    """Network-free adapter whose ``send`` replays a scripted list of results.

    Tests fill ``_send_results`` with the ``SendResult`` objects to hand back,
    one per ``send`` call, and inspect ``_send_calls`` afterwards. Once the
    script is used up, every further ``send`` reports success.
    """

    def __init__(self):
        super().__init__(PlatformConfig(), Platform.TELEGRAM)
        self._send_results = []  # queue of SendResult to return per call
        self._send_calls = []  # record of (chat_id, content) sent

    def _next_result(self) -> SendResult:
        """Pop the next scripted result, defaulting to a success."""
        if not self._send_results:
            return SendResult(success=True, message_id="ok")
        return self._send_results.pop(0)

    async def send(self, chat_id, content, reply_to=None, metadata=None, **kwargs) -> SendResult:
        """Record the call and return the next scripted result."""
        call = (chat_id, content)
        self._send_calls.append(call)
        return self._next_result()

    async def connect(self) -> bool:
        return True

    async def disconnect(self) -> None:
        return None

    async def send_typing(self, chat_id, metadata=None) -> None:
        return None

    async def get_chat_info(self, chat_id):
        return {"name": "test", "type": "direct", "chat_id": chat_id}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _is_retryable_error
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestIsRetryableError:
    """Classification of error strings as transient or permanent."""

    def test_none_is_not_retryable(self):
        verdict = _StubAdapter._is_retryable_error(None)
        assert not verdict

    def test_empty_string_is_not_retryable(self):
        verdict = _StubAdapter._is_retryable_error("")
        assert not verdict

    @pytest.mark.parametrize("pattern", _RETRYABLE_ERROR_PATTERNS)
    def test_known_pattern_is_retryable(self, pattern):
        # Title-casing the pattern also exercises case-insensitive matching.
        message = f"httpx.{pattern.title()}: connection dropped"
        assert _StubAdapter._is_retryable_error(message)

    def test_permission_error_not_retryable(self):
        message = "Forbidden: bot was blocked by the user"
        assert not _StubAdapter._is_retryable_error(message)

    def test_bad_request_not_retryable(self):
        message = "Bad Request: can't parse entities"
        assert not _StubAdapter._is_retryable_error(message)

    def test_case_insensitive(self):
        message = "CONNECTERROR: host unreachable"
        assert _StubAdapter._is_retryable_error(message)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _send_with_retry — success on first attempt
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSendWithRetrySuccess:
    """A send that succeeds immediately is passed through untouched."""

    @pytest.mark.asyncio
    async def test_success_first_attempt(self):
        stub = _StubAdapter()
        stub._send_results = [SendResult(success=True, message_id="123")]

        outcome = await stub._send_with_retry("chat1", "hello")

        assert outcome.success
        # Exactly one send: no retry and no fallback.
        assert len(stub._send_calls) == 1

    @pytest.mark.asyncio
    async def test_returns_message_id(self):
        stub = _StubAdapter()
        stub._send_results = [SendResult(success=True, message_id="abc")]

        outcome = await stub._send_with_retry("chat1", "hi")

        assert outcome.message_id == "abc"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _send_with_retry — network error with successful retry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSendWithRetryNetworkRetry:
    """Transient failures are retried until a send succeeds or the error changes kind."""

    @pytest.mark.asyncio
    async def test_retries_on_connect_error_and_succeeds(self):
        stub = _StubAdapter()
        stub._send_results = [
            SendResult(success=False, error="httpx.ConnectError: connection refused"),
            SendResult(success=True, message_id="ok"),
        ]

        with patch("asyncio.sleep", new_callable=AsyncMock):
            outcome = await stub._send_with_retry("chat1", "hello", max_retries=2, base_delay=0)

        assert outcome.success
        assert len(stub._send_calls) == 2  # initial + 1 retry

    @pytest.mark.asyncio
    async def test_retries_on_timeout_and_succeeds(self):
        stub = _StubAdapter()
        stub._send_results = [
            SendResult(success=False, error="ReadTimeout: request timed out"),
            SendResult(success=False, error="ReadTimeout: request timed out"),
            SendResult(success=True, message_id="ok"),
        ]

        with patch("asyncio.sleep", new_callable=AsyncMock):
            outcome = await stub._send_with_retry("chat1", "hello", max_retries=3, base_delay=0)

        assert outcome.success
        assert len(stub._send_calls) == 3

    @pytest.mark.asyncio
    async def test_retryable_flag_respected(self):
        """SendResult.retryable=True should trigger retry even if error string doesn't match."""
        stub = _StubAdapter()
        stub._send_results = [
            SendResult(success=False, error="internal platform error", retryable=True),
            SendResult(success=True, message_id="ok"),
        ]

        with patch("asyncio.sleep", new_callable=AsyncMock):
            outcome = await stub._send_with_retry("chat1", "hello", max_retries=2, base_delay=0)

        assert outcome.success
        assert len(stub._send_calls) == 2

    @pytest.mark.asyncio
    async def test_network_to_nonnetwork_transition_falls_back_to_plaintext(self):
        """If error switches from network to formatting mid-retry, fall through to plain-text fallback."""
        stub = _StubAdapter()
        stub._send_results = [
            SendResult(success=False, error="httpx.ConnectError: host unreachable"),
            SendResult(success=False, error="Bad Request: can't parse entities"),
            SendResult(success=True, message_id="fallback_ok"),  # plain-text fallback
        ]

        with patch("asyncio.sleep", new_callable=AsyncMock):
            outcome = await stub._send_with_retry("chat1", "**bold**", max_retries=2, base_delay=0)

        assert outcome.success
        # 3 calls: initial (network) + 1 retry (non-network, breaks loop) + plain-text fallback
        assert len(stub._send_calls) == 3
        _, last_content = stub._send_calls[-1]
        assert "plain text" in last_content.lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _send_with_retry — all retries exhausted → user notification
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSendWithRetryExhausted:
    """When every retry fails on a network error, the user is told about it."""

    @pytest.mark.asyncio
    async def test_sends_user_notice_after_exhaustion(self):
        adapter = _StubAdapter()
        network_err = SendResult(success=False, error="httpx.ConnectError: host unreachable")
        # initial + 2 retries + notice attempt
        adapter._send_results = [network_err, network_err, network_err, SendResult(success=True)]
        with patch("asyncio.sleep", new_callable=AsyncMock):
            result = await adapter._send_with_retry("chat1", "hello", max_retries=2, base_delay=0)
        # Result is the last failed one (before notice)
        assert not result.success
        # 4 total calls: 1 initial + 2 retries + 1 notice
        assert len(adapter._send_calls) == 4
        # The notice content should mention delivery failure
        notice_content = adapter._send_calls[-1][1]
        assert "delivery failed" in notice_content.lower()

    @pytest.mark.asyncio
    async def test_notice_send_exception_doesnt_propagate(self):
        """If the notice itself throws, _send_with_retry should not raise."""
        adapter = _StubAdapter()
        network_err = SendResult(success=False, error="ConnectError")
        attempts = 0

        async def send_with_notice_failure(chat_id, content, **kwargs):
            # Calls 1-3 are the initial send and two retries; call 4 is the notice.
            nonlocal attempts
            attempts += 1
            if attempts > 3:
                raise RuntimeError("notice send also failed")
            return network_err

        adapter.send = send_with_notice_failure
        with patch("asyncio.sleep", new_callable=AsyncMock):
            result = await adapter._send_with_retry("chat1", "hello", max_retries=2, base_delay=0)
        assert not result.success  # still failed, but no exception raised
        # The notice send must actually have been attempted (and swallowed),
        # otherwise this test would pass even with the notice path removed.
        assert attempts == 4
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _send_with_retry — non-network failure → plain-text fallback (no retry)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSendWithRetryFallback:
    """Permanent failures skip the retry loop and go straight to the plain-text fallback."""

    @pytest.mark.asyncio
    async def test_non_network_error_falls_back_immediately(self):
        stub = _StubAdapter()
        stub._send_results = [
            SendResult(success=False, error="Bad Request: can't parse entities"),
            SendResult(success=True, message_id="fallback_ok"),
        ]

        with patch("asyncio.sleep", new_callable=AsyncMock) as sleep_spy:
            outcome = await stub._send_with_retry("chat1", "**bold**", max_retries=2, base_delay=0)

        # No sleep — no retry loop for non-network errors
        sleep_spy.assert_not_called()
        assert outcome.success
        assert len(stub._send_calls) == 2
        # Fallback content should be plain-text notice
        _, fallback_content = stub._send_calls[1]
        assert "plain text" in fallback_content.lower()

    @pytest.mark.asyncio
    async def test_fallback_failure_logged_but_not_raised(self):
        stub = _StubAdapter()
        stub._send_results = [
            SendResult(success=False, error="Forbidden: bot blocked"),
            SendResult(success=False, error="Forbidden: bot blocked"),
        ]

        with patch("asyncio.sleep", new_callable=AsyncMock):
            outcome = await stub._send_with_retry("chat1", "hello", max_retries=2)

        assert not outcome.success
        assert len(stub._send_calls) == 2  # original + fallback only
|
||||
Loading…
Add table
Add a link
Reference in a new issue