mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-22 10:32:00 +00:00
Fix silent delivery failures in Signal live adapter (#49260)
This commit is contained in:
parent
5f55f0ff85
commit
5649b8649a
3 changed files with 161 additions and 9 deletions
|
|
@ -796,7 +796,16 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
logger.debug("Signal RPC error (%s): %s", method, err)
|
||||
return None
|
||||
|
||||
return data.get("result")
|
||||
result = data.get("result")
|
||||
if isinstance(result, dict) and raise_on_rate_limit:
|
||||
results = result.get("results")
|
||||
if isinstance(results, list):
|
||||
for r in results:
|
||||
if isinstance(r, dict) and r.get("type") == "RATE_LIMIT_FAILURE":
|
||||
retry_after = r.get("retryAfterSeconds")
|
||||
raise SignalRateLimitError("Rate limit exceeded for recipient", retry_after=retry_after)
|
||||
|
||||
return result
|
||||
|
||||
except SignalRateLimitError:
|
||||
raise
|
||||
|
|
@ -960,6 +969,29 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
# Our send() override bypasses this entirely.
|
||||
return content
|
||||
|
||||
def _validate_send_result(self, result: Any) -> tuple[bool, Optional[str]]:
|
||||
"""Validate signal-cli send response results.
|
||||
|
||||
Returns (success, error_message).
|
||||
"""
|
||||
if not result or not isinstance(result, dict):
|
||||
return True, None
|
||||
|
||||
results = result.get("results")
|
||||
if isinstance(results, list):
|
||||
for r in results:
|
||||
if not isinstance(r, dict):
|
||||
continue
|
||||
rtype = r.get("type")
|
||||
if rtype and rtype != "SUCCESS":
|
||||
return False, str(rtype)
|
||||
if "success" in r and not r.get("success"):
|
||||
fail = r.get("failure")
|
||||
if fail:
|
||||
return False, str(fail)
|
||||
return False, "Recipient delivery failed"
|
||||
return True, None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Sending
|
||||
# ------------------------------------------------------------------
|
||||
|
|
@ -995,6 +1027,9 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
result = await self._rpc("send", params)
|
||||
|
||||
if result is not None:
|
||||
success, err_msg = self._validate_send_result(result)
|
||||
if not success:
|
||||
return SendResult(success=False, error=err_msg, raw_response=result)
|
||||
self._track_sent_timestamp(result)
|
||||
# Signal has no editable message identifier. Returning None keeps the
|
||||
# stream consumer on the non-edit fallback path instead of pretending
|
||||
|
|
@ -1171,14 +1206,33 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
)
|
||||
_rpc_duration = time.monotonic() - _rpc_t0
|
||||
if result is not None:
|
||||
self._track_sent_timestamp(result)
|
||||
await scheduler.report_rpc_duration(_rpc_duration, n)
|
||||
logger.info(
|
||||
"Signal batch %d/%d: %d attachments sent in %.1fs "
|
||||
"(attempt %d/%d)",
|
||||
idx + 1, len(att_batches), n, _rpc_duration,
|
||||
attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
|
||||
)
|
||||
success, err_msg = self._validate_send_result(result)
|
||||
if success:
|
||||
self._track_sent_timestamp(result)
|
||||
await scheduler.report_rpc_duration(_rpc_duration, n)
|
||||
logger.info(
|
||||
"Signal batch %d/%d: %d attachments sent in %.1fs "
|
||||
"(attempt %d/%d)",
|
||||
idx + 1, len(att_batches), n, _rpc_duration,
|
||||
attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
"Signal: RPC send failed for batch %d/%d (%d attachments, "
|
||||
"attempt %d/%d, rpc_duration=%.1fs): %s",
|
||||
idx + 1, len(att_batches), n,
|
||||
attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
|
||||
_rpc_duration, err_msg,
|
||||
)
|
||||
# Retry transient (non-rate-limit) failures once
|
||||
if attempt < SIGNAL_RATE_LIMIT_MAX_ATTEMPTS:
|
||||
backoff = 2.0 ** attempt
|
||||
logger.info(
|
||||
"Signal: retrying batch %d/%d after %.1fs backoff",
|
||||
idx + 1, len(att_batches), backoff,
|
||||
)
|
||||
await asyncio.sleep(backoff)
|
||||
continue
|
||||
else:
|
||||
# Assume the server didn't accept the batch, don't deduce tokens
|
||||
logger.error(
|
||||
|
|
@ -1277,6 +1331,9 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
|
||||
result = await self._rpc("send", params)
|
||||
if result is not None:
|
||||
success, err_msg = self._validate_send_result(result)
|
||||
if not success:
|
||||
return SendResult(success=False, error=err_msg, raw_response=result)
|
||||
self._track_sent_timestamp(result)
|
||||
return SendResult(success=True)
|
||||
return SendResult(success=False, error="RPC send with attachment failed")
|
||||
|
|
@ -1316,6 +1373,9 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
|
||||
result = await self._rpc("send", params)
|
||||
if result is not None:
|
||||
success, err_msg = self._validate_send_result(result)
|
||||
if not success:
|
||||
return SendResult(success=False, error=err_msg, raw_response=result)
|
||||
self._track_sent_timestamp(result)
|
||||
return SendResult(success=True)
|
||||
return SendResult(success=False, error=f"RPC send {media_label.lower()} failed")
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ AUTHOR_MAP = {
|
|||
"charles@salesondemand.io": "salesondemandio",
|
||||
"victor@rocketfueldev.com": "victor-kyriazakos",
|
||||
"87440198+JoaoMarcos44@users.noreply.github.com": "JoaoMarcos44",
|
||||
"joaomarcosdias444@gmail.com": "JoaoMarcos44",
|
||||
"286497132+srojk34@users.noreply.github.com": "srojk34",
|
||||
"59806492+sitkarev@users.noreply.github.com": "sitkarev",
|
||||
"zheng@omegasys.eu": "omegazheng",
|
||||
|
|
|
|||
|
|
@ -1009,6 +1009,97 @@ class TestSignalSendReturnsMessageId:
|
|||
assert result.message_id is None
|
||||
|
||||
|
||||
class TestSignalSendResultValidation:
|
||||
"""Verify that send() validates recipient-level delivery results."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_success_when_results_has_success(self, monkeypatch):
|
||||
adapter = _make_signal_adapter(monkeypatch)
|
||||
mock_rpc, _ = _stub_rpc({
|
||||
"timestamp": 1712345678000,
|
||||
"results": [
|
||||
{
|
||||
"recipientAddress": {"number": "+155****4567"},
|
||||
"type": "SUCCESS"
|
||||
}
|
||||
]
|
||||
})
|
||||
adapter._rpc = mock_rpc
|
||||
adapter._stop_typing_indicator = AsyncMock()
|
||||
|
||||
result = await adapter.send(chat_id="+155****4567", content="hello")
|
||||
assert result.success is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_failure_when_results_has_failure_type(self, monkeypatch):
|
||||
adapter = _make_signal_adapter(monkeypatch)
|
||||
mock_rpc, _ = _stub_rpc({
|
||||
"timestamp": 1712345678000,
|
||||
"results": [
|
||||
{
|
||||
"recipientAddress": {"number": "+155****4567"},
|
||||
"type": "UNREGISTERED_FAILURE"
|
||||
}
|
||||
]
|
||||
})
|
||||
adapter._rpc = mock_rpc
|
||||
adapter._stop_typing_indicator = AsyncMock()
|
||||
|
||||
result = await adapter.send(chat_id="+155****4567", content="hello")
|
||||
assert result.success is False
|
||||
assert result.error == "UNREGISTERED_FAILURE"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_failure_when_results_has_success_false(self, monkeypatch):
|
||||
adapter = _make_signal_adapter(monkeypatch)
|
||||
mock_rpc, _ = _stub_rpc({
|
||||
"timestamp": 1712345678000,
|
||||
"results": [
|
||||
{
|
||||
"recipientAddress": {"number": "+155****4567"},
|
||||
"success": False,
|
||||
"failure": "Some connection error"
|
||||
}
|
||||
]
|
||||
})
|
||||
adapter._rpc = mock_rpc
|
||||
adapter._stop_typing_indicator = AsyncMock()
|
||||
|
||||
result = await adapter.send(chat_id="+155****4567", content="hello")
|
||||
assert result.success is False
|
||||
assert result.error == "Some connection error"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_rpc_raises_rate_limit_on_results_failure(self, monkeypatch):
|
||||
adapter = _make_signal_adapter(monkeypatch)
|
||||
mock_client = AsyncMock()
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.json.return_value = {
|
||||
"jsonrpc": "2.0",
|
||||
"result": {
|
||||
"timestamp": 1712345678000,
|
||||
"results": [
|
||||
{
|
||||
"recipientAddress": {"number": "+155****4567"},
|
||||
"type": "RATE_LIMIT_FAILURE",
|
||||
"retryAfterSeconds": 15
|
||||
}
|
||||
]
|
||||
},
|
||||
"id": "1"
|
||||
}
|
||||
mock_client.post = AsyncMock(return_value=mock_response)
|
||||
adapter.client = mock_client
|
||||
|
||||
from gateway.platforms.signal_rate_limit import SignalRateLimitError
|
||||
with pytest.raises(SignalRateLimitError) as exc_info:
|
||||
await adapter._rpc("send", {"recipient": ["+155****4567"]}, raise_on_rate_limit=True)
|
||||
|
||||
assert "Rate limit exceeded for recipient" in str(exc_info.value)
|
||||
assert exc_info.value.retry_after == 15
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# stop_typing() delegates to _stop_typing_indicator (#4647)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue