From 5eae976c4039a6ff8d42c85e113ba4cc182a9015 Mon Sep 17 00:00:00 2001 From: Mariano Nicolini Date: Sat, 11 Apr 2026 15:11:42 -0300 Subject: [PATCH] add basic twilio signature checking and tests --- gateway/platforms/sms.py | 66 +++++++++++- tests/gateway/test_sms.py | 206 +++++++++++++++++++++++++++++++++++++- 2 files changed, 267 insertions(+), 5 deletions(-) diff --git a/gateway/platforms/sms.py b/gateway/platforms/sms.py index a0760199ba..3a55c31c85 100644 --- a/gateway/platforms/sms.py +++ b/gateway/platforms/sms.py @@ -10,6 +10,8 @@ Shares credentials with the optional telephony skill — same env vars: Gateway-specific env vars: - SMS_WEBHOOK_PORT (default 8080) + - SMS_WEBHOOK_HOST (default 0.0.0.0) + - SMS_WEBHOOK_URL (public URL for Twilio signature validation) - SMS_ALLOWED_USERS (comma-separated E.164 phone numbers) - SMS_ALLOW_ALL_USERS (true/false) - SMS_HOME_CHANNEL (phone number for cron delivery) @@ -17,6 +19,8 @@ Gateway-specific env vars: import asyncio import base64 +import hashlib +import hmac import logging import os import re @@ -29,6 +33,7 @@ from gateway.platforms.base import ( MessageEvent, MessageType, SendResult, + is_network_accessible, ) logger = logging.getLogger(__name__) @@ -36,6 +41,7 @@ logger = logging.getLogger(__name__) TWILIO_API_BASE = "https://api.twilio.com/2010-04-01/Accounts" MAX_SMS_LENGTH = 1600 # ~10 SMS segments DEFAULT_WEBHOOK_PORT = 8080 +DEFAULT_WEBHOOK_HOST = "0.0.0.0" # E.164 phone number pattern for redaction _PHONE_RE = re.compile(r"\+[1-9]\d{6,14}") @@ -77,6 +83,8 @@ class SmsAdapter(BasePlatformAdapter): self._webhook_port: int = int( os.getenv("SMS_WEBHOOK_PORT", str(DEFAULT_WEBHOOK_PORT)) ) + self._webhook_host: str = os.getenv("SMS_WEBHOOK_HOST", DEFAULT_WEBHOOK_HOST) + self._webhook_url: str = os.getenv("SMS_WEBHOOK_URL", "").strip() self._runner = None self._http_session: Optional["aiohttp.ClientSession"] = None @@ -98,13 +106,21 @@ class SmsAdapter(BasePlatformAdapter): logger.error("[sms] TWILIO_PHONE_NUMBER not set — cannot send replies") return False + if not self._webhook_url: + logger.warning( + "[sms] SMS_WEBHOOK_URL not set — Twilio signature validation is " + "DISABLED. Any client that can reach port %d can inject messages. " + "Set SMS_WEBHOOK_URL to enable signature validation.", + self._webhook_port, + ) + app = web.Application() app.router.add_post("/webhooks/twilio", self._handle_webhook) app.router.add_get("/health", lambda _: web.Response(text="ok")) self._runner = web.AppRunner(app) await self._runner.setup() - site = web.TCPSite(self._runner, "0.0.0.0", self._webhook_port) + site = web.TCPSite(self._runner, self._webhook_host, self._webhook_port) await site.start() self._http_session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=30), @@ -112,7 +128,8 @@ class SmsAdapter(BasePlatformAdapter): self._running = True logger.info( - "[sms] Twilio webhook server listening on port %d, from: %s", + "[sms] Twilio webhook server listening on %s:%d, from: %s", + self._webhook_host, self._webhook_port, _redact_phone(self._from_number), ) @@ -203,6 +220,28 @@ class SmsAdapter(BasePlatformAdapter): content = re.sub(r"\n{3,}", "\n\n", content) return content.strip() + # ------------------------------------------------------------------ + # Twilio signature validation + # ------------------------------------------------------------------ + + def _validate_twilio_signature( + self, url: str, post_params: dict, signature: str, + ) -> bool: + """Validate ``X-Twilio-Signature`` header (HMAC-SHA1, base64). + + Algorithm: https://www.twilio.com/docs/usage/security#validating-requests + """ + data_to_sign = url + for key in sorted(post_params.keys()): + data_to_sign += key + post_params[key] + mac = hmac.new( + self._auth_token.encode("utf-8"), + data_to_sign.encode("utf-8"), + hashlib.sha1, + ) + computed = base64.b64encode(mac.digest()).decode("utf-8") + return hmac.compare_digest(computed, signature) + # ------------------------------------------------------------------ # Twilio webhook handler # ------------------------------------------------------------------ @@ -213,7 +252,7 @@ class SmsAdapter(BasePlatformAdapter): try: raw = await request.read() # Twilio sends form-encoded data, not JSON - form = urllib.parse.parse_qs(raw.decode("utf-8")) + form = urllib.parse.parse_qs(raw.decode("utf-8"), keep_blank_values=True) except Exception as e: logger.error("[sms] webhook parse error: %s", e) return web.Response( @@ -222,6 +261,27 @@ class SmsAdapter(BasePlatformAdapter): status=400, ) + # Validate Twilio request signature when SMS_WEBHOOK_URL is configured + if self._webhook_url: + twilio_sig = request.headers.get("X-Twilio-Signature", "") + if not twilio_sig: + logger.warning("[sms] Rejected: missing X-Twilio-Signature header") + return web.Response( + text='', + content_type="application/xml", + status=403, + ) + flat_params = {k: v[0] for k, v in form.items() if v} + if not self._validate_twilio_signature( + self._webhook_url, flat_params, twilio_sig + ): + logger.warning("[sms] Rejected: invalid Twilio signature") + return web.Response( + text='', + content_type="application/xml", + status=403, + ) + # Extract fields (parse_qs returns lists) from_number = (form.get("From", [""]))[0].strip() to_number = (form.get("To", [""]))[0].strip() diff --git a/tests/gateway/test_sms.py b/tests/gateway/test_sms.py index 54c1edf237..cfe06df983 100644 --- a/tests/gateway/test_sms.py +++ b/tests/gateway/test_sms.py @@ -1,11 +1,14 @@ """Tests for SMS (Twilio) platform integration. Covers config loading, format/truncate, echo prevention, -requirements check, and toolset verification. +requirements check, toolset verification, and Twilio signature validation. """ +import base64 +import hashlib +import hmac import os -from unittest.mock import patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -213,3 +216,202 @@ class TestSmsToolset: from tools.cronjob_tools import CRONJOB_SCHEMA deliver_desc = CRONJOB_SCHEMA["parameters"]["properties"]["deliver"]["description"] assert "sms" in deliver_desc.lower() + + +# ── Webhook host configuration ───────────────────────────────────── + +class TestWebhookHostConfig: + """Verify SMS_WEBHOOK_HOST env var and default.""" + + def test_default_host_is_all_interfaces(self): + from gateway.platforms.sms import DEFAULT_WEBHOOK_HOST + assert DEFAULT_WEBHOOK_HOST == "0.0.0.0" + + def test_host_from_env(self): + from gateway.platforms.sms import SmsAdapter + + env = { + "TWILIO_ACCOUNT_SID": "ACtest", + "TWILIO_AUTH_TOKEN": "tok", + "TWILIO_PHONE_NUMBER": "+15550001111", + "SMS_WEBHOOK_HOST": "127.0.0.1", + } + with patch.dict(os.environ, env): + pc = PlatformConfig(enabled=True, api_key="tok") + adapter = SmsAdapter(pc) + assert adapter._webhook_host == "127.0.0.1" + + def test_webhook_url_from_env(self): + from gateway.platforms.sms import SmsAdapter + + env = { + "TWILIO_ACCOUNT_SID": "ACtest", + "TWILIO_AUTH_TOKEN": "tok", + "TWILIO_PHONE_NUMBER": "+15550001111", + "SMS_WEBHOOK_URL": "https://example.com/webhooks/twilio", + } + with patch.dict(os.environ, env): + pc = PlatformConfig(enabled=True, api_key="tok") + adapter = SmsAdapter(pc) + assert adapter._webhook_url == "https://example.com/webhooks/twilio" + + def test_webhook_url_stripped(self): + from gateway.platforms.sms import SmsAdapter + + env = { + "TWILIO_ACCOUNT_SID": "ACtest", + "TWILIO_AUTH_TOKEN": "tok", + "TWILIO_PHONE_NUMBER": "+15550001111", + "SMS_WEBHOOK_URL": " https://example.com/webhooks/twilio ", + } + with patch.dict(os.environ, env): + pc = PlatformConfig(enabled=True, api_key="tok") + adapter = SmsAdapter(pc) + assert adapter._webhook_url == "https://example.com/webhooks/twilio" + + +# ── Twilio signature validation ──────────────────────────────────── + +def _compute_twilio_signature(auth_token, url, params): + """Reference implementation of Twilio's signature algorithm.""" + data_to_sign = url + for key in sorted(params.keys()): + data_to_sign += key + params[key] + mac = hmac.new( + auth_token.encode("utf-8"), + data_to_sign.encode("utf-8"), + hashlib.sha1, + ) + return base64.b64encode(mac.digest()).decode("utf-8") + + +class TestTwilioSignatureValidation: + """Unit tests for SmsAdapter._validate_twilio_signature.""" + + def _make_adapter(self, auth_token="test_token_secret"): + from gateway.platforms.sms import SmsAdapter + + env = { + "TWILIO_ACCOUNT_SID": "ACtest", + "TWILIO_AUTH_TOKEN": auth_token, + "TWILIO_PHONE_NUMBER": "+15550001111", + } + with patch.dict(os.environ, env): + pc = PlatformConfig(enabled=True, api_key=auth_token) + adapter = SmsAdapter(pc) + return adapter + + def test_valid_signature_accepted(self): + adapter = self._make_adapter() + url = "https://example.com/webhooks/twilio" + params = {"From": "+15551234567", "Body": "hello", "To": "+15550001111"} + sig = _compute_twilio_signature("test_token_secret", url, params) + assert adapter._validate_twilio_signature(url, params, sig) is True + + def test_invalid_signature_rejected(self): + adapter = self._make_adapter() + url = "https://example.com/webhooks/twilio" + params = {"From": "+15551234567", "Body": "hello"} + assert adapter._validate_twilio_signature(url, params, "badsig") is False + + def test_wrong_token_rejected(self): + adapter = self._make_adapter(auth_token="correct_token") + url = "https://example.com/webhooks/twilio" + params = {"From": "+15551234567", "Body": "hello"} + sig = _compute_twilio_signature("wrong_token", url, params) + assert adapter._validate_twilio_signature(url, params, sig) is False + + def test_params_sorted_by_key(self): + """Signature must be computed with params sorted alphabetically.""" + adapter = self._make_adapter() + url = "https://example.com/webhooks/twilio" + params = {"Zebra": "last", "Alpha": "first", "Middle": "mid"} + sig = _compute_twilio_signature("test_token_secret", url, params) + assert adapter._validate_twilio_signature(url, params, sig) is True + + def test_empty_param_values_included(self): + """Blank values must be included in signature computation.""" + adapter = self._make_adapter() + url = "https://example.com/webhooks/twilio" + params = {"From": "+15551234567", "Body": "", "SmsStatus": "received"} + sig = _compute_twilio_signature("test_token_secret", url, params) + assert adapter._validate_twilio_signature(url, params, sig) is True + + def test_url_matters(self): + """Different URLs produce different signatures.""" + adapter = self._make_adapter() + params = {"Body": "hello"} + sig = _compute_twilio_signature( + "test_token_secret", "https://a.com/webhooks/twilio", params + ) + assert adapter._validate_twilio_signature( + "https://b.com/webhooks/twilio", params, sig + ) is False + + +# ── Webhook signature enforcement (handler-level) ────────────────── + +class TestWebhookSignatureEnforcement: + """Integration tests for signature validation in _handle_webhook.""" + + def _make_adapter(self, webhook_url=""): + from gateway.platforms.sms import SmsAdapter + + env = { + "TWILIO_ACCOUNT_SID": "ACtest", + "TWILIO_AUTH_TOKEN": "test_token_secret", + "TWILIO_PHONE_NUMBER": "+15550001111", + "SMS_WEBHOOK_URL": webhook_url, + } + with patch.dict(os.environ, env): + pc = PlatformConfig(enabled=True, api_key="test_token_secret") + adapter = SmsAdapter(pc) + adapter._message_handler = AsyncMock() + return adapter + + def _mock_request(self, body, headers=None): + request = MagicMock() + request.read = AsyncMock(return_value=body) + request.headers = headers or {} + return request + + @pytest.mark.asyncio + async def test_no_webhook_url_skips_validation(self): + """Without SMS_WEBHOOK_URL, all requests are accepted.""" + adapter = self._make_adapter(webhook_url="") + body = b"From=%2B15551234567&To=%2B15550001111&Body=hello&MessageSid=SM123" + request = self._mock_request(body) + resp = await adapter._handle_webhook(request) + assert resp.status == 200 + + @pytest.mark.asyncio + async def test_missing_signature_returns_403(self): + adapter = self._make_adapter(webhook_url="https://example.com/webhooks/twilio") + body = b"From=%2B15551234567&To=%2B15550001111&Body=hello&MessageSid=SM123" + request = self._mock_request(body, headers={}) + resp = await adapter._handle_webhook(request) + assert resp.status == 403 + + @pytest.mark.asyncio + async def test_invalid_signature_returns_403(self): + adapter = self._make_adapter(webhook_url="https://example.com/webhooks/twilio") + body = b"From=%2B15551234567&To=%2B15550001111&Body=hello&MessageSid=SM123" + request = self._mock_request(body, headers={"X-Twilio-Signature": "invalid"}) + resp = await adapter._handle_webhook(request) + assert resp.status == 403 + + @pytest.mark.asyncio + async def test_valid_signature_returns_200(self): + webhook_url = "https://example.com/webhooks/twilio" + adapter = self._make_adapter(webhook_url=webhook_url) + params = { + "From": "+15551234567", + "To": "+15550001111", + "Body": "hello", + "MessageSid": "SM123", + } + sig = _compute_twilio_signature("test_token_secret", webhook_url, params) + body = b"From=%2B15551234567&To=%2B15550001111&Body=hello&MessageSid=SM123" + request = self._mock_request(body, headers={"X-Twilio-Signature": sig}) + resp = await adapter._handle_webhook(request) + assert resp.status == 200