From 04ea895ffb4f8e7b4f2bb3d7959db07679bd09f9 Mon Sep 17 00:00:00 2001 From: Maxence Groine Date: Thu, 30 Apr 2026 12:11:07 +0200 Subject: [PATCH] feat(gateway/signal): add support for multiple images sending Adds a new `send_multiple_images` method to the ``BasePlatformAdapter`` that implements the default "One image per message" loop and allows for platform-specific overriding. Implements such an override for the Signal adapter, batching images and trying (best-effort) to work around rate-limits for voluminous batches using a specific scheduler. Also implements batching + rate-limit handling in the `send_message` tool. New tests added for the Signal adapter, its rate-limit scheduler and the `send_message` tool --- gateway/platforms/base.py | 142 +++-- gateway/platforms/signal.py | 207 +++++++- gateway/platforms/signal_rate_limit.py | 369 +++++++++++++ gateway/run.py | 44 +- tests/gateway/test_signal.py | 547 ++++++++++++++++++++ tests/gateway/test_signal_rate_limit.py | 233 +++++++++ tests/tools/test_send_message_tool.py | 371 +++++++++++++ tools/send_message_tool.py | 176 ++++++- website/docs/user-guide/messaging/signal.md | 5 +- 9 files changed, 2010 insertions(+), 84 deletions(-) create mode 100644 gateway/platforms/signal_rate_limit.py create mode 100644 tests/gateway/test_signal_rate_limit.py diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index fe97eb69c5..417893fea2 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1505,7 +1505,64 @@ class BasePlatformAdapter(ABC): Default is a no-op for platforms with one-shot typing indicators. """ pass - + + async def send_multiple_images( + self, + chat_id: str, + images: List[Tuple[str, str]], + metadata: Optional[Dict[str, Any]] = None, + human_delay: float = 0.0, + ) -> None: + """Send a batch of images. + + Accepts ``http(s)://``, ``file://`` URIs in the first tuple + element. + + Default implementation sends each item individually, + routing animated GIFs through ``send_animation`` and local + files through ``send_image_file``. + + Override in subclasses to bundle into a single native API call + (e.g. Signal's multi-attachment RPC) + """ + from urllib.parse import unquote as _unquote + + for image_url, alt_text in images: + if human_delay > 0: + await asyncio.sleep(human_delay) + try: + logger.info( + "[%s] Sending image: %s (alt=%s)", + self.name, + safe_url_for_log(image_url), + alt_text[:30] if alt_text else "", + ) + if image_url.startswith("file://"): + img_result = await self.send_image_file( + chat_id=chat_id, + image_path=_unquote(image_url[7:]), + caption=alt_text if alt_text else None, + metadata=metadata, + ) + elif self._is_animation_url(image_url): + img_result = await self.send_animation( + chat_id=chat_id, + animation_url=image_url, + caption=alt_text if alt_text else None, + metadata=metadata, + ) + else: + img_result = await self.send_image( + chat_id=chat_id, + image_url=image_url, + caption=alt_text if alt_text else None, + metadata=metadata, + ) + if not img_result.success: + logger.error("[%s] Failed to send image: %s", self.name, img_result.error) + except Exception as img_err: + logger.error("[%s] Error sending image: %s", self.name, img_err, exc_info=True) + async def send_image( self, chat_id: str, @@ -2587,41 +2644,52 @@ class BasePlatformAdapter(ABC): # Send extracted images as native attachments if images: logger.info("[%s] Extracted %d image(s) to send as attachments", self.name, len(images)) - for image_url, alt_text in images: - if human_delay > 0: - await asyncio.sleep(human_delay) try: - logger.info( - "[%s] Sending image: %s (alt=%s)", - self.name, - safe_url_for_log(image_url), - alt_text[:30] if alt_text else "", + await self.send_multiple_images( + chat_id=event.source.chat_id, + images=images, + metadata=_thread_metadata, + human_delay=human_delay, ) - # Route animated GIFs through send_animation for proper playback - if self._is_animation_url(image_url): - img_result = await self.send_animation( - chat_id=event.source.chat_id, - animation_url=image_url, - caption=alt_text if alt_text else None, - metadata=_thread_metadata, - ) - else: - img_result = await self.send_image( - chat_id=event.source.chat_id, - image_url=image_url, - caption=alt_text if alt_text else None, - metadata=_thread_metadata, - ) - if not img_result.success: - logger.error("[%s] Failed to send image: %s", self.name, img_result.error) - except Exception as img_err: - logger.error("[%s] Error sending image: %s", self.name, img_err, exc_info=True) + except Exception as batch_err: + logger.warning("[%s] Error batching images: %s", self.name, batch_err, exc_info=True) + # Send extracted media files — route by file type _VIDEO_EXTS = {'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'} _IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.webp', '.gif'} + # Partition images out of media_files + local_files so they + # can be sent as a single batch (Signal RPC) + from urllib.parse import quote as _quote + _image_paths: list = [] + _non_image_media: list = [] for media_path, is_voice in media_files: + _ext = Path(media_path).suffix.lower() + if _ext in _IMAGE_EXTS and not is_voice: + _image_paths.append(media_path) + else: + _non_image_media.append((media_path, is_voice)) + _non_image_local: list = [] + for file_path in local_files: + if Path(file_path).suffix.lower() in _IMAGE_EXTS: + _image_paths.append(file_path) + else: + _non_image_local.append(file_path) + + if _image_paths: + try: + _batch = [(f"file://{_quote(p)}", "") for p in _image_paths] + await self.send_multiple_images( + chat_id=event.source.chat_id, + images=_batch, + metadata=_thread_metadata, + human_delay=human_delay, + ) + except Exception as batch_err: + logger.warning("[%s] Error batching images: %s", self.name, batch_err, exc_info=True) + + for media_path, is_voice in _non_image_media: if human_delay > 0: await asyncio.sleep(human_delay) try: @@ -2638,12 +2706,6 @@ class BasePlatformAdapter(ABC): video_path=media_path, metadata=_thread_metadata, ) - elif ext in _IMAGE_EXTS: - media_result = await self.send_image_file( - chat_id=event.source.chat_id, - image_path=media_path, - metadata=_thread_metadata, - ) else: media_result = await self.send_document( chat_id=event.source.chat_id, @@ -2656,19 +2718,13 @@ class BasePlatformAdapter(ABC): except Exception as media_err: logger.warning("[%s] Error sending media: %s", self.name, media_err) - # Send auto-detected local files as native attachments - for file_path in local_files: + # Send auto-detected local non-image files as native attachments + for file_path in _non_image_local: if human_delay > 0: await asyncio.sleep(human_delay) try: ext = Path(file_path).suffix.lower() - if ext in _IMAGE_EXTS: - await self.send_image_file( - chat_id=event.source.chat_id, - image_path=file_path, - metadata=_thread_metadata, - ) - elif ext in _VIDEO_EXTS: + if ext in _VIDEO_EXTS: await self.send_video( chat_id=event.source.chat_id, video_path=file_path, diff --git a/gateway/platforms/signal.py b/gateway/platforms/signal.py index 3dd1e349bd..0ad1ef751c 100644 --- a/gateway/platforms/signal.py +++ b/gateway/platforms/signal.py @@ -21,7 +21,7 @@ import time import uuid from datetime import datetime, timezone from pathlib import Path -from typing import Dict, List, Optional, Any +from typing import Any, Dict, List, Optional, Tuple from urllib.parse import quote, unquote import httpx @@ -39,6 +39,17 @@ from gateway.platforms.base import ( cache_image_from_url, ) from gateway.platforms.helpers import redact_phone +from gateway.platforms.signal_rate_limit import ( + SIGNAL_BATCH_PACING_NOTICE_THRESHOLD, + SIGNAL_MAX_ATTACHMENTS_PER_MSG, + SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, + SignalRateLimitError, + _extract_retry_after_seconds, + _format_wait, + _is_signal_rate_limit_error, + _signal_send_timeout, + get_scheduler, +) logger = logging.getLogger(__name__) @@ -53,6 +64,7 @@ SSE_RETRY_DELAY_MAX = 60.0 HEALTH_CHECK_INTERVAL = 30.0 # seconds between health checks HEALTH_CHECK_STALE_THRESHOLD = 120.0 # seconds without SSE activity before concern + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -674,6 +686,8 @@ class SignalAdapter(BasePlatformAdapter): rpc_id: str = None, *, log_failures: bool = True, + raise_on_rate_limit: bool = False, + timeout: float = 30.0, ) -> Any: """Send a JSON-RPC 2.0 request to signal-cli daemon. @@ -682,6 +696,11 @@ class SignalAdapter(BasePlatformAdapter): repeated NETWORK_FAILURE spam for unreachable recipients while still preserving visibility for the first occurrence and for unrelated RPCs. + + When ``raise_on_rate_limit=True``, a Signal ``[429]`` / + ``RateLimitException`` response raises ``SignalRateLimitError`` + instead of being swallowed — lets callers (multi-attachment send) + opt into backoff-retry without changing default behaviour. """ if not self.client: logger.warning("Signal: RPC called but client not connected") @@ -701,20 +720,28 @@ class SignalAdapter(BasePlatformAdapter): resp = await self.client.post( f"{self.http_url}/api/v1/rpc", json=payload, - timeout=30.0, + timeout=timeout, ) resp.raise_for_status() data = resp.json() if "error" in data: + err = data["error"] + if raise_on_rate_limit: + if _is_signal_rate_limit_error(err): + err_msg = str(err.get("message", "")) if isinstance(err, dict) else str(err) + retry_after = _extract_retry_after_seconds(err) + raise SignalRateLimitError(err_msg, retry_after=retry_after) if log_failures: - logger.warning("Signal RPC error (%s): %s", method, data["error"]) + logger.warning("Signal RPC error (%s): %s", method, err) else: - logger.debug("Signal RPC error (%s): %s", method, data["error"]) + logger.debug("Signal RPC error (%s): %s", method, err) return None return data.get("result") + except SignalRateLimitError: + raise except Exception as e: if log_failures: logger.warning("Signal RPC %s failed: %s", method, e) @@ -978,6 +1005,178 @@ class SignalAdapter(BasePlatformAdapter): self._typing_failures.pop(chat_id, None) self._typing_skip_until.pop(chat_id, None) + async def send_multiple_images( + self, + chat_id: str, + images: List[Tuple[str, str]], + metadata: Optional[Dict[str, Any]] = None, + human_delay: float = 0.0, + ) -> None: + """Send a batch of images via chunked Signal RPC calls. + + Per-image alt texts are dropped — Signal's send RPC only carries + one shared message body. Bad images (download failure, missing + file, oversize) are skipped with a warning so one bad URL + doesn't lose the rest of the batch. ``human_delay`` is ignored: + the rate-limit scheduler handles inter-batch pacing. + """ + if not images: + return + + scheduler = get_scheduler() + logger.info( + "Signal send_multiple_images: received %d image(s) for %s — " + "scheduler state: %s", + len(images), chat_id[:30], scheduler.state(), + ) + + await self._stop_typing_indicator(chat_id) + + attachments: List[str] = [] + skipped_download = 0 + skipped_missing = 0 + skipped_oversize = 0 + for image_url, _alt_text in images: + if image_url.startswith("file://"): + file_path = unquote(image_url[7:]) + else: + try: + file_path = await cache_image_from_url(image_url) + except Exception as e: + logger.warning("Signal: failed to download image %s: %s", image_url, e) + skipped_download += 1 + continue + + if not file_path or not Path(file_path).exists(): + logger.warning("Signal: image file not found for %s", image_url) + skipped_missing += 1 + continue + + file_size = Path(file_path).stat().st_size + if file_size > SIGNAL_MAX_ATTACHMENT_SIZE: + logger.warning( + "Signal: image too large (%d bytes), skipping %s", file_size, image_url + ) + skipped_oversize += 1 + continue + + attachments.append(file_path) + + if not attachments: + logger.error( + "Signal: no valid images in batch of %d " + "(download=%d missing=%d oversize=%d)", + len(images), skipped_download, skipped_missing, skipped_oversize, + ) + return + + logger.info( + "Signal send_multiple_images: %d/%d images valid, sending in chunks", + len(attachments), len(images), + ) + + base_params: Dict[str, Any] = { + "account": self.account, + "message": "", + } + if chat_id.startswith("group:"): + base_params["groupId"] = chat_id[6:] + else: + base_params["recipient"] = [await self._resolve_recipient(chat_id)] + + att_batches = [ + attachments[i:i + SIGNAL_MAX_ATTACHMENTS_PER_MSG] + for i in range(0, len(attachments), SIGNAL_MAX_ATTACHMENTS_PER_MSG) + ] + + for idx, att_batch in enumerate(att_batches): + n = len(att_batch) + estimated = scheduler.estimate_wait(n) + logger.debug( + "Signal batch %d/%d: %d attachments, estimated wait=%.1fs", + idx + 1, len(att_batches), n, estimated, + ) + if estimated >= SIGNAL_BATCH_PACING_NOTICE_THRESHOLD: + await self._notify_batch_pacing( + chat_id, idx + 1, len(att_batches), estimated + ) + + params = dict(base_params, attachments=att_batch) + send_timeout = _signal_send_timeout(n) + + for attempt in range(1, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS + 1): + await scheduler.acquire(n) + try: + _rpc_t0 = time.monotonic() + result = await self._rpc( + "send", params, raise_on_rate_limit=True, timeout=send_timeout, + ) + _rpc_duration = time.monotonic() - _rpc_t0 + if result is not None: + self._track_sent_timestamp(result) + await scheduler.report_rpc_duration(_rpc_duration, n) + logger.info( + "Signal batch %d/%d: %d attachments sent in %.1fs " + "(attempt %d/%d)", + idx + 1, len(att_batches), n, _rpc_duration, + attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, + ) + else: + # Assume the server didn't accept the batch, don't deduce tokens + logger.error( + "Signal: RPC send failed for batch %d/%d (%d attachments, " + "attempt %d/%d, rpc_duration=%.1fs)", + idx + 1, len(att_batches), n, + attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, + _rpc_duration, + ) + # Retry transient (non-rate-limit) failures once + if attempt < SIGNAL_RATE_LIMIT_MAX_ATTEMPTS: + backoff = 2.0 ** attempt + logger.info( + "Signal: retrying batch %d/%d after %.1fs backoff", + idx + 1, len(att_batches), backoff, + ) + await asyncio.sleep(backoff) + continue + break + except SignalRateLimitError as e: + scheduler.feedback(e.retry_after, n) + if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS: + logger.error( + "Signal: rate-limit retries exhausted on batch %d/%d " + "(%d attachments lost, server retry_after=%s)", + idx + 1, len(att_batches), n, + f"{e.retry_after:.0f}s" if e.retry_after else "unknown", + ) + break + logger.warning( + "Signal: rate-limited on batch %d/%d " + "(attempt %d/%d, server retry_after=%s); " + "scheduler will pace the retry", + idx + 1, len(att_batches), + attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, + f"{e.retry_after:.0f}s" if e.retry_after else "unknown", + ) + + async def _notify_batch_pacing( + self, + chat_id: str, + next_batch_idx: int, + total_batches: int, + wait_s: float, + ) -> None: + """Inform the user when an inter-batch pacing wait crosses the + notice threshold. Best-effort; logs and continues on failure.""" + try: + await self.send( + chat_id, + f"(More images coming — pausing ~{_format_wait(wait_s)} " + f"for Signal rate limit, batch {next_batch_idx}/{total_batches}.)", + ) + except Exception as e: + logger.warning("Signal: failed to send pacing notice: %s", e) + async def send_image( self, chat_id: str, diff --git a/gateway/platforms/signal_rate_limit.py b/gateway/platforms/signal_rate_limit.py new file mode 100644 index 0000000000..5cb8b3d69e --- /dev/null +++ b/gateway/platforms/signal_rate_limit.py @@ -0,0 +1,369 @@ +""" +Signal attachment rate-limit scheduler. + +Process-wide token-bucket simulator that mirrors the per-account +attachment rate limit signal-cli/Signal-Server enforce. Producers +(``SignalAdapter.send_multiple_images`` and the ``send_message`` tool's +Signal path) call ``acquire(n)`` before an attachment send; on a 429 +they call ``feedback(retry_after, n)`` so the model recalibrates from +the server's authoritative hint. + +The scheduler serializes concurrent calls through an ``asyncio.Lock``, +giving FIFO fairness across agent sessions sharing one signal-cli +daemon. +""" + +from __future__ import annotations + +import asyncio +import logging +import re +import time +from typing import Any, Optional + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +SIGNAL_MAX_ATTACHMENTS_PER_MSG = 32 # per-message attachment cap (source: Signal-{Android,Desktop} source code) +SIGNAL_RATE_LIMIT_BUCKET_CAPACITY = 50 # server-side token-bucket capacity for attachments rate limiting +SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER = 4 # fallback token refill interval for signal-cli < v0.14.3 +SIGNAL_RATE_LIMIT_MAX_ATTEMPTS = 2 # initial attempt + 1 retry +SIGNAL_BATCH_PACING_NOTICE_THRESHOLD = 10.0 # if estimated waiting time > 10s, notify the user about the delay +SIGNAL_RPC_ERROR_RATELIMIT = -5 # signal-cli (v0.14.3+) JSON-RPC error code for RateLimitException + + +# --------------------------------------------------------------------------- +# Errors +# --------------------------------------------------------------------------- + +class SignalRateLimitError(Exception): + """ + Raised by ``SignalAdapter._rpc`` for rate-limit responses when the + caller has opted in via ``raise_on_rate_limit=True``. + + Carries the server-supplied per-token Retry-After (in seconds) on + signal-cli ≥ v0.14.3 + ``retry_after`` is None when the version doesn't expose it. + """ + + def __init__(self, message: str, retry_after: Optional[float] = None) -> None: + super().__init__(message) + self.retry_after = retry_after + + +class SignalSchedulerError(Exception): + pass + +# --------------------------------------------------------------------------- +# Detection helpers — used to fish a 429 out of signal-cli's various error +# shapes (typed code, [429] substring, libsignal-net RetryLaterException +# leaked through AttachmentInvalidException). +# --------------------------------------------------------------------------- + +# "Retry after 4 seconds" / "retry after 4 second" — libsignal-net's +# RetryLaterException string form, surfaced when 429s hit during +# attachment upload (signal-cli wraps these as AttachmentInvalidException +# rather than RateLimitException, so the typed path doesn't fire). +_RETRY_AFTER_RE = re.compile(r"Retry after (\d+(?:\.\d+)?)\s*second", re.IGNORECASE) + + +def _extract_retry_after_seconds(err: Any) -> Optional[float]: + """Pull the per-token Retry-After window from a signal-cli rate-limit error. + + Tries two sources, in order: + 1. ``error.data.response.results[*].retryAfterSeconds`` — the + structured field signal-cli ≥ v0.14.3 surfaces for plain + RateLimitException. + 2. ``"Retry after N seconds"`` parsed out of the message — covers + libsignal-net's RetryLaterException that gets wrapped as + AttachmentInvalidException during attachment upload, where the + structured field stays null. + + Returns None when neither yields a value. + """ + msg = "" + if isinstance(err, dict): + data = err.get("data") or {} + response = data.get("response") or {} + results = response.get("results") or [] + candidates = [ + r.get("retryAfterSeconds") for r in results + if isinstance(r, dict) and r.get("retryAfterSeconds") + ] + if candidates: + return float(max(candidates)) + msg = str(err.get("message", "")) + else: + msg = str(err) + match = _RETRY_AFTER_RE.search(msg) + return float(match.group(1)) if match else None + + +def _is_signal_rate_limit_error(err: Any) -> bool: + """True if a signal-cli RPC error reflects a rate-limit failure. + + Matches three layers: + - typed ``RATELIMIT_ERROR`` code (signal-cli ≥ v0.14.3, plain + RateLimitException) + - legacy ``[429] / RateLimitException`` substrings + - libsignal-net's ``RetryLaterException`` / ``Retry after N seconds`` + surfaced inside ``AttachmentInvalidException`` when the rate + limit is hit during attachment upload — signal-cli never re-tags + these as RateLimitException, so substring is the only signal. + """ + if isinstance(err, dict) and err.get("code") == SIGNAL_RPC_ERROR_RATELIMIT: + return True + + message = ( + str(err.get("message", "")) + if isinstance(err, dict) + else str(err) + ) + msg_lower = message.lower() + return ( + "[429]" in message + or "ratelimit" in msg_lower + or "retrylaterexception" in msg_lower + or "retry after" in msg_lower + ) + + +# --------------------------------------------------------------------------- +# Misc helpers +# --------------------------------------------------------------------------- + +def _format_wait(seconds: float) -> str: + """Human-friendly wait label for user-facing pacing notices.""" + s = max(0.0, seconds) + if s < 90: + return f"{int(round(s))}s" + return f"{max(1, int(round(s / 60)))} min" + + +def _signal_send_timeout(num_attachments: int) -> float: + """HTTP timeout for a Signal ``send`` RPC. + + signal-cli uploads attachments serially during the call, so the + server-side time scales with batch size. Default 30s is fine for + text-only sends but truncates large attachment batches mid-upload — + we then log a phantom failure even though signal-cli completes the + send a few seconds later. Scale at 5s/attachment with a 60s floor. + """ + if num_attachments <= 0: + return 30.0 + return max(60.0, 5.0 * num_attachments) + + +# --------------------------------------------------------------------------- +# Scheduler +# --------------------------------------------------------------------------- + +class SignalAttachmentScheduler: + """Process-wide token-bucket simulator for Signal attachment sends. + + The bucket holds up to ``capacity`` tokens (default 50, matching + Signal's server-side rate-limit bucket size). Each attachment consumes one + token. Tokens refill at ``refill_rate`` tokens/second, calibrated + from the per-token Retry-After hint we get from the server when a + 429 fires. Until we've observed one, we use the documented default + (1 token / 4 seconds). + + Concurrent ``acquire(n)`` calls serialize through an + ``asyncio.Lock`` — natural FIFO across agent sessions hitting the + same daemon. + """ + + def __init__( + self, + capacity: float = float(SIGNAL_RATE_LIMIT_BUCKET_CAPACITY), + default_retry_after: float = float(SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER), + ) -> None: + self.capacity = float(capacity) + self.tokens = float(capacity) + self.refill_rate = 1.0 / float(default_retry_after) + self.last_refill = time.monotonic() + self._lock = asyncio.Lock() + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _refill(self) -> None: + now = time.monotonic() + elapsed = now - self.last_refill + if elapsed > 0 and self.tokens < self.capacity: + self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate) + self.last_refill = now + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def estimate_wait(self, n: int) -> float: + """Best-effort estimate of the seconds until ``n`` tokens would + be available. Used to decide whether to emit a user-facing + pacing notice *before* committing to an ``acquire`` that may + block silently. Lock-free; small races vs. concurrent acquires + are benign for an informational notice. + """ + now = time.monotonic() + elapsed = now - self.last_refill + projected = self.tokens + if elapsed > 0 and projected < self.capacity: + projected = min(self.capacity, projected + elapsed * self.refill_rate) + deficit = n - projected + if deficit <= 0: + return 0.0 + return deficit / self.refill_rate + + async def acquire(self, n: int) -> float: + """Block until at least ``n`` tokens are available, return the + seconds slept. + + Does **not** deduct tokens — the bucket is a read-only model of + server-side capacity. Call ``report_rpc_duration()`` after the + RPC to synchronise the model with the server timeline. + + Not perfect in case lots of coroutines try to acquire for big + uploads (``report_rpc_duration`` will take a long time to get hit) + but this is just a simulation. Signal server is ground truth and + will raise rate-limit exceptions triggering requeues. + + The lock is released during ``asyncio.sleep`` so other callers + can interleave. A retry loop re-checks after each sleep in + case the deadline was pessimistic. + """ + if n <= 0: + return 0.0 + if n > self.capacity: + raise SignalSchedulerError( + f"Signal scheduler was called requesting {n} tokens " + f"(max is {self.capacity})", + ) + + total_slept = 0.0 + first_pass = True + while True: + async with self._lock: + self._refill() + if self.tokens >= n: + if not first_pass or total_slept > 0: + logger.debug( + "Signal scheduler: tokens sufficient for %d " + "(remaining=%.1f, total_slept=%.1fs)", + n, self.tokens, total_slept, + ) + return total_slept + deficit = n - self.tokens + wait = deficit / self.refill_rate + if first_pass: + logger.info( + "Signal scheduler: pausing %.1fs for %d tokens " + "(available=%.1f, deficit=%.1f, refill=%.4f/s ≈ %.1fs/token)", + wait, n, self.tokens, deficit, + self.refill_rate, 1.0 / self.refill_rate, + ) + first_pass = False + await asyncio.sleep(wait) + total_slept += wait + + async def report_rpc_duration(self, rpc_duration: float, n_attachments: int) -> None: + """Record an attachment-send RPC that just completed. + + Deducts ``n_attachments`` tokens without crediting refill during + the upload window. Signal's server checks the bucket at RPC start + and does *not* refill during request processing — refill resumes + after the response. Crediting upload-time refill causes cumulative + drift that eventually triggers 429s. + + Advances ``last_refill`` so the next ``acquire`` / ``_refill`` + starts counting from this point. + """ + if n_attachments <= 0: + return + + async with self._lock: + now = time.monotonic() + token_before = self.tokens + self.tokens = max(0.0, token_before - float(n_attachments)) + self.last_refill = now + logger.log( + logging.INFO if rpc_duration > 10 and n_attachments > 5 else logging.DEBUG, + "Signal scheduler: RPC for %d att took %.1fs — " + "tokens %.1f → %.1f (deducted=%d, no upload refill credited, refill=%.4fs⁻¹)", + n_attachments, rpc_duration, + token_before, self.tokens, + n_attachments, self.refill_rate, + ) + + def feedback(self, retry_after: Optional[float], n_attempted: int) -> None: + """Apply server feedback after a 429. + + ``retry_after`` is the per-*token* refill window the server + reports (None when signal-cli is older than v0.14.3 and didn't + surface it). + + When present we calibrate ``refill_rate`` from it: + the server is authoritative. + """ + if retry_after and retry_after > 0: + new_rate = 1.0 / float(retry_after) + if new_rate != self.refill_rate: + logger.info( + "Signal scheduler: calibrating refill_rate to %.4f tokens/sec " + "(server retry_after=%.1fs per token)", + new_rate, retry_after, + ) + self.refill_rate = new_rate + self.tokens = 0.0 + self.last_refill = time.monotonic() + + def state(self) -> dict: + """Return current scheduler state for diagnostic logging (read-only). + + Does not advance ``last_refill`` — safe to call from logging paths + without perturbing the bucket. + """ + now = time.monotonic() + elapsed = now - self.last_refill + projected = self.tokens + if elapsed > 0 and projected < self.capacity: + projected = min(self.capacity, projected + elapsed * self.refill_rate) + return { + "tokens": round(projected, 1), + "capacity": int(self.capacity), + "refill_rate": round(self.refill_rate, 4), + "refill_seconds_per_token": round(1.0 / self.refill_rate, 1) if self.refill_rate > 0 else float("inf"), + } + + +# --------------------------------------------------------------------------- +# Process-wide singleton +# --------------------------------------------------------------------------- + +_scheduler: Optional[SignalAttachmentScheduler] = None + + +def get_scheduler() -> SignalAttachmentScheduler: + """Return the process-wide scheduler, creating it on first access.""" + global _scheduler + if _scheduler is None: + _scheduler = SignalAttachmentScheduler() + logger.info( + "Signal scheduler: created (capacity=%d tokens, refill=%.4f/s ≈ %.1fs/token)", + int(_scheduler.capacity), + _scheduler.refill_rate, + 1.0 / _scheduler.refill_rate, + ) + return _scheduler + + +def _reset_scheduler() -> None: + """Drop the cached scheduler so the next ``get_scheduler`` call + builds a fresh one. Test-only — never call from production paths.""" + global _scheduler + _scheduler = None diff --git a/gateway/run.py b/gateway/run.py index af654021f0..3cefc80b75 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -7186,6 +7186,7 @@ class GatewayRunner: that the normal _process_message_background path would have caught. """ from pathlib import Path + from urllib.parse import quote as _quote try: media_files, _ = adapter.extract_media(response) @@ -7199,7 +7200,36 @@ class GatewayRunner: _VIDEO_EXTS = {'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'} _IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.webp', '.gif'} + # Partition out images so they can be sent as a single batch + # (e.g. Signal's multi-attachment RPC) + image_paths: list = [] + non_image_media: list = [] for media_path, is_voice in media_files: + ext = Path(media_path).suffix.lower() + if ext in _IMAGE_EXTS and not is_voice: + image_paths.append(media_path) + else: + non_image_media.append((media_path, is_voice)) + + non_image_local: list = [] + for file_path in local_files: + if Path(file_path).suffix.lower() in _IMAGE_EXTS: + image_paths.append(file_path) + else: + non_image_local.append(file_path) + + if image_paths: + try: + images = [(f"file://{_quote(p)}", "") for p in image_paths] + await adapter.send_multiple_images( + chat_id=event.source.chat_id, + images=images, + metadata=_thread_meta, + ) + except Exception as e: + logger.warning("[%s] Post-stream image batch delivery failed: %s", adapter.name, e) + + for media_path, is_voice in non_image_media: try: ext = Path(media_path).suffix.lower() if should_send_media_as_audio(event.source.platform, ext, is_voice=is_voice): @@ -7214,12 +7244,6 @@ class GatewayRunner: video_path=media_path, metadata=_thread_meta, ) - elif ext in _IMAGE_EXTS: - await adapter.send_image_file( - chat_id=event.source.chat_id, - image_path=media_path, - metadata=_thread_meta, - ) else: await adapter.send_document( chat_id=event.source.chat_id, @@ -7229,13 +7253,13 @@ class GatewayRunner: except Exception as e: logger.warning("[%s] Post-stream media delivery failed: %s", adapter.name, e) - for file_path in local_files: + for file_path in non_image_local: try: ext = Path(file_path).suffix.lower() - if ext in _IMAGE_EXTS: - await adapter.send_image_file( + if ext in _VIDEO_EXTS: + await adapter.send_video( chat_id=event.source.chat_id, - image_path=file_path, + video_path=file_path, metadata=_thread_meta, ) else: diff --git a/tests/gateway/test_signal.py b/tests/gateway/test_signal.py index ca8f458a27..8aab559a19 100644 --- a/tests/gateway/test_signal.py +++ b/tests/gateway/test_signal.py @@ -1,4 +1,5 @@ """Tests for Signal messenger platform adapter.""" +import asyncio import base64 import json import pytest @@ -9,6 +10,16 @@ from urllib.parse import quote from gateway.config import Platform, PlatformConfig +@pytest.fixture(autouse=True) +def _reset_signal_scheduler(): + """The attachment scheduler is process-wide; drop it between tests + so a fresh token bucket greets each case.""" + from gateway.platforms.signal_rate_limit import _reset_scheduler + _reset_scheduler() + yield + _reset_scheduler() + + # --------------------------------------------------------------------------- # Shared Helpers # --------------------------------------------------------------------------- @@ -1102,3 +1113,539 @@ class TestSignalQuoteExtraction: event = captured["event"] assert event.reply_to_message_id == "123" assert event.reply_to_text is None + +# --------------------------------------------------------------------------- +# _rpc rate-limit detection +# --------------------------------------------------------------------------- + +class _FakeHttpResponse: + """Minimal stand-in for httpx.Response — only what _rpc touches.""" + + def __init__(self, json_data): + self._json = json_data + + def raise_for_status(self): + return None + + def json(self): + return self._json + + +def _install_fake_client(adapter, json_data): + """Replace adapter.client.post with an async fn returning json_data.""" + from types import SimpleNamespace + + async def _post(url, json=None, timeout=None): + return _FakeHttpResponse(json_data) + + adapter.client = SimpleNamespace(post=_post) + + +class TestSignalRpcRateLimit: + """_rpc opt-in 429 detection and SignalRateLimitError propagation.""" + + @pytest.mark.asyncio + async def test_raises_on_429_when_opted_in(self, monkeypatch): + from gateway.platforms.signal import SignalRateLimitError + + adapter = _make_signal_adapter(monkeypatch) + _install_fake_client(adapter, { + "error": {"message": "Failed to send: [429] Rate Limited"}, + }) + + with pytest.raises(SignalRateLimitError): + await adapter._rpc("send", {}, raise_on_rate_limit=True) + + @pytest.mark.asyncio + async def test_raises_on_rate_limit_exception_substring(self, monkeypatch): + """Some signal-cli builds emit 'RateLimitException' without a literal [429].""" + from gateway.platforms.signal import SignalRateLimitError + + adapter = _make_signal_adapter(monkeypatch) + _install_fake_client(adapter, { + "error": {"message": "RateLimitException occurred"}, + }) + + with pytest.raises(SignalRateLimitError): + await adapter._rpc("send", {}, raise_on_rate_limit=True) + + @pytest.mark.asyncio + async def test_default_swallows_rate_limit_returns_none(self, monkeypatch): + """Without opt-in, 429 stays swallowed — preserves backwards compat.""" + adapter = _make_signal_adapter(monkeypatch) + _install_fake_client(adapter, { + "error": {"message": "[429] Rate Limited"}, + }) + + result = await adapter._rpc("send", {}) + assert result is None + + @pytest.mark.asyncio + async def test_non_rate_limit_error_does_not_raise_when_opted_in(self, monkeypatch): + """Opt-in only escalates 429s; other errors still return None.""" + adapter = _make_signal_adapter(monkeypatch) + _install_fake_client(adapter, { + "error": {"message": "Recipient unknown (UntrustedIdentityException)"}, + }) + + result = await adapter._rpc("send", {}, raise_on_rate_limit=True) + assert result is None + + @pytest.mark.asyncio + async def test_raises_with_retry_after_from_v0_14_3_payload(self, monkeypatch): + """signal-cli ≥ v0.14.3 surfaces server Retry-After under + ``error.data.response.results[*].retryAfterSeconds`` — _rpc + carries that value through SignalRateLimitError.retry_after.""" + from gateway.platforms.signal_rate_limit import ( + SignalRateLimitError, SIGNAL_RPC_ERROR_RATELIMIT, + ) + + adapter = _make_signal_adapter(monkeypatch) + _install_fake_client(adapter, { + "error": { + "code": SIGNAL_RPC_ERROR_RATELIMIT, + "message": "Failed to send message due to rate limiting", + "data": { + "response": { + "timestamp": 0, + "results": [ + {"type": "RATE_LIMIT_FAILURE", "retryAfterSeconds": 90}, + ], + } + }, + }, + }) + + with pytest.raises(SignalRateLimitError) as exc_info: + await adapter._rpc("send", {}, raise_on_rate_limit=True) + + assert exc_info.value.retry_after == 90.0 + + @pytest.mark.asyncio + async def test_raises_with_retry_after_none_for_old_signal_cli(self, monkeypatch): + """Older signal-cli builds emit only the substring; retry_after=None.""" + from gateway.platforms.signal import SignalRateLimitError + + adapter = _make_signal_adapter(monkeypatch) + _install_fake_client(adapter, { + "error": {"message": "Failed: [429] Rate Limited"}, + }) + + with pytest.raises(SignalRateLimitError) as exc_info: + await adapter._rpc("send", {}, raise_on_rate_limit=True) + + assert exc_info.value.retry_after is None + + @pytest.mark.asyncio + async def test_raises_on_retry_later_inside_attachment_invalid(self, monkeypatch): + """Production case: 429 during attachment upload surfaces as + AttachmentInvalidException → UnexpectedErrorException (code + -32603), with the libsignal-net 'Retry after N seconds' + message embedded. _rpc must still detect this as rate-limit + AND parse the seconds out of the message.""" + from gateway.platforms.signal import SignalRateLimitError + + adapter = _make_signal_adapter(monkeypatch) + _install_fake_client(adapter, { + "error": { + "code": -32603, + "message": ( + "Failed to send message: /home/max/sync/Memes/fengshui.jpeg: " + "org.signal.libsignal.net.RetryLaterException: Retry after 4 seconds " + "(AttachmentInvalidException) (UnexpectedErrorException)" + ), + "data": None, + }, + }) + + with pytest.raises(SignalRateLimitError) as exc_info: + await adapter._rpc("send", {}, raise_on_rate_limit=True) + + assert exc_info.value.retry_after == 4.0 + + +# --------------------------------------------------------------------------- +# send_multiple_images — chunking, pacing, rate-limit retry +# --------------------------------------------------------------------------- + + +def _make_image_files(tmp_path, count, prefix="img"): + """Materialize `count` tiny PNG files and return file:// URIs for them.""" + uris = [] + for i in range(count): + p = tmp_path / f"{prefix}_{i}.png" + p.write_bytes(b"\x89PNG" + b"\x00" * 32) + uris.append((f"file://{p}", "")) + return uris + + +def _stub_rpc_responses(responses): + """Build an _rpc replacement that pops a response per call. + + Each entry in `responses` is either: + * a return value (dict / None) → returned to the caller, or + * an Exception subclass instance → raised. + Captures (params, kwargs) per call for inspection. + """ + captured = [] + queue = list(responses) + + async def mock_rpc(method, params, rpc_id=None, **kwargs): + captured.append({"method": method, "params": dict(params), "kwargs": kwargs}) + await asyncio.sleep(0) + if not queue: + raise AssertionError("Unexpected extra _rpc call") + item = queue.pop(0) + if isinstance(item, BaseException): + raise item + return item + + return mock_rpc, captured + + +def _patch_scheduler_sleep(monkeypatch, capture: list): + """Capture sleeps inside the scheduler so tests don't actually wait. + Zero-second sleeps (e.g. event-loop yields from mock RPCs) are + delegated to the real asyncio.sleep so they don't pollute the + capture list.""" + _real_sleep = asyncio.sleep + offset = [0.0] + + async def fake_sleep(seconds): + if seconds > 0: + capture.append(seconds) + offset[0] += seconds + else: + await _real_sleep(0) + + monkeypatch.setattr( + "gateway.platforms.signal_rate_limit.asyncio.sleep", fake_sleep + ) + monkeypatch.setattr( + "gateway.platforms.signal_rate_limit.time.monotonic", lambda: offset[0] + ) + + +class TestSignalSendMultipleImages: + @pytest.mark.asyncio + async def test_empty_list_is_noop(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, captured = _stub_rpc_responses([]) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + await adapter.send_multiple_images(chat_id="+155****4567", images=[]) + + assert captured == [] + adapter._stop_typing_indicator.assert_not_awaited() + + @pytest.mark.asyncio + async def test_all_bad_files_no_rpc(self, monkeypatch, tmp_path): + """If every image is missing/invalid, no RPC fires.""" + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, captured = _stub_rpc_responses([]) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + await adapter.send_multiple_images( + chat_id="+155****4567", + images=[(f"file://{tmp_path}/missing_a.png", ""), + (f"file://{tmp_path}/missing_b.png", "")], + ) + + assert captured == [] + + @pytest.mark.asyncio + async def test_single_batch_under_limit(self, monkeypatch, tmp_path): + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, captured = _stub_rpc_responses([{"timestamp": 1}]) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + images = _make_image_files(tmp_path, 5) + await adapter.send_multiple_images(chat_id="+155****4567", images=images) + + assert len(captured) == 1 + params = captured[0]["params"] + assert params["recipient"] == ["+155****4567"] + assert params["message"] == "" + assert len(params["attachments"]) == 5 + # raise_on_rate_limit must be opted into so the retry loop sees 429s + assert captured[0]["kwargs"].get("raise_on_rate_limit") is True + + @pytest.mark.asyncio + async def test_skips_bad_images_in_mixed_batch(self, monkeypatch, tmp_path): + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, captured = _stub_rpc_responses([{"timestamp": 1}]) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + good = _make_image_files(tmp_path, 2, prefix="ok") + bad = [(f"file://{tmp_path}/missing.png", "")] + await adapter.send_multiple_images( + chat_id="+155****4567", images=good[:1] + bad + good[1:] + ) + + assert len(captured) == 1 + assert len(captured[0]["params"]["attachments"]) == 2 + + @pytest.mark.asyncio + async def test_429_calibrates_scheduler_then_retries(self, monkeypatch, tmp_path): + """Server says retry_after=27 per token. After feedback, the + scheduler's refill_rate becomes 1/27. Re-acquiring n=3 tokens + therefore waits 3 × 27 = 81s — pulled from the server's + authoritative rate, not a `× 32` defensive multiplier.""" + from gateway.platforms.signal import SignalRateLimitError + + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, captured = _stub_rpc_responses([ + SignalRateLimitError("Failed: rate limit", retry_after=27.0), + {"timestamp": 99}, + ]) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + sleep_calls: list = [] + _patch_scheduler_sleep(monkeypatch, sleep_calls) + + images = _make_image_files(tmp_path, 3) + await adapter.send_multiple_images(chat_id="+155****4567", images=images) + + assert len(captured) == 2 # initial 429 + retry success + assert sleep_calls == [pytest.approx(3 * 27.0, abs=1.0)] + + @pytest.mark.asyncio + async def test_429_without_retry_after_uses_default_rate( + self, monkeypatch, tmp_path + ): + """signal-cli < v0.14.3 doesn't surface Retry-After. The + scheduler keeps its default refill rate (1 token / 4s), so a + retry of n=3 waits 12s.""" + from gateway.platforms.signal_rate_limit import ( + SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER, + SignalRateLimitError, + ) + + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, captured = _stub_rpc_responses([ + SignalRateLimitError("[429] Rate Limited", retry_after=None), + {"timestamp": 99}, + ]) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + sleep_calls: list = [] + _patch_scheduler_sleep(monkeypatch, sleep_calls) + + await adapter.send_multiple_images( + chat_id="+155****4567", + images=_make_image_files(tmp_path, 3), + ) + + assert len(captured) == 2 + assert sleep_calls == [ + pytest.approx(3 * SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER, abs=1.0) + ] + + @pytest.mark.asyncio + async def test_rate_limit_exhaust_continues_to_next_batch( + self, monkeypatch, tmp_path + ): + """Both attempts on batch 0 fail; batch 1 still gets a chance. + The scheduler's natural pacing on the next acquire stands in for + the old explicit cooldown.""" + from gateway.platforms.signal import SignalRateLimitError + + adapter = _make_signal_adapter(monkeypatch) + responses = [ + SignalRateLimitError("[429]", retry_after=4.0), + SignalRateLimitError("[429]", retry_after=4.0), + {"timestamp": 7}, + ] + mock_rpc, captured = _stub_rpc_responses(responses) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + sleep_calls: list = [] + _patch_scheduler_sleep(monkeypatch, sleep_calls) + + images = _make_image_files(tmp_path, 33) # forces 2 batches + await adapter.send_multiple_images(chat_id="+155****4567", images=images) + + # 2 attempts on batch 0 + 1 on batch 1 + assert len(captured) == 3 + + @pytest.mark.asyncio + async def test_full_batch_emits_pacing_notice_for_followup( + self, monkeypatch, tmp_path + ): + """Two full batches of 32. Batch 1 needs 14 more tokens than the + 18 remaining after batch 0, so the scheduler sleeps 56s — + crossing the 10s user-facing pacing-notice threshold.""" + from gateway.platforms.signal import SIGNAL_MAX_ATTACHMENTS_PER_MSG + from gateway.platforms.signal_rate_limit import ( + SIGNAL_RATE_LIMIT_BUCKET_CAPACITY, + SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER + ) + + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, captured = _stub_rpc_responses([ + {"timestamp": 1}, {"timestamp": 2}, + ]) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + adapter._notify_batch_pacing = AsyncMock() + + sleep_calls: list = [] + _patch_scheduler_sleep(monkeypatch, sleep_calls) + + images = _make_image_files(tmp_path, 64) + await adapter.send_multiple_images(chat_id="+155****4567", images=images) + + assert len(captured) == 2 + assert len(captured[0]["params"]["attachments"]) == SIGNAL_MAX_ATTACHMENTS_PER_MSG + assert len(captured[1]["params"]["attachments"]) == SIGNAL_MAX_ATTACHMENTS_PER_MSG + assert len(sleep_calls) == 1 + # Batch 1 deficit: 32 - (50 - 32) = 14 tokens × 4s = 56s + expected_wait = ( + SIGNAL_MAX_ATTACHMENTS_PER_MSG + - (SIGNAL_RATE_LIMIT_BUCKET_CAPACITY - SIGNAL_MAX_ATTACHMENTS_PER_MSG) + ) * SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER + assert sleep_calls[0] == pytest.approx(expected_wait, abs=1.0) + adapter._notify_batch_pacing.assert_awaited_once() + + @pytest.mark.asyncio + async def test_short_followup_wait_skips_pacing_notice( + self, monkeypatch, tmp_path + ): + """Batch 1 only needs 1 token but 18 remain after batch 0 + (50 capacity − 32 batch 0). No wait, no pacing notice.""" + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, captured = _stub_rpc_responses([ + {"timestamp": 1}, {"timestamp": 2}, + ]) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + adapter._notify_batch_pacing = AsyncMock() + + sleep_calls: list = [] + _patch_scheduler_sleep(monkeypatch, sleep_calls) + + images = _make_image_files(tmp_path, 33) + await adapter.send_multiple_images(chat_id="+155****4567", images=images) + + assert len(captured) == 2 + assert len(sleep_calls) == 0 + adapter._notify_batch_pacing.assert_not_awaited() + + @pytest.mark.asyncio + async def test_single_batch_send_does_not_pace(self, monkeypatch, tmp_path): + """A single-batch send (≤32 attachments) leaves the scheduler + with tokens to spare — no follow-up acquire, no sleep.""" + adapter = _make_signal_adapter(monkeypatch) + mock_rpc, captured = _stub_rpc_responses([{"timestamp": 1}]) + adapter._rpc = mock_rpc + adapter._stop_typing_indicator = AsyncMock() + + sleep_calls: list = [] + _patch_scheduler_sleep(monkeypatch, sleep_calls) + + images = _make_image_files(tmp_path, 10) + await adapter.send_multiple_images(chat_id="+155****4567", images=images) + + assert len(captured) == 1 + assert sleep_calls == [] + + +class TestSignalRateLimitDetection: + """Coverage for the typed-code + substring detection helpers.""" + + def test_detect_typed_code(self): + from gateway.platforms.signal_rate_limit import ( + _is_signal_rate_limit_error, + SIGNAL_RPC_ERROR_RATELIMIT, + ) + err = {"code": SIGNAL_RPC_ERROR_RATELIMIT, "message": "any text"} + assert _is_signal_rate_limit_error(err) is True + + def test_detect_substring_fallback(self): + from gateway.platforms.signal import _is_signal_rate_limit_error + err = {"code": -32603, "message": "Failed: [429] Rate Limited (RateLimitException) (UnexpectedErrorException)"} + assert _is_signal_rate_limit_error(err) is True + + def test_detect_non_rate_limit(self): + from gateway.platforms.signal import _is_signal_rate_limit_error + err = {"code": -32603, "message": "UntrustedIdentityException"} + assert _is_signal_rate_limit_error(err) is False + + def test_extract_retry_after_from_results(self): + from gateway.platforms.signal import _extract_retry_after_seconds + err = { + "code": -5, + "message": "Failed to send message due to rate limiting", + "data": { + "response": { + "timestamp": 0, + "results": [ + {"type": "RATE_LIMIT_FAILURE", "retryAfterSeconds": 30}, + {"type": "RATE_LIMIT_FAILURE", "retryAfterSeconds": 45}, + ], + } + }, + } + assert _extract_retry_after_seconds(err) == 45.0 + + def test_extract_retry_after_missing(self): + """Old signal-cli builds don't expose retryAfterSeconds — return None.""" + from gateway.platforms.signal import _extract_retry_after_seconds + err = {"code": -32603, "message": "[429] Rate Limited"} + assert _extract_retry_after_seconds(err) is None + + def test_detect_retry_later_exception_substring(self): + """libsignal-net's RetryLaterException leaks through as + AttachmentInvalidException → UnexpectedErrorException when the + rate-limit fires inside attachment upload. Detect it by substring.""" + from gateway.platforms.signal import _is_signal_rate_limit_error + err = { + "code": -32603, + "message": ( + "Failed to send message: /home/max/sync/Memes/fengshui.jpeg: " + "org.signal.libsignal.net.RetryLaterException: Retry after 4 seconds " + "(AttachmentInvalidException) (UnexpectedErrorException)" + ), + } + assert _is_signal_rate_limit_error(err) is True + + def test_extract_retry_after_parses_message_string(self): + """When the structured field is missing, parse the seconds out + of the human 'Retry after N seconds' substring.""" + from gateway.platforms.signal import _extract_retry_after_seconds + err = { + "code": -32603, + "message": ( + "Failed to send message: /home/max/sync/Memes/fengshui.jpeg: " + "org.signal.libsignal.net.RetryLaterException: Retry after 4 seconds " + "(AttachmentInvalidException) (UnexpectedErrorException)" + ), + } + assert _extract_retry_after_seconds(err) == 4.0 + + +class TestSignalSendTimeout: + """Timeout scaling for batched attachment sends.""" + + def test_zero_attachments_uses_default(self): + from gateway.platforms.signal import _signal_send_timeout + assert _signal_send_timeout(0) == 30.0 + + def test_floor_at_60s(self): + from gateway.platforms.signal import _signal_send_timeout + # Few attachments (would be 5×N=5s) should still get 60s floor. + assert _signal_send_timeout(1) == 60.0 + assert _signal_send_timeout(5) == 60.0 + + def test_scales_with_batch_size(self): + from gateway.platforms.signal import _signal_send_timeout + # 32 attachments × 5s = 160s; ought to comfortably outlast a + # serial upload of an attachment-heavy batch. + assert _signal_send_timeout(32) == 160.0 diff --git a/tests/gateway/test_signal_rate_limit.py b/tests/gateway/test_signal_rate_limit.py new file mode 100644 index 0000000000..963f8b9303 --- /dev/null +++ b/tests/gateway/test_signal_rate_limit.py @@ -0,0 +1,233 @@ +"""Tests for the SignalAttachmentScheduler token-bucket simulator.""" +import asyncio +import time + +import pytest + +from gateway.platforms.signal_rate_limit import ( + SIGNAL_MAX_ATTACHMENTS_PER_MSG, + SIGNAL_RATE_LIMIT_BUCKET_CAPACITY, + SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER, + SignalAttachmentScheduler, + get_scheduler, + _reset_scheduler, +) + + +@pytest.fixture(autouse=True) +def _reset_signal_scheduler(): + """Drop the process-wide scheduler so each test gets a clean bucket.""" + _reset_scheduler() + yield + _reset_scheduler() + + +def _patch_sleep_and_time(monkeypatch, capture: list): + """Replace asyncio.sleep inside the scheduler module so tests don't + actually wait and advances time.monotonic to simulate time passing. + Captures the requested duration per call.""" + offset = 0.0 + async def _fake_sleep(seconds): + capture.append(seconds) + nonlocal offset + offset += seconds + + monkeypatch.setattr( + "gateway.platforms.signal_rate_limit.asyncio.sleep", _fake_sleep + ) + monkeypatch.setattr( + "gateway.platforms.signal_rate_limit.time.monotonic", lambda: offset + ) + + +class TestSchedulerInitialState: + def test_default_capacity_matches_signal_cap(self): + s = SignalAttachmentScheduler() + assert s.capacity == SIGNAL_RATE_LIMIT_BUCKET_CAPACITY + + def test_default_refill_rate_from_default_retry_after(self): + s = SignalAttachmentScheduler() + assert s.refill_rate == pytest.approx(1.0 / SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER) + + def test_starts_full(self): + s = SignalAttachmentScheduler() + assert s.tokens == s.capacity + + +class TestEstimateWait: + def test_zero_when_bucket_has_enough(self): + s = SignalAttachmentScheduler() + assert s.estimate_wait(10) == 0.0 + assert s.estimate_wait(int(s.capacity)) == 0.0 + + def test_proportional_to_deficit_when_empty(self, monkeypatch): + """Freeze monotonic so estimate_wait doesn't see fractional refill.""" + s = SignalAttachmentScheduler() + s.tokens = 0.0 + frozen = s.last_refill + monkeypatch.setattr( + "gateway.platforms.signal_rate_limit.time.monotonic", lambda: frozen + ) + # 32 tokens at 0.25 tokens/sec = 128s + assert s.estimate_wait(32) == pytest.approx(32 / s.refill_rate) + assert s.estimate_wait(1) == pytest.approx(1 / s.refill_rate) + + +class TestAcquire: + @pytest.mark.asyncio + async def test_acquire_zero_is_noop(self, monkeypatch): + sleeps: list = [] + _patch_sleep_and_time(monkeypatch, sleeps) + s = SignalAttachmentScheduler() + original = s.tokens + wait = await s.acquire(0) + assert wait == 0.0 + assert sleeps == [] + assert s.tokens == original + + @pytest.mark.asyncio + async def test_acquire_within_capacity_no_sleep(self, monkeypatch): + sleeps: list = [] + _patch_sleep_and_time(monkeypatch, sleeps) + + s = SignalAttachmentScheduler() + wait = await s.acquire(10) + await s.report_rpc_duration(0.001, 10) # actually deduct tokens + + assert wait == 0.0 + assert sleeps == [] + assert s.tokens == s.capacity - 10 + + @pytest.mark.asyncio + async def test_acquire_when_empty_sleeps_for_deficit(self, monkeypatch): + sleeps: list = [] + _patch_sleep_and_time(monkeypatch, sleeps) + s = SignalAttachmentScheduler() + + s.tokens = 0.0 + wait = await s.acquire(32) + await s.report_rpc_duration(1e-12, 32) + + # 32 tokens at default 0.25 tokens/sec = 128s + expected = 32 / s.refill_rate + assert wait == pytest.approx(expected) + assert sleeps == [pytest.approx(expected)] + # After sleep+acquire+rpc call, the bucket is empty again. + assert s.tokens == pytest.approx(0.0) + + @pytest.mark.asyncio + async def test_back_to_back_acquires_drain_then_wait(self, monkeypatch): + """Two sequential acquires of capacity each: first immediate, + second waits a full refill window.""" + sleeps: list = [] + _patch_sleep_and_time(monkeypatch, sleeps) + s = SignalAttachmentScheduler() + + await s.acquire(int(s.capacity)) + await s.report_rpc_duration(1e-12, int(s.capacity)) + + assert sleeps == [] # first batch had a full bucket + + await s.acquire(int(s.capacity)) + await s.report_rpc_duration(1e-12, int(s.capacity)) + # Second batch: no time elapsed (mocked sleep doesn't advance + # monotonic), tokens still 0 → wait the full capacity / rate. + assert sleeps == [pytest.approx(s.capacity / s.refill_rate)] + + @pytest.mark.asyncio + async def test_acquire_more_tokens_than_capacity(self, monkeypatch): + s = SignalAttachmentScheduler() + + with pytest.raises(Exception): + await s.acquire(int(s.capacity) + 1) + +class TestFeedback: + def test_calibrates_refill_rate_from_retry_after(self): + s = SignalAttachmentScheduler() + original = s.refill_rate + s.feedback(retry_after=42.0, n_attempted=1) + assert s.refill_rate == pytest.approx(1.0 / 42.0) + assert s.refill_rate != original + + def test_none_retry_after_leaves_rate(self): + s = SignalAttachmentScheduler() + original = s.refill_rate + s.feedback(retry_after=None, n_attempted=5) + assert s.refill_rate == original + + def test_zeros_tokens(self): + s = SignalAttachmentScheduler() + assert s.tokens > 0 + s.feedback(retry_after=4.0, n_attempted=1) + assert s.tokens == 0.0 + + @pytest.mark.asyncio + async def test_acquire_after_feedback_uses_calibrated_rate(self, monkeypatch): + """signal-cli ≥v0.14.3: server says 'retry_after=42 for one + token' → next acquire(1) waits 42s. Drops the old defensive + ``retry_after * 32`` heuristic in favor of the server's + authoritative per-token value.""" + sleeps: list = [] + _patch_sleep_and_time(monkeypatch, sleeps) + s = SignalAttachmentScheduler() + + # Initial acquire empties enough; 429 fires. + await s.acquire(1) + s.feedback(retry_after=42.0, n_attempted=1) + + # Re-acquire: bucket empty, calibrated rate = 1/42. + await s.acquire(1) + assert sleeps == [pytest.approx(42.0)] + + +class TestRefillClamping: + def test_refill_does_not_exceed_capacity(self, monkeypatch): + """Even after a long elapsed window, refill clamps at capacity.""" + s = SignalAttachmentScheduler() + s.tokens = 0.0 + # Pretend a year passed. + monkeypatch.setattr( + "gateway.platforms.signal_rate_limit.time.monotonic", + lambda: s.last_refill + 365 * 24 * 3600, + ) + s._refill() + assert s.tokens == s.capacity + + +class TestFifoAcquire: + @pytest.mark.asyncio + async def test_concurrent_acquires_serialize(self, monkeypatch): + """Two coroutines acquiring full capacity each: the second waits + in the lock queue until the first finishes its bucket math + sleep. + Demonstrates the FIFO fairness across sessions.""" + sleeps: list = [] + _patch_sleep_and_time(monkeypatch, sleeps) + s = SignalAttachmentScheduler() + + results: list = [] + + async def worker(label: str): + wait = await s.acquire(int(s.capacity)) + await s.report_rpc_duration(1e-12, int(s.capacity)) + results.append((label, wait)) + + # Launch in order; FIFO means A finishes first, then B. + await asyncio.gather(worker("A"), worker("B")) + + assert [r[0] for r in results] == ["A", "B"] + # A had a full bucket (no wait). B waited a full refill. + assert results[0][1] == 0.0 + assert results[1][1] == pytest.approx(s.capacity / s.refill_rate) + + +class TestSingleton: + def test_get_scheduler_returns_same_instance(self): + s1 = get_scheduler() + s2 = get_scheduler() + assert s1 is s2 + + def test_reset_scheduler_yields_new_instance(self): + s1 = get_scheduler() + _reset_scheduler() + s2 = get_scheduler() + assert s1 is not s2 diff --git a/tests/tools/test_send_message_tool.py b/tests/tools/test_send_message_tool.py index ff539f63e3..48bf2568ac 100644 --- a/tests/tools/test_send_message_tool.py +++ b/tests/tools/test_send_message_tool.py @@ -8,12 +8,25 @@ from pathlib import Path from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch +import pytest + + +@pytest.fixture(autouse=True) +def _reset_signal_scheduler(): + """Drop the process-wide attachment scheduler so each test gets a + fresh token bucket.""" + from gateway.platforms.signal_rate_limit import _reset_scheduler + _reset_scheduler() + yield + _reset_scheduler() + from gateway.config import Platform from tools.send_message_tool import ( _derive_forum_thread_name, _parse_target_ref, _send_discord, _send_matrix_via_adapter, + _send_signal, _send_telegram, _send_to_platform, send_message_tool, @@ -1621,3 +1634,361 @@ class TestForumProbeCache: assert result2["success"] is True # Only one session opened (thread creation) — no probe session this time # (verified by not raising from our side_effect exhaustion) + + +# --------------------------------------------------------------------------- +# _send_signal — chunking + 429 retry (mirrors gateway adapter behavior) +# --------------------------------------------------------------------------- + + +class _FakeSignalHttp: + """Stand-in for httpx.AsyncClient used as an async context manager. + + Pops a response from the queue per `post` call. Each entry is either + a dict (returned from .json()) or an exception instance (raised). + Captures (url, payload) per call. + """ + + def __init__(self, responses): + self.responses = list(responses) + self.calls = [] + + def __call__(self, *_a, **_kw): + return self + + async def __aenter__(self): + return self + + async def __aexit__(self, *_a): + return False + + async def post(self, url, json=None): + self.calls.append({"url": url, "payload": json}) + if not self.responses: + raise AssertionError("Unexpected extra POST") + item = self.responses.pop(0) + if isinstance(item, BaseException): + raise item + resp = SimpleNamespace( + raise_for_status=lambda: None, + json=lambda data=item: data, + ) + return resp + + +def _install_signal_http(monkeypatch, fake): + """Patch httpx.AsyncClient at the module level so the lazy import in + _send_signal picks it up. + """ + import httpx + monkeypatch.setattr(httpx, "AsyncClient", fake) + + +def _patch_sendmsg_sleep_and_time(monkeypatch, capture: list): + """Mock asyncio.sleep + time.monotonic in the signal_rate_limit + module so the scheduler's acquire loop sees synthetic time advancing + during sleep calls, and report_rpc_duration sees the same clock. + + Zero-second sleeps (event-loop yields from fake HTTP posts) are + delegated to the real asyncio.sleep so they don't pollute the + capture list. + """ + import asyncio as _aio + _real_sleep = _aio.sleep + offset = [0.0] + + async def fake_sleep(seconds): + if seconds > 0: + capture.append(seconds) + offset[0] += seconds + else: + await _real_sleep(0) + + monkeypatch.setattr( + "gateway.platforms.signal_rate_limit.asyncio.sleep", fake_sleep + ) + monkeypatch.setattr( + "gateway.platforms.signal_rate_limit.time.monotonic", lambda: offset[0] + ) + + +class TestSendSignalChunking: + def test_text_only_single_rpc(self, monkeypatch): + fake = _FakeSignalHttp([{"result": {"timestamp": 1}}]) + _install_signal_http(monkeypatch, fake) + + result = asyncio.run( + _send_signal( + {"http_url": "http://localhost:8080", "account": "+15551234567"}, + "+15557654321", + "hello", + ) + ) + + assert result == {"success": True, "platform": "signal", "chat_id": "+15557654321"} + assert len(fake.calls) == 1 + params = fake.calls[0]["payload"]["params"] + assert params["message"] == "hello" + assert "attachments" not in params + + def test_chunks_attachments_above_max(self, tmp_path, monkeypatch): + """33 attachments → 2 batches; text only on first batch. Batch 1 + only needs 1 token and 18 remain after batch 0, so no sleep.""" + from gateway.platforms.signal_rate_limit import ( + SIGNAL_MAX_ATTACHMENTS_PER_MSG, + ) + + paths = [] + for i in range(33): + p = tmp_path / f"img_{i}.png" + p.write_bytes(b"\x89PNG" + b"\x00" * 16) + paths.append((str(p), False)) + + fake = _FakeSignalHttp([ + {"result": {"timestamp": 1}}, # batch 0 + {"result": {"timestamp": 2}}, # batch 1 + ]) + _install_signal_http(monkeypatch, fake) + + sleep_calls = [] + _patch_sendmsg_sleep_and_time(monkeypatch, sleep_calls) + + result = asyncio.run( + _send_signal( + {"http_url": "http://localhost:8080", "account": "+15551234567"}, + "+15557654321", + "Caption goes here", + media_files=paths, + ) + ) + + assert result["success"] is True + assert len(fake.calls) == 2 + assert len(sleep_calls) == 0 + + first = fake.calls[0]["payload"]["params"] + assert first["message"] == "Caption goes here" + assert len(first["attachments"]) == SIGNAL_MAX_ATTACHMENTS_PER_MSG + + second = fake.calls[1]["payload"]["params"] + assert second["message"] == "" # caption only on batch 0 + assert len(second["attachments"]) == 33 - SIGNAL_MAX_ATTACHMENTS_PER_MSG + + def test_full_followup_batch_emits_pacing_notice(self, tmp_path, monkeypatch): + """64 attachments → 2 full batches. Batch 1 needs 14 more tokens + than the 18 remaining after batch 0 — 56s wait crossing the 10s + notice threshold.""" + from gateway.platforms.signal_rate_limit import ( + SIGNAL_MAX_ATTACHMENTS_PER_MSG, + SIGNAL_RATE_LIMIT_BUCKET_CAPACITY, + SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER, + ) + + paths = [] + for i in range(64): + p = tmp_path / f"img_{i}.png" + p.write_bytes(b"\x89PNG" + b"\x00" * 16) + paths.append((str(p), False)) + + fake = _FakeSignalHttp([ + {"result": {"timestamp": 1}}, # batch 0 + {"result": {"timestamp": 99}}, # pacing notice + {"result": {"timestamp": 2}}, # batch 1 + ]) + _install_signal_http(monkeypatch, fake) + + sleep_calls = [] + _patch_sendmsg_sleep_and_time(monkeypatch, sleep_calls) + + result = asyncio.run( + _send_signal( + {"http_url": "http://localhost:8080", "account": "+15551234567"}, + "+15557654321", + "", + media_files=paths, + ) + ) + + assert result["success"] is True + assert len(fake.calls) == 3 + notice = fake.calls[1]["payload"]["params"] + assert "More images coming" in notice["message"] + assert "attachments" not in notice + # Batch 1 deficit: 32 - (50 - 32) = 14 tokens × 4s = 56s + expected = ( + SIGNAL_MAX_ATTACHMENTS_PER_MSG + - (SIGNAL_RATE_LIMIT_BUCKET_CAPACITY - SIGNAL_MAX_ATTACHMENTS_PER_MSG) + ) * SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER + assert sleep_calls == [pytest.approx(expected, abs=1.0)] + + def test_429_with_retry_after_drives_exact_backoff(self, tmp_path, monkeypatch): + """signal-cli ≥ v0.14.3 surfaces Retry-After under + error.data.response.results[*].retryAfterSeconds. The scheduler + calibrates its refill rate from that value; the retry of n=1 + sleeps the per-token interval.""" + from gateway.platforms.signal_rate_limit import SIGNAL_RPC_ERROR_RATELIMIT + + p = tmp_path / "img.png" + p.write_bytes(b"\x89PNG" + b"\x00" * 16) + + fake = _FakeSignalHttp([ + { + "error": { + "code": SIGNAL_RPC_ERROR_RATELIMIT, + "message": "Failed to send message due to rate limiting", + "data": { + "response": { + "timestamp": 0, + "results": [ + {"type": "RATE_LIMIT_FAILURE", "retryAfterSeconds": 42}, + ], + } + }, + } + }, + {"result": {"timestamp": 7}}, + ]) + _install_signal_http(monkeypatch, fake) + + sleep_calls = [] + _patch_sendmsg_sleep_and_time(monkeypatch, sleep_calls) + + result = asyncio.run( + _send_signal( + {"http_url": "http://localhost:8080", "account": "+15551234567"}, + "+15557654321", + "", + media_files=[(str(p), False)], + ) + ) + + assert result["success"] is True + assert len(fake.calls) == 2 # initial + retry + assert sleep_calls == [pytest.approx(42.0, abs=1.0)] + + def test_429_without_retry_after_falls_back_to_default(self, tmp_path, monkeypatch): + """Older signal-cli (< v0.14.3) doesn't surface Retry-After. + The scheduler keeps its default rate (1 token / 4s).""" + from gateway.platforms.signal_rate_limit import SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER + + p = tmp_path / "img.png" + p.write_bytes(b"\x89PNG" + b"\x00" * 16) + + fake = _FakeSignalHttp([ + {"error": {"message": "Failed: [429] Rate Limited"}}, + {"result": {"timestamp": 7}}, + ]) + _install_signal_http(monkeypatch, fake) + + sleep_calls = [] + _patch_sendmsg_sleep_and_time(monkeypatch, sleep_calls) + + result = asyncio.run( + _send_signal( + {"http_url": "http://localhost:8080", "account": "+15551234567"}, + "+15557654321", + "", + media_files=[(str(p), False)], + ) + ) + + assert result["success"] is True + assert sleep_calls == [pytest.approx(SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER, abs=1.0)] + + def test_429_retry_exhaust_continues_to_next_batch(self, tmp_path, monkeypatch): + """Both attempts on batch 0 fail; batch 1 still gets a chance. + The scheduler's natural pacing (no more cooldown gate) lets the + second batch through after its acquire wait.""" + from gateway.platforms.signal_rate_limit import SIGNAL_RPC_ERROR_RATELIMIT + + paths = [] + for i in range(33): # forces 2 batches + p = tmp_path / f"img_{i}.png" + p.write_bytes(b"\x89PNG" + b"\x00" * 16) + paths.append((str(p), False)) + + rate_limit_err = { + "error": { + "code": SIGNAL_RPC_ERROR_RATELIMIT, + "message": "Failed to send message due to rate limiting", + "data": { + "response": { + "timestamp": 0, + "results": [ + {"type": "RATE_LIMIT_FAILURE", "retryAfterSeconds": 4}, + ], + } + }, + } + } + + fake = _FakeSignalHttp([ + rate_limit_err, # batch 0, attempt 1 + rate_limit_err, # batch 0, attempt 2 (exhaust) + {"result": {"timestamp": 9}}, # batch 1 succeeds + ]) + _install_signal_http(monkeypatch, fake) + + sleep_calls = [] + _patch_sendmsg_sleep_and_time(monkeypatch, sleep_calls) + + result = asyncio.run( + _send_signal( + {"http_url": "http://localhost:8080", "account": "+15551234567"}, + "+15557654321", + "many", + media_files=paths, + ) + ) + + # Partial success: batch 0 lost but batch 1 went through. + assert result["success"] is True + assert "warnings" in result + assert any("rate-limited" in w for w in result["warnings"]) + # 2 attempts on batch 0 + 1 successful batch 1 = 3 calls + assert len(fake.calls) == 3 + + def test_non_rate_limit_error_returns_immediately(self, tmp_path, monkeypatch): + """A non-429 RPC error should not retry — it returns an error result.""" + p = tmp_path / "img.png" + p.write_bytes(b"\x89PNG" + b"\x00" * 16) + + fake = _FakeSignalHttp([ + {"error": {"message": "UntrustedIdentityException"}}, + ]) + _install_signal_http(monkeypatch, fake) + + result = asyncio.run( + _send_signal( + {"http_url": "http://localhost:8080", "account": "+15551234567"}, + "+15557654321", + "", + media_files=[(str(p), False)], + ) + ) + + assert "error" in result + assert "UntrustedIdentityException" in result["error"] + assert len(fake.calls) == 1 # no retry on non-429 + + def test_skipped_missing_files_reported_in_warnings(self, tmp_path, monkeypatch): + good = tmp_path / "ok.png" + good.write_bytes(b"\x89PNG" + b"\x00" * 16) + + fake = _FakeSignalHttp([{"result": {"timestamp": 1}}]) + _install_signal_http(monkeypatch, fake) + + result = asyncio.run( + _send_signal( + {"http_url": "http://localhost:8080", "account": "+15551234567"}, + "+15557654321", + "msg", + media_files=[(str(good), False), (str(tmp_path / "missing.png"), False)], + ) + ) + + assert result["success"] is True + assert "warnings" in result + # Only the existing file made it into the RPC + params = fake.calls[0]["payload"]["params"] + assert len(params["attachments"]) == 1 diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 1a3ede29d6..62712e4581 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -1054,25 +1054,33 @@ async def _send_signal(extra, chat_id, message, media_files=None): """Send via signal-cli JSON-RPC API. Supports both text-only and text-with-attachments (images/audio/documents). - Attachments are sent as an 'attachments' array in the JSON-RPC params. + Multi-attachment sends are chunked into batches of + SIGNAL_MAX_ATTACHMENTS_PER_MSG and metered by the process-wide + SignalAttachmentScheduler — same bucket the gateway adapter uses, so + sends from this tool and inbound-driven replies share rate-limit state. """ try: import httpx except ImportError: return {"error": "httpx not installed"} + + from gateway.platforms.signal_rate_limit import ( + SIGNAL_BATCH_PACING_NOTICE_THRESHOLD, + SIGNAL_MAX_ATTACHMENTS_PER_MSG, + SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, + _extract_retry_after_seconds, + _format_wait, + _is_signal_rate_limit_error, + _signal_send_timeout, + get_scheduler, + ) + try: http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/") account = extra.get("account", "") if not account: return {"error": "Signal account not configured"} - params = {"account": account, "message": message} - if chat_id.startswith("group:"): - params["groupId"] = chat_id[6:] - else: - params["recipient"] = [chat_id] - - # Add attachments if media_files are present valid_media = media_files or [] attachment_paths = [] for media_path, _is_voice in valid_media: @@ -1081,28 +1089,144 @@ async def _send_signal(extra, chat_id, message, media_files=None): else: logger.warning("Signal media file not found, skipping: %s", media_path) + # Chunk attachments. With no attachments we still emit one batch + # (text only). With attachments, the text rides on batch #0 so the + # caption isn't repeated across every chunk. if attachment_paths: - params["attachments"] = attachment_paths + att_batches = [ + attachment_paths[i:i + SIGNAL_MAX_ATTACHMENTS_PER_MSG] + for i in range(0, len(attachment_paths), SIGNAL_MAX_ATTACHMENTS_PER_MSG) + ] + else: + att_batches = [[]] - payload = { - "jsonrpc": "2.0", - "method": "send", - "params": params, - "id": f"send_{int(time.time() * 1000)}", - } + async def _post(batch_attachments, batch_message): + params = {"account": account, "message": batch_message} + if chat_id.startswith("group:"): + params["groupId"] = chat_id[6:] + else: + params["recipient"] = [chat_id] + if batch_attachments: + params["attachments"] = batch_attachments - async with httpx.AsyncClient(timeout=30.0) as client: - resp = await client.post(f"{http_url}/api/v1/rpc", json=payload) - resp.raise_for_status() - data = resp.json() - if "error" in data: - return _error(f"Signal RPC error: {data['error']}") + payload = { + "jsonrpc": "2.0", + "method": "send", + "params": params, + "id": f"send_{int(time.time() * 1000)}", + } + timeout = _signal_send_timeout(len(batch_attachments) if batch_attachments else 0) + async with httpx.AsyncClient(timeout=timeout) as client: + resp = await client.post(f"{http_url}/api/v1/rpc", json=payload) + resp.raise_for_status() + return resp.json() - # Return warning for any skipped media files - result = {"success": True, "platform": "signal", "chat_id": chat_id} - if len(attachment_paths) < len(valid_media): - result["warnings"] = [f"Some media files were skipped (not found on disk)"] - return result + async def _send_inline_notice(text: str) -> None: + """Best-effort one-shot RPC for a user-facing pacing notice.""" + notice_params = {"account": account, "message": text} + if chat_id.startswith("group:"): + notice_params["groupId"] = chat_id[6:] + else: + notice_params["recipient"] = [chat_id] + try: + async with httpx.AsyncClient(timeout=30.0) as _client: + await _client.post( + f"{http_url}/api/v1/rpc", + json={ + "jsonrpc": "2.0", + "method": "send", + "params": notice_params, + "id": f"notice_{int(time.time() * 1000)}", + }, + ) + except Exception as _e: + logger.warning("Signal: inline notice failed: %s", _e) + + scheduler = get_scheduler() + logger.info( + "send_message Signal: scheduler state=%s, %d attachment(s) in %d batch(es)", + scheduler.state(), len(attachment_paths), len(att_batches), + ) + failed_batches: list[int] = [] + for idx, att_batch in enumerate(att_batches): + n = len(att_batch) + if n > 0: + estimated = scheduler.estimate_wait(n) + if estimated >= SIGNAL_BATCH_PACING_NOTICE_THRESHOLD: + await _send_inline_notice( + f"(More images coming — pausing ~{_format_wait(estimated)} " + f"for Signal rate limit, batch {idx + 1}/{len(att_batches)}.)" + ) + + batch_message = message if idx == 0 else "" + + for attempt in range(1, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS + 1): + try: + await scheduler.acquire(n) + _rpc_t0 = time.monotonic() + data = await _post(att_batch, batch_message) + _rpc_duration = time.monotonic() - _rpc_t0 + if "error" not in data: + await scheduler.report_rpc_duration(_rpc_duration, n) + break + + err = data["error"] + + if not _is_signal_rate_limit_error(err): + return _error(f"Signal RPC error on batch {idx + 1}/{len(att_batches)}: {err}") + + server_retry_after = _extract_retry_after_seconds(err) + scheduler.feedback(server_retry_after, n) + + if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS: + failed_batches.append(idx + 1) + logger.error( + "Signal: rate-limit retries exhausted on batch %d/%d " + "(%d attachments lost, server retry_after=%s)", + idx + 1, len(att_batches), n, + f"{server_retry_after:.0f}s" if server_retry_after else "unknown", + ) + break + logger.warning( + "Signal: rate-limited on batch %d/%d " + "(attempt %d/%d, server retry_after=%s); " + "scheduler will pace the retry", + idx + 1, len(att_batches), + attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, + f"{server_retry_after:.0f}s" if server_retry_after else "unknown", + ) + except Exception as e: + if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS: + failed_batches.append(idx + 1) + logger.error( + "Signal: send error on batch %d/%d after %d attempts: %s", + idx + 1, len(att_batches), attempt, str(e) + ) + break + logger.warning( + "Signal: transient error on batch %d/%d (attempt %d/%d): %s; will retry", + idx + 1, len(att_batches), attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, str(e) + ) + + warnings = [] + if len(attachment_paths) < len(valid_media): + warnings.append("Some media files were skipped (not found on disk)") + if failed_batches: + warnings.append( + f"Signal rate-limited {len(failed_batches)} batch(es) " + f"(#{', #'.join(str(b) for b in failed_batches)})" + ) + + if failed_batches and len(failed_batches) == len(att_batches): + return _error( + f"Signal: every batch ({len(att_batches)}) hit rate limit; " + f"no attachments delivered" + ) + + result = {"success": True, "platform": "signal", "chat_id": chat_id} + if warnings: + result["warnings"] = warnings + return result except Exception as e: return _error(f"Signal send failed: {e}") diff --git a/website/docs/user-guide/messaging/signal.md b/website/docs/user-guide/messaging/signal.md index f765381946..acb607cfeb 100644 --- a/website/docs/user-guide/messaging/signal.md +++ b/website/docs/user-guide/messaging/signal.md @@ -159,7 +159,7 @@ The adapter supports sending and receiving media in both directions. The agent can send media files via `MEDIA:` tags in responses. The following delivery methods are supported: -- **Images** — `send_image_file` sends PNG, JPEG, GIF, WebP as native Signal attachments +- **Images** — `send_multiple_images` and `send_image_file` send PNG, JPEG, GIF, WebP as native Signal attachments - **Voice** — `send_voice` sends audio files (OGG, MP3, WAV, M4A, AAC) as attachments - **Video** — `send_video` sends MP4 video files - **Documents** — `send_document` sends any file type (PDF, ZIP, etc.) @@ -167,6 +167,9 @@ The agent can send media files via `MEDIA:` tags in responses. The following del All outgoing media goes through Signal's standard attachment API. Unlike some platforms, Signal does not distinguish between voice messages and file attachments at the protocol level. Attachment size limit: **100 MB** (both directions). +:::warning +**Signal servers will rate-limit attachment uploads**, the adapter uses a scheduler for multiple image sending that batches images in groups of 32 and throttles uploads to match the Signal server policy. +::: ### Native Formatting, Reply Quotes, and Reactions