mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(telegram): prevent duplicate message delivery on send timeout (#5153)
TimedOut is a subclass of NetworkError in python-telegram-bot. The inner retry loop in send() and the outer _send_with_retry() in base.py both treated it as a transient connection error and retried — but send_message is not idempotent. When the request reaches Telegram but the HTTP response times out, the message is already delivered. Retrying sends duplicates. Worst case: up to 9 copies (inner 3x × outer 3x).

Inner loop (telegram.py):
- Import TimedOut separately, isinstance-check before generic NetworkError retry (same pattern as BadRequest carve-out from #3390)
- Re-raise immediately — no retry
- Mark as retryable=False in outer exception handler

Outer loop (base.py):
- Remove 'timeout', 'timed out', 'readtimeout', 'writetimeout' from _RETRYABLE_ERROR_PATTERNS (read/write timeouts are delivery-ambiguous)
- Add 'connecttimeout' (safe — connection never established)
- Keep 'network' (other platforms still need it)
- Add _is_timeout_error() + early return to prevent plain-text fallback on timeout errors (would also cause duplicate delivery)

Connection errors (ConnectionReset, ConnectError, etc.) are still retried — these fail before the request reaches the server.

Credit: tmdgusya (PR #3899), barun1997 (PR #3904) for identifying the bug and proposing fixes.

Closes #3899, closes #3904.
This commit is contained in:
parent
c8220e69a1
commit
85cefc7a5a
4 changed files with 132 additions and 11 deletions
|
|
@ -377,23 +377,26 @@ class SendResult:
|
||||||
message_id: Optional[str] = None
|
message_id: Optional[str] = None
|
||||||
error: Optional[str] = None
|
error: Optional[str] = None
|
||||||
raw_response: Any = None
|
raw_response: Any = None
|
||||||
retryable: bool = False # True for transient errors (network, timeout) — base will retry automatically
|
retryable: bool = False # True for transient connection errors — base will retry automatically
|
||||||
|
|
||||||
|
|
||||||
# Error substrings that indicate a transient network failure worth retrying
|
# Error substrings that indicate a transient *connection* failure worth retrying.
|
||||||
|
# "timeout" / "timed out" / "readtimeout" / "writetimeout" are intentionally
|
||||||
|
# excluded: a read/write timeout on a non-idempotent call (e.g. send_message)
|
||||||
|
# means the request may have reached the server — retrying risks duplicate
|
||||||
|
# delivery. "connecttimeout" is safe because the connection was never
|
||||||
|
# established. Platforms that know a timeout is safe to retry should set
|
||||||
|
# SendResult.retryable = True explicitly.
|
||||||
_RETRYABLE_ERROR_PATTERNS = (
|
_RETRYABLE_ERROR_PATTERNS = (
|
||||||
"connecterror",
|
"connecterror",
|
||||||
"connectionerror",
|
"connectionerror",
|
||||||
"connectionreset",
|
"connectionreset",
|
||||||
"connectionrefused",
|
"connectionrefused",
|
||||||
"timeout",
|
"connecttimeout",
|
||||||
"timed out",
|
|
||||||
"network",
|
"network",
|
||||||
"broken pipe",
|
"broken pipe",
|
||||||
"remotedisconnected",
|
"remotedisconnected",
|
||||||
"eoferror",
|
"eoferror",
|
||||||
"readtimeout",
|
|
||||||
"writetimeout",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -927,6 +930,18 @@ class BasePlatformAdapter(ABC):
|
||||||
lowered = error.lower()
|
lowered = error.lower()
|
||||||
return any(pat in lowered for pat in _RETRYABLE_ERROR_PATTERNS)
|
return any(pat in lowered for pat in _RETRYABLE_ERROR_PATTERNS)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_timeout_error(error: Optional[str]) -> bool:
|
||||||
|
"""Return True if the error string indicates a read/write timeout.
|
||||||
|
|
||||||
|
Timeout errors are NOT retryable and should NOT trigger plain-text
|
||||||
|
fallback — the request may have already been delivered.
|
||||||
|
"""
|
||||||
|
if not error:
|
||||||
|
return False
|
||||||
|
lowered = error.lower()
|
||||||
|
return "timed out" in lowered or "readtimeout" in lowered or "writetimeout" in lowered
|
||||||
|
|
||||||
async def _send_with_retry(
|
async def _send_with_retry(
|
||||||
self,
|
self,
|
||||||
chat_id: str,
|
chat_id: str,
|
||||||
|
|
@ -958,6 +973,11 @@ class BasePlatformAdapter(ABC):
|
||||||
error_str = result.error or ""
|
error_str = result.error or ""
|
||||||
is_network = result.retryable or self._is_retryable_error(error_str)
|
is_network = result.retryable or self._is_retryable_error(error_str)
|
||||||
|
|
||||||
|
# Timeout errors are not safe to retry (message may have been
|
||||||
|
# delivered) and not formatting errors — return the failure as-is.
|
||||||
|
if not is_network and self._is_timeout_error(error_str):
|
||||||
|
return result
|
||||||
|
|
||||||
if is_network:
|
if is_network:
|
||||||
# Retry with exponential backoff for transient errors
|
# Retry with exponential backoff for transient errors
|
||||||
for attempt in range(1, max_retries + 1):
|
for attempt in range(1, max_retries + 1):
|
||||||
|
|
|
||||||
|
|
@ -772,6 +772,11 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
except ImportError:
|
except ImportError:
|
||||||
_BadReq = None # type: ignore[assignment,misc]
|
_BadReq = None # type: ignore[assignment,misc]
|
||||||
|
|
||||||
|
try:
|
||||||
|
from telegram.error import TimedOut as _TimedOut
|
||||||
|
except (ImportError, AttributeError):
|
||||||
|
_TimedOut = None # type: ignore[assignment,misc]
|
||||||
|
|
||||||
for i, chunk in enumerate(chunks):
|
for i, chunk in enumerate(chunks):
|
||||||
should_thread = self._should_thread_reply(reply_to, i)
|
should_thread = self._should_thread_reply(reply_to, i)
|
||||||
reply_to_id = int(reply_to) if should_thread else None
|
reply_to_id = int(reply_to) if should_thread else None
|
||||||
|
|
@ -833,6 +838,11 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
continue
|
continue
|
||||||
# Other BadRequest errors are permanent — don't retry
|
# Other BadRequest errors are permanent — don't retry
|
||||||
raise
|
raise
|
||||||
|
# TimedOut is also a subclass of NetworkError but
|
||||||
|
# indicates the request may have reached the server —
|
||||||
|
# retrying risks duplicate message delivery.
|
||||||
|
if _TimedOut and isinstance(send_err, _TimedOut):
|
||||||
|
raise
|
||||||
if _send_attempt < 2:
|
if _send_attempt < 2:
|
||||||
wait = 2 ** _send_attempt
|
wait = 2 ** _send_attempt
|
||||||
logger.warning("[%s] Network error on send (attempt %d/3), retrying in %ds: %s",
|
logger.warning("[%s] Network error on send (attempt %d/3), retrying in %ds: %s",
|
||||||
|
|
@ -850,7 +860,12 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("[%s] Failed to send Telegram message: %s", self.name, e, exc_info=True)
|
logger.error("[%s] Failed to send Telegram message: %s", self.name, e, exc_info=True)
|
||||||
return SendResult(success=False, error=str(e))
|
# TimedOut means the request may have reached Telegram —
|
||||||
|
# mark as non-retryable so _send_with_retry() doesn't re-send.
|
||||||
|
_to = locals().get("_TimedOut")
|
||||||
|
err_str = str(e).lower()
|
||||||
|
is_timeout = (_to and isinstance(e, _to)) or "timed out" in err_str
|
||||||
|
return SendResult(success=False, error=str(e), retryable=not is_timeout)
|
||||||
|
|
||||||
async def edit_message(
|
async def edit_message(
|
||||||
self,
|
self,
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,43 @@ class TestIsRetryableError:
|
||||||
def test_case_insensitive(self):
|
def test_case_insensitive(self):
|
||||||
assert _StubAdapter._is_retryable_error("CONNECTERROR: host unreachable")
|
assert _StubAdapter._is_retryable_error("CONNECTERROR: host unreachable")
|
||||||
|
|
||||||
|
def test_timeout_not_retryable(self):
|
||||||
|
assert not _StubAdapter._is_retryable_error("ReadTimeout: request timed out")
|
||||||
|
|
||||||
|
def test_timed_out_not_retryable(self):
|
||||||
|
assert not _StubAdapter._is_retryable_error("Timed out waiting for response")
|
||||||
|
|
||||||
|
def test_connect_timeout_is_retryable(self):
|
||||||
|
assert _StubAdapter._is_retryable_error("ConnectTimeout: connection timed out")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _is_timeout_error
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestIsTimeoutError:
|
||||||
|
def test_none_is_not_timeout(self):
|
||||||
|
assert not _StubAdapter._is_timeout_error(None)
|
||||||
|
|
||||||
|
def test_empty_is_not_timeout(self):
|
||||||
|
assert not _StubAdapter._is_timeout_error("")
|
||||||
|
|
||||||
|
def test_timed_out(self):
|
||||||
|
assert _StubAdapter._is_timeout_error("Timed out waiting for response")
|
||||||
|
|
||||||
|
def test_read_timeout(self):
|
||||||
|
assert _StubAdapter._is_timeout_error("ReadTimeout: request timed out")
|
||||||
|
|
||||||
|
def test_write_timeout(self):
|
||||||
|
assert _StubAdapter._is_timeout_error("WriteTimeout: send stalled")
|
||||||
|
|
||||||
|
def test_connect_timeout_not_flagged(self):
|
||||||
|
"""ConnectTimeout is a connection error, not a delivery-ambiguous timeout."""
|
||||||
|
assert not _StubAdapter._is_timeout_error("ConnectTimeout: host unreachable")
|
||||||
|
|
||||||
|
def test_connection_error_not_timeout(self):
|
||||||
|
assert not _StubAdapter._is_timeout_error("ConnectionError: host unreachable")
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# _send_with_retry — success on first attempt
|
# _send_with_retry — success on first attempt
|
||||||
|
|
@ -112,17 +149,33 @@ class TestSendWithRetryNetworkRetry:
|
||||||
assert len(adapter._send_calls) == 2 # initial + 1 retry
|
assert len(adapter._send_calls) == 2 # initial + 1 retry
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_retries_on_timeout_and_succeeds(self):
|
async def test_timeout_not_retried_to_prevent_duplicates(self):
|
||||||
|
"""ReadTimeout is NOT retried because the request may have reached
|
||||||
|
the server — retrying a non-idempotent send risks duplicate delivery.
|
||||||
|
It also skips plain-text fallback (timeout is not a formatting issue)."""
|
||||||
adapter = _StubAdapter()
|
adapter = _StubAdapter()
|
||||||
adapter._send_results = [
|
adapter._send_results = [
|
||||||
SendResult(success=False, error="ReadTimeout: request timed out"),
|
SendResult(success=False, error="ReadTimeout: request timed out"),
|
||||||
SendResult(success=False, error="ReadTimeout: request timed out"),
|
]
|
||||||
|
with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
|
||||||
|
result = await adapter._send_with_retry("chat1", "hello", max_retries=3, base_delay=0)
|
||||||
|
# No retry, no fallback — timeout returns failure immediately
|
||||||
|
mock_sleep.assert_not_called()
|
||||||
|
assert not result.success
|
||||||
|
assert len(adapter._send_calls) == 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_connect_timeout_still_retried(self):
|
||||||
|
"""ConnectTimeout is safe to retry — the connection was never established."""
|
||||||
|
adapter = _StubAdapter()
|
||||||
|
adapter._send_results = [
|
||||||
|
SendResult(success=False, error="ConnectTimeout: connection timed out"),
|
||||||
SendResult(success=True, message_id="ok"),
|
SendResult(success=True, message_id="ok"),
|
||||||
]
|
]
|
||||||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||||||
result = await adapter._send_with_retry("chat1", "hello", max_retries=3, base_delay=0)
|
result = await adapter._send_with_retry("chat1", "hello", max_retries=2, base_delay=0)
|
||||||
assert result.success
|
assert result.success
|
||||||
assert len(adapter._send_calls) == 3
|
assert len(adapter._send_calls) == 2
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_retryable_flag_respected(self):
|
async def test_retryable_flag_respected(self):
|
||||||
|
|
|
||||||
|
|
@ -33,11 +33,16 @@ class FakeBadRequest(FakeNetworkError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FakeTimedOut(FakeNetworkError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
# Build a fake telegram module tree so the adapter's internal imports work
|
# Build a fake telegram module tree so the adapter's internal imports work
|
||||||
_fake_telegram = types.ModuleType("telegram")
|
_fake_telegram = types.ModuleType("telegram")
|
||||||
_fake_telegram_error = types.ModuleType("telegram.error")
|
_fake_telegram_error = types.ModuleType("telegram.error")
|
||||||
_fake_telegram_error.NetworkError = FakeNetworkError
|
_fake_telegram_error.NetworkError = FakeNetworkError
|
||||||
_fake_telegram_error.BadRequest = FakeBadRequest
|
_fake_telegram_error.BadRequest = FakeBadRequest
|
||||||
|
_fake_telegram_error.TimedOut = FakeTimedOut
|
||||||
_fake_telegram.error = _fake_telegram_error
|
_fake_telegram.error = _fake_telegram_error
|
||||||
_fake_telegram_constants = types.ModuleType("telegram.constants")
|
_fake_telegram_constants = types.ModuleType("telegram.constants")
|
||||||
_fake_telegram_constants.ParseMode = SimpleNamespace(MARKDOWN_V2="MarkdownV2")
|
_fake_telegram_constants.ParseMode = SimpleNamespace(MARKDOWN_V2="MarkdownV2")
|
||||||
|
|
@ -168,6 +173,34 @@ async def test_send_retries_network_errors_normally():
|
||||||
assert attempt[0] == 3 # Two retries then success
|
assert attempt[0] == 3 # Two retries then success
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_does_not_retry_timeout():
|
||||||
|
"""TimedOut (subclass of NetworkError) should NOT be retried in send().
|
||||||
|
|
||||||
|
The request may have already been delivered to the user — retrying
|
||||||
|
would send duplicate messages.
|
||||||
|
"""
|
||||||
|
adapter = _make_adapter()
|
||||||
|
|
||||||
|
attempt = [0]
|
||||||
|
|
||||||
|
async def mock_send_message(**kwargs):
|
||||||
|
attempt[0] += 1
|
||||||
|
raise FakeTimedOut("Timed out waiting for Telegram response")
|
||||||
|
|
||||||
|
adapter._bot = SimpleNamespace(send_message=mock_send_message)
|
||||||
|
|
||||||
|
result = await adapter.send(
|
||||||
|
chat_id="123",
|
||||||
|
content="test message",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.success is False
|
||||||
|
assert "Timed out" in result.error
|
||||||
|
# CRITICAL: only 1 attempt — no retry for TimedOut
|
||||||
|
assert attempt[0] == 1
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_thread_fallback_only_fires_once():
|
async def test_thread_fallback_only_fires_once():
|
||||||
"""After clearing thread_id, subsequent chunks should also use None."""
|
"""After clearing thread_id, subsequent chunks should also use None."""
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue