diff --git a/gateway/platforms/signal.py b/gateway/platforms/signal.py index 99153034848..9a7b93ef936 100644 --- a/gateway/platforms/signal.py +++ b/gateway/platforms/signal.py @@ -796,7 +796,16 @@ class SignalAdapter(BasePlatformAdapter): logger.debug("Signal RPC error (%s): %s", method, err) return None - return data.get("result") + result = data.get("result") + if isinstance(result, dict) and raise_on_rate_limit: + results = result.get("results") + if isinstance(results, list): + for r in results: + if isinstance(r, dict) and r.get("type") == "RATE_LIMIT_FAILURE": + retry_after = r.get("retryAfterSeconds") + raise SignalRateLimitError("Rate limit exceeded for recipient", retry_after=retry_after) + + return result except SignalRateLimitError: raise @@ -960,6 +969,29 @@ class SignalAdapter(BasePlatformAdapter): # Our send() override bypasses this entirely. return content + def _validate_send_result(self, result: Any) -> tuple[bool, Optional[str]]: + """Validate signal-cli send response results. + + Returns (success, error_message). + """ + if not result or not isinstance(result, dict): + return True, None + + results = result.get("results") + if isinstance(results, list): + for r in results: + if not isinstance(r, dict): + continue + rtype = r.get("type") + if rtype and rtype != "SUCCESS": + return False, str(rtype) + if "success" in r and not r.get("success"): + fail = r.get("failure") + if fail: + return False, str(fail) + return False, "Recipient delivery failed" + return True, None + # ------------------------------------------------------------------ # Sending # ------------------------------------------------------------------ @@ -995,6 +1027,9 @@ class SignalAdapter(BasePlatformAdapter): result = await self._rpc("send", params) if result is not None: + success, err_msg = self._validate_send_result(result) + if not success: + return SendResult(success=False, error=err_msg, raw_response=result) self._track_sent_timestamp(result) # Signal has no editable message identifier. Returning None keeps the # stream consumer on the non-edit fallback path instead of pretending @@ -1171,14 +1206,33 @@ class SignalAdapter(BasePlatformAdapter): ) _rpc_duration = time.monotonic() - _rpc_t0 if result is not None: - self._track_sent_timestamp(result) - await scheduler.report_rpc_duration(_rpc_duration, n) - logger.info( - "Signal batch %d/%d: %d attachments sent in %.1fs " - "(attempt %d/%d)", - idx + 1, len(att_batches), n, _rpc_duration, - attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, - ) + success, err_msg = self._validate_send_result(result) + if success: + self._track_sent_timestamp(result) + await scheduler.report_rpc_duration(_rpc_duration, n) + logger.info( + "Signal batch %d/%d: %d attachments sent in %.1fs " + "(attempt %d/%d)", + idx + 1, len(att_batches), n, _rpc_duration, + attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, + ) + else: + logger.error( + "Signal: RPC send failed for batch %d/%d (%d attachments, " + "attempt %d/%d, rpc_duration=%.1fs): %s", + idx + 1, len(att_batches), n, + attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, + _rpc_duration, err_msg, + ) + # Retry transient (non-rate-limit) failures once + if attempt < SIGNAL_RATE_LIMIT_MAX_ATTEMPTS: + backoff = 2.0 ** attempt + logger.info( + "Signal: retrying batch %d/%d after %.1fs backoff", + idx + 1, len(att_batches), backoff, + ) + await asyncio.sleep(backoff) + continue else: # Assume the server didn't accept the batch, don't deduce tokens logger.error( @@ -1277,6 +1331,9 @@ class SignalAdapter(BasePlatformAdapter): result = await self._rpc("send", params) if result is not None: + success, err_msg = self._validate_send_result(result) + if not success: + return SendResult(success=False, error=err_msg, raw_response=result) self._track_sent_timestamp(result) return SendResult(success=True) return SendResult(success=False, error="RPC send with attachment failed") @@ -1316,6 +1373,9 @@ class SignalAdapter(BasePlatformAdapter): result = await self._rpc("send", params) if result is not None: + success, err_msg = self._validate_send_result(result) + if not success: + return SendResult(success=False, error=err_msg, raw_response=result) self._track_sent_timestamp(result) return SendResult(success=True) return SendResult(success=False, error=f"RPC send {media_label.lower()} failed") diff --git a/scripts/release.py b/scripts/release.py index f047394416a..8811dab4b08 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -48,6 +48,7 @@ AUTHOR_MAP = { "charles@salesondemand.io": "salesondemandio", "victor@rocketfueldev.com": "victor-kyriazakos", "87440198+JoaoMarcos44@users.noreply.github.com": "JoaoMarcos44", + "joaomarcosdias444@gmail.com": "JoaoMarcos44", "286497132+srojk34@users.noreply.github.com": "srojk34", "59806492+sitkarev@users.noreply.github.com": "sitkarev", "zheng@omegasys.eu": "omegazheng", diff --git a/tests/gateway/test_signal.py b/tests/gateway/test_signal.py index afaaeb843a0..b95a16d5409 100644 --- a/tests/gateway/test_signal.py +++ b/tests/gateway/test_signal.py @@ -1009,6 +1009,97 @@ class TestSignalSendReturnsMessageId: assert result.message_id is None +class TestSignalSendResultValidation: + """Verify that send() validates recipient-level delivery results.""" + + @pytest.mark.asyncio + async def test_send_success_when_results_has_success(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, _ = _stub_rpc({ + "timestamp": 1712345678000, + "results": [ + { + "recipientAddress": {"number": "+155****4567"}, + "type": "SUCCESS" + } + ] + }) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + result = await adapter.send(chat_id="+155****4567", content="hello") + assert result.success is True + + @pytest.mark.asyncio + async def test_send_failure_when_results_has_failure_type(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, _ = _stub_rpc({ + "timestamp": 1712345678000, + "results": [ + { + "recipientAddress": {"number": "+155****4567"}, + "type": "UNREGISTERED_FAILURE" + } + ] + }) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + result = await adapter.send(chat_id="+155****4567", content="hello") + assert result.success is False + assert result.error == "UNREGISTERED_FAILURE" + + @pytest.mark.asyncio + async def test_send_failure_when_results_has_success_false(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, _ = _stub_rpc({ + "timestamp": 1712345678000, + "results": [ + { + "recipientAddress": {"number": "+155****4567"}, + "success": False, + "failure": "Some connection error" + } + ] + }) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + result = await adapter.send(chat_id="+155****4567", content="hello") + assert result.success is False + assert result.error == "Some connection error" + + @pytest.mark.asyncio + async def test_rpc_raises_rate_limit_on_results_failure(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + mock_client = AsyncMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "jsonrpc": "2.0", + "result": { + "timestamp": 1712345678000, + "results": [ + { + "recipientAddress": {"number": "+155****4567"}, + "type": "RATE_LIMIT_FAILURE", + "retryAfterSeconds": 15 + } + ] + }, + "id": "1" + } + mock_client.post = AsyncMock(return_value=mock_response) + adapter.client = mock_client + + from gateway.platforms.signal_rate_limit import SignalRateLimitError + with pytest.raises(SignalRateLimitError) as exc_info: + await adapter._rpc("send", {"recipient": ["+155****4567"]}, raise_on_rate_limit=True) + + assert "Rate limit exceeded for recipient" in str(exc_info.value) + assert exc_info.value.retry_after == 15 + + # --------------------------------------------------------------------------- # stop_typing() delegates to _stop_typing_indicator (#4647) # ---------------------------------------------------------------------------