"""SMS (Twilio) platform adapter. Connects to the Twilio REST API for outbound SMS and runs an aiohttp webhook server to receive inbound messages. Shares credentials with the optional telephony skill — same env vars: - TWILIO_ACCOUNT_SID - TWILIO_AUTH_TOKEN - TWILIO_PHONE_NUMBER (E.164 from-number, e.g. +15551234567) Gateway-specific env vars: - SMS_WEBHOOK_PORT (default 8080) - SMS_WEBHOOK_HOST (default 0.0.0.0) - SMS_WEBHOOK_URL (public URL for Twilio signature validation — required) - SMS_INSECURE_NO_SIGNATURE (true to disable signature validation — dev only) - SMS_ALLOWED_USERS (comma-separated E.164 phone numbers) - SMS_ALLOW_ALL_USERS (true/false) - SMS_HOME_CHANNEL (phone number for cron delivery) """ import asyncio import base64 import hashlib import hmac import logging import os import urllib.parse from typing import Any, Dict, Optional, TYPE_CHECKING if TYPE_CHECKING: import aiohttp from gateway.config import Platform, PlatformConfig from gateway.platforms.base import ( BasePlatformAdapter, MessageEvent, MessageType, SendResult, ) from gateway.platforms.helpers import redact_phone, strip_markdown logger = logging.getLogger(__name__) TWILIO_API_BASE = "https://api.twilio.com/2010-04-01/Accounts" MAX_SMS_LENGTH = 1600 # ~10 SMS segments DEFAULT_WEBHOOK_PORT = 8080 DEFAULT_WEBHOOK_HOST = "0.0.0.0" def check_sms_requirements() -> bool: """Check if SMS adapter dependencies are available.""" try: import aiohttp # noqa: F401 except ImportError: return False return bool(os.getenv("TWILIO_ACCOUNT_SID") and os.getenv("TWILIO_AUTH_TOKEN")) class SmsAdapter(BasePlatformAdapter): """ Twilio SMS <-> Hermes gateway adapter. Each inbound phone number gets its own Hermes session (multi-tenant). Replies are always sent from the configured TWILIO_PHONE_NUMBER. """ MAX_MESSAGE_LENGTH = MAX_SMS_LENGTH def __init__(self, config: PlatformConfig): super().__init__(config, Platform.SMS) self._account_sid: str = os.environ["TWILIO_ACCOUNT_SID"] self._auth_token: str = os.environ["TWILIO_AUTH_TOKEN"] self._from_number: str = os.getenv("TWILIO_PHONE_NUMBER", "") self._webhook_port: int = int( os.getenv("SMS_WEBHOOK_PORT", str(DEFAULT_WEBHOOK_PORT)) ) self._webhook_host: str = os.getenv("SMS_WEBHOOK_HOST", DEFAULT_WEBHOOK_HOST) self._webhook_url: str = os.getenv("SMS_WEBHOOK_URL", "").strip() self._runner = None self._http_session: Optional["aiohttp.ClientSession"] = None def _basic_auth_header(self) -> str: """Build HTTP Basic auth header value for Twilio.""" creds = f"{self._account_sid}:{self._auth_token}" encoded = base64.b64encode(creds.encode("ascii")).decode("ascii") return f"Basic {encoded}" # ------------------------------------------------------------------ # Required abstract methods # ------------------------------------------------------------------ async def connect(self) -> bool: import aiohttp from aiohttp import web if not self._from_number: logger.error("[sms] TWILIO_PHONE_NUMBER not set — cannot send replies") return False insecure_no_sig = os.getenv("SMS_INSECURE_NO_SIGNATURE", "").lower() == "true" if not self._webhook_url and not insecure_no_sig: logger.error( "[sms] Refusing to start: SMS_WEBHOOK_URL is required for Twilio " "signature validation. Set it to the public URL configured in your " "Twilio console (e.g. https://example.com/webhooks/twilio). " "For local development without validation, set " "SMS_INSECURE_NO_SIGNATURE=true (NOT recommended for production).", ) return False if insecure_no_sig and not self._webhook_url: logger.warning( "[sms] SMS_INSECURE_NO_SIGNATURE=true — Twilio signature validation " "is DISABLED. Any client that can reach port %d can inject messages. " "Do NOT use this in production.", self._webhook_port, ) app = web.Application() app.router.add_post("/webhooks/twilio", self._handle_webhook) app.router.add_get("/health", lambda _: web.Response(text="ok")) self._runner = web.AppRunner(app) await self._runner.setup() site = web.TCPSite(self._runner, self._webhook_host, self._webhook_port) await site.start() self._http_session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=30), ) self._running = True logger.info( "[sms] Twilio webhook server listening on %s:%d, from: %s", self._webhook_host, self._webhook_port, redact_phone(self._from_number), ) return True async def disconnect(self) -> None: if self._http_session: await self._http_session.close() self._http_session = None if self._runner: await self._runner.cleanup() self._runner = None self._running = False logger.info("[sms] Disconnected") async def send( self, chat_id: str, content: str, reply_to: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: import aiohttp formatted = self.format_message(content) chunks = self.truncate_message(formatted) last_result = SendResult(success=True) url = f"{TWILIO_API_BASE}/{self._account_sid}/Messages.json" headers = { "Authorization": self._basic_auth_header(), } session = self._http_session or aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=30), ) try: for chunk in chunks: form_data = aiohttp.FormData() form_data.add_field("From", self._from_number) form_data.add_field("To", chat_id) form_data.add_field("Body", chunk) try: async with session.post(url, data=form_data, headers=headers) as resp: body = await resp.json() if resp.status >= 400: error_msg = body.get("message", str(body)) logger.error( "[sms] send failed to %s: %s %s", redact_phone(chat_id), resp.status, error_msg, ) return SendResult( success=False, error=f"Twilio {resp.status}: {error_msg}", ) msg_sid = body.get("sid", "") last_result = SendResult(success=True, message_id=msg_sid) except Exception as e: logger.error("[sms] send error to %s: %s", redact_phone(chat_id), e) return SendResult(success=False, error=str(e)) finally: # Close session only if we created a fallback (no persistent session) if not self._http_session and session: await session.close() return last_result async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: return {"name": chat_id, "type": "dm"} # ------------------------------------------------------------------ # SMS-specific formatting # ------------------------------------------------------------------ def format_message(self, content: str) -> str: """Strip markdown — SMS renders it as literal characters.""" return strip_markdown(content) # ------------------------------------------------------------------ # Twilio signature validation # ------------------------------------------------------------------ def _validate_twilio_signature( self, url: str, post_params: dict, signature: str, ) -> bool: """Validate ``X-Twilio-Signature`` header (HMAC-SHA1, base64). Tries both with and without the default port for the URL scheme, since Twilio may sign with either variant. Algorithm: https://www.twilio.com/docs/usage/security#validating-requests """ if self._check_signature(url, post_params, signature): return True variant = self._port_variant_url(url) if variant and self._check_signature(variant, post_params, signature): return True return False def _check_signature( self, url: str, post_params: dict, signature: str, ) -> bool: """Compute and compare a single Twilio signature.""" data_to_sign = url for key in sorted(post_params.keys()): data_to_sign += key + post_params[key] mac = hmac.new( self._auth_token.encode("utf-8"), data_to_sign.encode("utf-8"), hashlib.sha1, ) computed = base64.b64encode(mac.digest()).decode("utf-8") return hmac.compare_digest(computed, signature) @staticmethod def _port_variant_url(url: str) -> str | None: """Return the URL with the default port toggled, or None. Only toggles default ports (443 for https, 80 for http). Non-standard ports are never modified. """ parsed = urllib.parse.urlparse(url) default_ports = {"https": 443, "http": 80} default_port = default_ports.get(parsed.scheme) if default_port is None: return None if parsed.port == default_port: # Has explicit default port → strip it return urllib.parse.urlunparse( (parsed.scheme, parsed.hostname, parsed.path, parsed.params, parsed.query, parsed.fragment) ) elif parsed.port is None: # No port → add default netloc = f"{parsed.hostname}:{default_port}" return urllib.parse.urlunparse( (parsed.scheme, netloc, parsed.path, parsed.params, parsed.query, parsed.fragment) ) # Non-standard port — no variant return None # ------------------------------------------------------------------ # Twilio webhook handler # ------------------------------------------------------------------ async def _handle_webhook(self, request) -> "aiohttp.web.Response": from aiohttp import web try: raw = await request.read() # Twilio sends form-encoded data, not JSON form = urllib.parse.parse_qs(raw.decode("utf-8"), keep_blank_values=True) except Exception as e: logger.error("[sms] webhook parse error: %s", e) return web.Response( text='', content_type="application/xml", status=400, ) # Validate Twilio request signature when SMS_WEBHOOK_URL is configured if self._webhook_url: twilio_sig = request.headers.get("X-Twilio-Signature", "") if not twilio_sig: logger.warning("[sms] Rejected: missing X-Twilio-Signature header") return web.Response( text='', content_type="application/xml", status=403, ) flat_params = {k: v[0] for k, v in form.items() if v} if not self._validate_twilio_signature( self._webhook_url, flat_params, twilio_sig ): logger.warning("[sms] Rejected: invalid Twilio signature") return web.Response( text='', content_type="application/xml", status=403, ) # Extract fields (parse_qs returns lists) from_number = (form.get("From", [""]))[0].strip() to_number = (form.get("To", [""]))[0].strip() text = (form.get("Body", [""]))[0].strip() message_sid = (form.get("MessageSid", [""]))[0].strip() if not from_number or not text: return web.Response( text='', content_type="application/xml", ) # Ignore messages from our own number (echo prevention) if from_number == self._from_number: logger.debug("[sms] ignoring echo from own number %s", redact_phone(from_number)) return web.Response( text='', content_type="application/xml", ) logger.info( "[sms] inbound from %s -> %s: %s", redact_phone(from_number), redact_phone(to_number), text[:80], ) source = self.build_source( chat_id=from_number, chat_name=from_number, chat_type="dm", user_id=from_number, user_name=from_number, ) event = MessageEvent( text=text, message_type=MessageType.TEXT, source=source, raw_message=form, message_id=message_sid, ) # Non-blocking: Twilio expects a fast response task = asyncio.create_task(self.handle_message(event)) self._background_tasks.add(task) task.add_done_callback(self._background_tasks.discard) # Return empty TwiML — we send replies via the REST API, not inline TwiML return web.Response( text='', content_type="application/xml", )