add basic twilio signature checking and tests

This commit is contained in:
Mariano Nicolini 2026-04-11 15:11:42 -03:00
parent af9caec44f
commit 5eae976c40
2 changed files with 267 additions and 5 deletions

View file

@ -10,6 +10,8 @@ Shares credentials with the optional telephony skill — same env vars:
Gateway-specific env vars:
- SMS_WEBHOOK_PORT (default 8080)
- SMS_WEBHOOK_HOST (default 0.0.0.0)
- SMS_WEBHOOK_URL (public URL for Twilio signature validation)
- SMS_ALLOWED_USERS (comma-separated E.164 phone numbers)
- SMS_ALLOW_ALL_USERS (true/false)
- SMS_HOME_CHANNEL (phone number for cron delivery)
@ -17,6 +19,8 @@ Gateway-specific env vars:
import asyncio
import base64
import hashlib
import hmac
import logging
import os
import re
@ -29,6 +33,7 @@ from gateway.platforms.base import (
MessageEvent,
MessageType,
SendResult,
is_network_accessible,
)
logger = logging.getLogger(__name__)
@ -36,6 +41,7 @@ logger = logging.getLogger(__name__)
TWILIO_API_BASE = "https://api.twilio.com/2010-04-01/Accounts"
MAX_SMS_LENGTH = 1600 # ~10 SMS segments
DEFAULT_WEBHOOK_PORT = 8080
DEFAULT_WEBHOOK_HOST = "0.0.0.0"
# E.164 phone number pattern for redaction
_PHONE_RE = re.compile(r"\+[1-9]\d{6,14}")
@ -77,6 +83,8 @@ class SmsAdapter(BasePlatformAdapter):
self._webhook_port: int = int(
os.getenv("SMS_WEBHOOK_PORT", str(DEFAULT_WEBHOOK_PORT))
)
self._webhook_host: str = os.getenv("SMS_WEBHOOK_HOST", DEFAULT_WEBHOOK_HOST)
self._webhook_url: str = os.getenv("SMS_WEBHOOK_URL", "").strip()
self._runner = None
self._http_session: Optional["aiohttp.ClientSession"] = None
@ -98,13 +106,21 @@ class SmsAdapter(BasePlatformAdapter):
logger.error("[sms] TWILIO_PHONE_NUMBER not set — cannot send replies")
return False
if not self._webhook_url:
logger.warning(
"[sms] SMS_WEBHOOK_URL not set — Twilio signature validation is "
"DISABLED. Any client that can reach port %d can inject messages. "
"Set SMS_WEBHOOK_URL to enable signature validation.",
self._webhook_port,
)
app = web.Application()
app.router.add_post("/webhooks/twilio", self._handle_webhook)
app.router.add_get("/health", lambda _: web.Response(text="ok"))
self._runner = web.AppRunner(app)
await self._runner.setup()
site = web.TCPSite(self._runner, "0.0.0.0", self._webhook_port)
site = web.TCPSite(self._runner, self._webhook_host, self._webhook_port)
await site.start()
self._http_session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
@ -112,7 +128,8 @@ class SmsAdapter(BasePlatformAdapter):
self._running = True
logger.info(
"[sms] Twilio webhook server listening on port %d, from: %s",
"[sms] Twilio webhook server listening on %s:%d, from: %s",
self._webhook_host,
self._webhook_port,
_redact_phone(self._from_number),
)
@ -203,6 +220,28 @@ class SmsAdapter(BasePlatformAdapter):
content = re.sub(r"\n{3,}", "\n\n", content)
return content.strip()
# ------------------------------------------------------------------
# Twilio signature validation
# ------------------------------------------------------------------
def _validate_twilio_signature(
self, url: str, post_params: dict, signature: str,
) -> bool:
"""Validate ``X-Twilio-Signature`` header (HMAC-SHA1, base64).
Algorithm: https://www.twilio.com/docs/usage/security#validating-requests
"""
data_to_sign = url
for key in sorted(post_params.keys()):
data_to_sign += key + post_params[key]
mac = hmac.new(
self._auth_token.encode("utf-8"),
data_to_sign.encode("utf-8"),
hashlib.sha1,
)
computed = base64.b64encode(mac.digest()).decode("utf-8")
return hmac.compare_digest(computed, signature)
# ------------------------------------------------------------------
# Twilio webhook handler
# ------------------------------------------------------------------
@ -213,7 +252,7 @@ class SmsAdapter(BasePlatformAdapter):
try:
raw = await request.read()
# Twilio sends form-encoded data, not JSON
form = urllib.parse.parse_qs(raw.decode("utf-8"))
form = urllib.parse.parse_qs(raw.decode("utf-8"), keep_blank_values=True)
except Exception as e:
logger.error("[sms] webhook parse error: %s", e)
return web.Response(
@ -222,6 +261,27 @@ class SmsAdapter(BasePlatformAdapter):
status=400,
)
# Validate Twilio request signature when SMS_WEBHOOK_URL is configured
if self._webhook_url:
twilio_sig = request.headers.get("X-Twilio-Signature", "")
if not twilio_sig:
logger.warning("[sms] Rejected: missing X-Twilio-Signature header")
return web.Response(
text='<?xml version="1.0" encoding="UTF-8"?><Response></Response>',
content_type="application/xml",
status=403,
)
flat_params = {k: v[0] for k, v in form.items() if v}
if not self._validate_twilio_signature(
self._webhook_url, flat_params, twilio_sig
):
logger.warning("[sms] Rejected: invalid Twilio signature")
return web.Response(
text='<?xml version="1.0" encoding="UTF-8"?><Response></Response>',
content_type="application/xml",
status=403,
)
# Extract fields (parse_qs returns lists)
from_number = (form.get("From", [""]))[0].strip()
to_number = (form.get("To", [""]))[0].strip()