diff --git a/gateway/config.py b/gateway/config.py index 935a50d74a..f93c6905a6 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -601,6 +601,14 @@ def _apply_env_overrides(config: GatewayConfig) -> None: config.platforms[Platform.TELEGRAM] = PlatformConfig() config.platforms[Platform.TELEGRAM].reply_to_mode = telegram_reply_mode + telegram_fallback_ips = os.getenv("TELEGRAM_FALLBACK_IPS", "") + if telegram_fallback_ips: + if Platform.TELEGRAM not in config.platforms: + config.platforms[Platform.TELEGRAM] = PlatformConfig() + config.platforms[Platform.TELEGRAM].extra["fallback_ips"] = [ + ip.strip() for ip in telegram_fallback_ips.split(",") if ip.strip() + ] + telegram_home = os.getenv("TELEGRAM_HOME_CHANNEL") if telegram_home and Platform.TELEGRAM in config.platforms: config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 549d09ce3e..54787f25d5 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -11,7 +11,7 @@ import asyncio import logging import os import re -from typing import Dict, Optional, Any +from typing import Dict, List, Optional, Any logger = logging.getLogger(__name__) @@ -25,6 +25,7 @@ try: filters, ) from telegram.constants import ParseMode, ChatType + from telegram.request import HTTPXRequest TELEGRAM_AVAILABLE = True except ImportError: TELEGRAM_AVAILABLE = False @@ -34,6 +35,7 @@ except ImportError: Application = Any CommandHandler = Any TelegramMessageHandler = Any + HTTPXRequest = Any filters = None ParseMode = None ChatType = None @@ -59,6 +61,11 @@ from gateway.platforms.base import ( cache_document_from_bytes, SUPPORTED_DOCUMENT_TYPES, ) +from gateway.platforms.telegram_network import ( + TelegramFallbackTransport, + discover_fallback_ips, + parse_fallback_ip_env, +) def check_telegram_requirements() -> bool: @@ -138,6 +145,13 @@ class TelegramAdapter(BasePlatformAdapter): # DM Topics config from extra.dm_topics self._dm_topics_config: List[Dict[str, Any]] = self.config.extra.get("dm_topics", []) + def _fallback_ips(self) -> list[str]: + """Return validated fallback IPs from config (populated by _apply_env_overrides).""" + configured = self.config.extra.get("fallback_ips", []) if getattr(self.config, "extra", None) else [] + if isinstance(configured, str): + configured = configured.split(",") + return parse_fallback_ip_env(",".join(str(v) for v in configured) if configured else None) + @staticmethod def _looks_like_polling_conflict(error: Exception) -> bool: text = str(error).lower() @@ -474,7 +488,26 @@ class TelegramAdapter(BasePlatformAdapter): return False # Build the application - self._app = Application.builder().token(self.config.token).build() + builder = Application.builder().token(self.config.token) + fallback_ips = self._fallback_ips() + if not fallback_ips: + fallback_ips = await discover_fallback_ips() + logger.info( + "[%s] Auto-discovered Telegram fallback IPs: %s", + self.name, + ", ".join(fallback_ips), + ) + if fallback_ips: + logger.warning( + "[%s] Telegram fallback IPs active: %s", + self.name, + ", ".join(fallback_ips), + ) + transport = TelegramFallbackTransport(fallback_ips) + request = HTTPXRequest(httpx_kwargs={"transport": transport}) + get_updates_request = HTTPXRequest(httpx_kwargs={"transport": transport}) + builder = builder.request(request).get_updates_request(get_updates_request) + self._app = builder.build() self._bot = self._app.bot # Register handlers diff --git a/gateway/platforms/telegram_network.py b/gateway/platforms/telegram_network.py new file mode 100644 index 0000000000..7192369470 --- /dev/null +++ b/gateway/platforms/telegram_network.py @@ -0,0 +1,233 @@ +"""Telegram-specific network helpers. + +Provides a hostname-preserving fallback transport for networks where +api.telegram.org resolves to an endpoint that is unreachable from the current +host. The transport keeps the logical request host and TLS SNI as +api.telegram.org while retrying the TCP connection against one or more fallback +IPv4 addresses. +""" + +from __future__ import annotations + +import asyncio +import ipaddress +import logging +import socket +from typing import Iterable, Optional + +import httpx + +logger = logging.getLogger(__name__) + +_TELEGRAM_API_HOST = "api.telegram.org" + +# DNS-over-HTTPS providers used to discover Telegram API IPs that may differ +# from the (potentially unreachable) IP returned by the local system resolver. +_DOH_TIMEOUT = 4.0 # seconds — bounded so connect() isn't noticeably delayed + +_DOH_PROVIDERS: list[dict] = [ + { + "url": "https://dns.google/resolve", + "params": {"name": _TELEGRAM_API_HOST, "type": "A"}, + "headers": {}, + }, + { + "url": "https://cloudflare-dns.com/dns-query", + "params": {"name": _TELEGRAM_API_HOST, "type": "A"}, + "headers": {"Accept": "application/dns-json"}, + }, +] + +# Last-resort IPs when DoH is also blocked. These are stable Telegram Bot API +# endpoints in the 149.154.160.0/20 block (same seed used by OpenClaw). +_SEED_FALLBACK_IPS: list[str] = ["149.154.167.220"] + + +class TelegramFallbackTransport(httpx.AsyncBaseTransport): + """Retry Telegram Bot API requests via fallback IPs while preserving TLS/SNI. + + Requests continue to target https://api.telegram.org/... logically, but on + connect failures the underlying TCP connection is retried against a known + reachable IP. This is effectively the programmatic equivalent of + ``curl --resolve api.telegram.org:443:``. + """ + + def __init__(self, fallback_ips: Iterable[str], **transport_kwargs): + self._fallback_ips = [ip for ip in dict.fromkeys(_normalize_fallback_ips(fallback_ips))] + self._primary = httpx.AsyncHTTPTransport(**transport_kwargs) + self._fallbacks = { + ip: httpx.AsyncHTTPTransport(**transport_kwargs) for ip in self._fallback_ips + } + self._sticky_ip: Optional[str] = None + self._sticky_lock = asyncio.Lock() + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + if request.url.host != _TELEGRAM_API_HOST or not self._fallback_ips: + return await self._primary.handle_async_request(request) + + sticky_ip = self._sticky_ip + attempt_order: list[Optional[str]] = [sticky_ip] if sticky_ip else [None] + for ip in self._fallback_ips: + if ip != sticky_ip: + attempt_order.append(ip) + + last_error: Exception | None = None + for ip in attempt_order: + candidate = request if ip is None else _rewrite_request_for_ip(request, ip) + transport = self._primary if ip is None else self._fallbacks[ip] + try: + response = await transport.handle_async_request(candidate) + if ip is not None and self._sticky_ip != ip: + async with self._sticky_lock: + if self._sticky_ip != ip: + self._sticky_ip = ip + logger.warning( + "[Telegram] Primary api.telegram.org path unreachable; using sticky fallback IP %s", + ip, + ) + return response + except Exception as exc: + last_error = exc + if not _is_retryable_connect_error(exc): + raise + if ip is None: + logger.warning( + "[Telegram] Primary api.telegram.org connection failed (%s); trying fallback IPs %s", + exc, + ", ".join(self._fallback_ips), + ) + continue + logger.warning("[Telegram] Fallback IP %s failed: %s", ip, exc) + continue + + assert last_error is not None + raise last_error + + async def aclose(self) -> None: + await self._primary.aclose() + for transport in self._fallbacks.values(): + await transport.aclose() + + +def _normalize_fallback_ips(values: Iterable[str]) -> list[str]: + normalized: list[str] = [] + for value in values: + raw = str(value).strip() + if not raw: + continue + try: + addr = ipaddress.ip_address(raw) + except ValueError: + logger.warning("Ignoring invalid Telegram fallback IP: %r", raw) + continue + if addr.version != 4: + logger.warning("Ignoring non-IPv4 Telegram fallback IP: %s", raw) + continue + normalized.append(str(addr)) + return normalized + + +def parse_fallback_ip_env(value: str | None) -> list[str]: + if not value: + return [] + parts = [part.strip() for part in value.split(",")] + return _normalize_fallback_ips(parts) + + +def _resolve_system_dns() -> set[str]: + """Return the IPv4 addresses that the OS resolver gives for api.telegram.org.""" + try: + results = socket.getaddrinfo(_TELEGRAM_API_HOST, 443, socket.AF_INET) + return {addr[4][0] for addr in results} + except Exception: + return set() + + +async def _query_doh_provider( + client: httpx.AsyncClient, provider: dict +) -> list[str]: + """Query one DoH provider and return A-record IPs.""" + try: + resp = await client.get( + provider["url"], params=provider["params"], headers=provider["headers"] + ) + resp.raise_for_status() + data = resp.json() + ips: list[str] = [] + for answer in data.get("Answer", []): + if answer.get("type") != 1: # A record + continue + raw = answer.get("data", "").strip() + try: + ipaddress.ip_address(raw) + ips.append(raw) + except ValueError: + continue + return ips + except Exception as exc: + logger.debug("DoH query to %s failed: %s", provider["url"], exc) + return [] + + +async def discover_fallback_ips() -> list[str]: + """Auto-discover Telegram API IPs via DNS-over-HTTPS. + + Resolves api.telegram.org through Google and Cloudflare DoH, collects all + unique IPs, and excludes the system-DNS-resolved IP (which is presumably + unreachable on this network). Falls back to a hardcoded seed list when DoH + is also unavailable. + """ + async with httpx.AsyncClient(timeout=httpx.Timeout(_DOH_TIMEOUT)) as client: + doh_tasks = [_query_doh_provider(client, p) for p in _DOH_PROVIDERS] + system_dns_task = asyncio.to_thread(_resolve_system_dns) + results = await asyncio.gather(system_dns_task, *doh_tasks, return_exceptions=True) + + # results[0] = system DNS IPs (set), results[1:] = DoH IP lists + system_ips: set[str] = results[0] if isinstance(results[0], set) else set() + + doh_ips: list[str] = [] + for r in results[1:]: + if isinstance(r, list): + doh_ips.extend(r) + + # Deduplicate preserving order, exclude system-DNS IPs + seen: set[str] = set() + candidates: list[str] = [] + for ip in doh_ips: + if ip not in seen and ip not in system_ips: + seen.add(ip) + candidates.append(ip) + + # Validate through existing normalization + validated = _normalize_fallback_ips(candidates) + + if validated: + logger.debug("Discovered Telegram fallback IPs via DoH: %s", ", ".join(validated)) + return validated + + logger.info( + "DoH discovery yielded no new IPs (system DNS: %s); using seed fallback IPs %s", + ", ".join(system_ips) or "unknown", + ", ".join(_SEED_FALLBACK_IPS), + ) + return list(_SEED_FALLBACK_IPS) + + +def _rewrite_request_for_ip(request: httpx.Request, ip: str) -> httpx.Request: + original_host = request.url.host or _TELEGRAM_API_HOST + url = request.url.copy_with(host=ip) + headers = request.headers.copy() + headers["host"] = original_host + extensions = dict(request.extensions) + extensions["sni_hostname"] = original_host + return httpx.Request( + method=request.method, + url=url, + headers=headers, + stream=request.stream, + extensions=extensions, + ) + + +def _is_retryable_connect_error(exc: Exception) -> bool: + return isinstance(exc, (httpx.ConnectTimeout, httpx.ConnectError)) diff --git a/tests/gateway/test_dm_topics.py b/tests/gateway/test_dm_topics.py index 98c6d4c063..168f1e817c 100644 --- a/tests/gateway/test_dm_topics.py +++ b/tests/gateway/test_dm_topics.py @@ -32,7 +32,7 @@ def _ensure_telegram_mock(): telegram_mod.constants.ChatType.CHANNEL = "channel" telegram_mod.constants.ChatType.PRIVATE = "private" - for name in ("telegram", "telegram.ext", "telegram.constants"): + for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"): sys.modules.setdefault(name, telegram_mod) diff --git a/tests/gateway/test_send_image_file.py b/tests/gateway/test_send_image_file.py index 25a8417170..cb0e436739 100644 --- a/tests/gateway/test_send_image_file.py +++ b/tests/gateway/test_send_image_file.py @@ -76,7 +76,7 @@ def _ensure_telegram_mock(): telegram_mod.constants.ChatType.CHANNEL = "channel" telegram_mod.constants.ChatType.PRIVATE = "private" - for name in ("telegram", "telegram.ext", "telegram.constants"): + for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"): sys.modules.setdefault(name, telegram_mod) diff --git a/tests/gateway/test_telegram_conflict.py b/tests/gateway/test_telegram_conflict.py index c96768de27..9f1074648a 100644 --- a/tests/gateway/test_telegram_conflict.py +++ b/tests/gateway/test_telegram_conflict.py @@ -20,7 +20,7 @@ def _ensure_telegram_mock(): telegram_mod.constants.ChatType.CHANNEL = "channel" telegram_mod.constants.ChatType.PRIVATE = "private" - for name in ("telegram", "telegram.ext", "telegram.constants"): + for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"): sys.modules.setdefault(name, telegram_mod) @@ -29,6 +29,14 @@ _ensure_telegram_mock() from gateway.platforms.telegram import TelegramAdapter # noqa: E402 +@pytest.fixture(autouse=True) +def _no_auto_discovery(monkeypatch): + """Disable DoH auto-discovery so connect() uses the plain builder chain.""" + async def _noop(): + return [] + monkeypatch.setattr("gateway.platforms.telegram.discover_fallback_ips", _noop) + + @pytest.mark.asyncio async def test_connect_rejects_same_host_token_lock(monkeypatch): adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token")) diff --git a/tests/gateway/test_telegram_documents.py b/tests/gateway/test_telegram_documents.py index 0472bdbac9..11a8df5f88 100644 --- a/tests/gateway/test_telegram_documents.py +++ b/tests/gateway/test_telegram_documents.py @@ -45,7 +45,7 @@ def _ensure_telegram_mock(): telegram_mod.constants.ChatType.CHANNEL = "channel" telegram_mod.constants.ChatType.PRIVATE = "private" - for name in ("telegram", "telegram.ext", "telegram.constants"): + for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"): sys.modules.setdefault(name, telegram_mod) diff --git a/tests/gateway/test_telegram_format.py b/tests/gateway/test_telegram_format.py index 446a3e1b96..7a50aded43 100644 --- a/tests/gateway/test_telegram_format.py +++ b/tests/gateway/test_telegram_format.py @@ -28,7 +28,7 @@ def _ensure_telegram_mock(): mod.constants.ChatType.SUPERGROUP = "supergroup" mod.constants.ChatType.CHANNEL = "channel" mod.constants.ChatType.PRIVATE = "private" - for name in ("telegram", "telegram.ext", "telegram.constants"): + for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"): sys.modules.setdefault(name, mod) diff --git a/tests/gateway/test_telegram_network.py b/tests/gateway/test_telegram_network.py new file mode 100644 index 0000000000..7591ae85f2 --- /dev/null +++ b/tests/gateway/test_telegram_network.py @@ -0,0 +1,626 @@ +"""Tests for gateway.platforms.telegram_network – fallback transport layer. + +Background +---------- +api.telegram.org resolves to an IP (e.g. 149.154.166.110) that is unreachable +from some networks. The workaround: route TCP through a different IP in the +same Telegram-owned 149.154.160.0/20 block (e.g. 149.154.167.220) while +keeping TLS SNI and the Host header as api.telegram.org so Telegram's edge +servers still accept the request. This is the programmatic equivalent of: + + curl --resolve api.telegram.org:443:149.154.167.220 https://api.telegram.org/bot/getMe + +The TelegramFallbackTransport implements this: try the primary (DNS-resolved) +path first, and on ConnectTimeout / ConnectError fall through to configured +fallback IPs in order, then "stick" to whichever IP works. +""" + +import httpx +import pytest + +from gateway.platforms import telegram_network as tnet + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +class FakeTransport(httpx.AsyncBaseTransport): + """Records calls and raises / returns based on a host→action mapping.""" + + def __init__(self, calls, behavior): + self.calls = calls + self.behavior = behavior + self.closed = False + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + self.calls.append( + { + "url_host": request.url.host, + "host_header": request.headers.get("host"), + "sni_hostname": request.extensions.get("sni_hostname"), + "path": request.url.path, + } + ) + action = self.behavior.get(request.url.host, "ok") + if action == "timeout": + raise httpx.ConnectTimeout("timed out") + if action == "connect_error": + raise httpx.ConnectError("connect error") + if isinstance(action, Exception): + raise action + return httpx.Response(200, request=request, text="ok") + + async def aclose(self) -> None: + self.closed = True + + +def _fake_transport_factory(calls, behavior): + """Returns a factory that creates FakeTransport instances.""" + instances = [] + + def factory(**kwargs): + t = FakeTransport(calls, behavior) + instances.append(t) + return t + + factory.instances = instances + return factory + + +def _telegram_request(path="/botTOKEN/getMe"): + return httpx.Request("GET", f"https://api.telegram.org{path}") + + +# ═══════════════════════════════════════════════════════════════════════════ +# IP parsing & validation +# ═══════════════════════════════════════════════════════════════════════════ + +class TestParseFallbackIpEnv: + def test_filters_invalid_and_ipv6(self, caplog): + ips = tnet.parse_fallback_ip_env("149.154.167.220, bad, 2001:67c:4e8:f004::9,149.154.167.220") + assert ips == ["149.154.167.220", "149.154.167.220"] + assert "Ignoring invalid Telegram fallback IP" in caplog.text + assert "Ignoring non-IPv4 Telegram fallback IP" in caplog.text + + def test_none_returns_empty(self): + assert tnet.parse_fallback_ip_env(None) == [] + + def test_empty_string_returns_empty(self): + assert tnet.parse_fallback_ip_env("") == [] + + def test_whitespace_only_returns_empty(self): + assert tnet.parse_fallback_ip_env(" , , ") == [] + + def test_single_valid_ip(self): + assert tnet.parse_fallback_ip_env("149.154.167.220") == ["149.154.167.220"] + + def test_multiple_valid_ips(self): + ips = tnet.parse_fallback_ip_env("149.154.167.220, 149.154.167.221") + assert ips == ["149.154.167.220", "149.154.167.221"] + + def test_rejects_leading_zeros(self, caplog): + """Leading zeros are ambiguous (octal?) so ipaddress rejects them.""" + ips = tnet.parse_fallback_ip_env("149.154.167.010") + assert ips == [] + assert "Ignoring invalid" in caplog.text + + +class TestNormalizeFallbackIps: + def test_deduplication_happens_at_transport_level(self): + """_normalize does not dedup; TelegramFallbackTransport.__init__ does.""" + raw = ["149.154.167.220", "149.154.167.220"] + assert tnet._normalize_fallback_ips(raw) == ["149.154.167.220", "149.154.167.220"] + + def test_empty_strings_skipped(self): + assert tnet._normalize_fallback_ips(["", " ", "149.154.167.220"]) == ["149.154.167.220"] + + +# ═══════════════════════════════════════════════════════════════════════════ +# Request rewriting +# ═══════════════════════════════════════════════════════════════════════════ + +class TestRewriteRequestForIp: + def test_preserves_host_and_sni(self): + request = _telegram_request() + rewritten = tnet._rewrite_request_for_ip(request, "149.154.167.220") + + assert rewritten.url.host == "149.154.167.220" + assert rewritten.headers["host"] == "api.telegram.org" + assert rewritten.extensions["sni_hostname"] == "api.telegram.org" + assert rewritten.url.path == "/botTOKEN/getMe" + + def test_preserves_method_and_path(self): + request = httpx.Request("POST", "https://api.telegram.org/botTOKEN/sendMessage") + rewritten = tnet._rewrite_request_for_ip(request, "149.154.167.220") + + assert rewritten.method == "POST" + assert rewritten.url.path == "/botTOKEN/sendMessage" + + +# ═══════════════════════════════════════════════════════════════════════════ +# Fallback transport – core behavior +# ═══════════════════════════════════════════════════════════════════════════ + +class TestFallbackTransport: + """Primary path fails → try fallback IPs → stick to whichever works.""" + + @pytest.mark.asyncio + async def test_falls_back_on_connect_timeout_and_becomes_sticky(self, monkeypatch): + calls = [] + behavior = {"api.telegram.org": "timeout", "149.154.167.220": "ok"} + monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior)) + + transport = tnet.TelegramFallbackTransport(["149.154.167.220"]) + resp = await transport.handle_async_request(_telegram_request()) + + assert resp.status_code == 200 + assert transport._sticky_ip == "149.154.167.220" + # First attempt was primary (api.telegram.org), second was fallback + assert calls[0]["url_host"] == "api.telegram.org" + assert calls[1]["url_host"] == "149.154.167.220" + assert calls[1]["host_header"] == "api.telegram.org" + assert calls[1]["sni_hostname"] == "api.telegram.org" + + # Second request goes straight to sticky IP + calls.clear() + resp2 = await transport.handle_async_request(_telegram_request()) + assert resp2.status_code == 200 + assert calls[0]["url_host"] == "149.154.167.220" + + @pytest.mark.asyncio + async def test_falls_back_on_connect_error(self, monkeypatch): + calls = [] + behavior = {"api.telegram.org": "connect_error", "149.154.167.220": "ok"} + monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior)) + + transport = tnet.TelegramFallbackTransport(["149.154.167.220"]) + resp = await transport.handle_async_request(_telegram_request()) + + assert resp.status_code == 200 + assert transport._sticky_ip == "149.154.167.220" + + @pytest.mark.asyncio + async def test_does_not_fallback_on_non_connect_error(self, monkeypatch): + """Errors like ReadTimeout are not connection issues — don't retry.""" + calls = [] + behavior = {"api.telegram.org": httpx.ReadTimeout("read timeout"), "149.154.167.220": "ok"} + monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior)) + + transport = tnet.TelegramFallbackTransport(["149.154.167.220"]) + + with pytest.raises(httpx.ReadTimeout): + await transport.handle_async_request(_telegram_request()) + + assert [c["url_host"] for c in calls] == ["api.telegram.org"] + + @pytest.mark.asyncio + async def test_all_ips_fail_raises_last_error(self, monkeypatch): + calls = [] + behavior = {"api.telegram.org": "timeout", "149.154.167.220": "timeout"} + monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior)) + + transport = tnet.TelegramFallbackTransport(["149.154.167.220"]) + + with pytest.raises(httpx.ConnectTimeout): + await transport.handle_async_request(_telegram_request()) + + assert [c["url_host"] for c in calls] == ["api.telegram.org", "149.154.167.220"] + assert transport._sticky_ip is None + + @pytest.mark.asyncio + async def test_multiple_fallback_ips_tried_in_order(self, monkeypatch): + calls = [] + behavior = { + "api.telegram.org": "timeout", + "149.154.167.220": "timeout", + "149.154.167.221": "ok", + } + monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior)) + + transport = tnet.TelegramFallbackTransport(["149.154.167.220", "149.154.167.221"]) + resp = await transport.handle_async_request(_telegram_request()) + + assert resp.status_code == 200 + assert transport._sticky_ip == "149.154.167.221" + assert [c["url_host"] for c in calls] == [ + "api.telegram.org", + "149.154.167.220", + "149.154.167.221", + ] + + @pytest.mark.asyncio + async def test_sticky_ip_tried_first_but_falls_through_if_stale(self, monkeypatch): + """If the sticky IP stops working, the transport retries others.""" + calls = [] + behavior = { + "api.telegram.org": "timeout", + "149.154.167.220": "ok", + "149.154.167.221": "ok", + } + monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior)) + + transport = tnet.TelegramFallbackTransport(["149.154.167.220", "149.154.167.221"]) + + # First request: primary fails → .220 works → becomes sticky + await transport.handle_async_request(_telegram_request()) + assert transport._sticky_ip == "149.154.167.220" + + # Now .220 goes bad too + calls.clear() + behavior["149.154.167.220"] = "timeout" + + resp = await transport.handle_async_request(_telegram_request()) + assert resp.status_code == 200 + # Tried sticky (.220) first, then fell through to .221 + assert [c["url_host"] for c in calls] == ["149.154.167.220", "149.154.167.221"] + assert transport._sticky_ip == "149.154.167.221" + + +class TestFallbackTransportPassthrough: + """Requests that don't need fallback behavior.""" + + @pytest.mark.asyncio + async def test_non_telegram_host_bypasses_fallback(self, monkeypatch): + calls = [] + behavior = {} + monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior)) + + transport = tnet.TelegramFallbackTransport(["149.154.167.220"]) + request = httpx.Request("GET", "https://example.com/path") + resp = await transport.handle_async_request(request) + + assert resp.status_code == 200 + assert calls[0]["url_host"] == "example.com" + assert transport._sticky_ip is None + + @pytest.mark.asyncio + async def test_empty_fallback_list_uses_primary_only(self, monkeypatch): + calls = [] + behavior = {} + monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior)) + + transport = tnet.TelegramFallbackTransport([]) + resp = await transport.handle_async_request(_telegram_request()) + + assert resp.status_code == 200 + assert calls[0]["url_host"] == "api.telegram.org" + + @pytest.mark.asyncio + async def test_primary_succeeds_no_fallback_needed(self, monkeypatch): + calls = [] + behavior = {"api.telegram.org": "ok"} + monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior)) + + transport = tnet.TelegramFallbackTransport(["149.154.167.220"]) + resp = await transport.handle_async_request(_telegram_request()) + + assert resp.status_code == 200 + assert transport._sticky_ip is None + assert len(calls) == 1 + + +class TestFallbackTransportInit: + def test_deduplicates_fallback_ips(self, monkeypatch): + monkeypatch.setattr( + tnet.httpx, "AsyncHTTPTransport", lambda **kw: FakeTransport([], {}) + ) + transport = tnet.TelegramFallbackTransport(["149.154.167.220", "149.154.167.220"]) + assert transport._fallback_ips == ["149.154.167.220"] + + def test_filters_invalid_ips_at_init(self, monkeypatch): + monkeypatch.setattr( + tnet.httpx, "AsyncHTTPTransport", lambda **kw: FakeTransport([], {}) + ) + transport = tnet.TelegramFallbackTransport(["149.154.167.220", "not-an-ip"]) + assert transport._fallback_ips == ["149.154.167.220"] + + +class TestFallbackTransportClose: + @pytest.mark.asyncio + async def test_aclose_closes_all_transports(self, monkeypatch): + factory = _fake_transport_factory([], {}) + monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", factory) + + transport = tnet.TelegramFallbackTransport(["149.154.167.220", "149.154.167.221"]) + await transport.aclose() + + # 1 primary + 2 fallback transports + assert len(factory.instances) == 3 + assert all(t.closed for t in factory.instances) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Config layer – TELEGRAM_FALLBACK_IPS env → config.extra +# ═══════════════════════════════════════════════════════════════════════════ + +class TestConfigFallbackIps: + def test_env_var_populates_config_extra(self, monkeypatch): + from gateway.config import GatewayConfig, Platform, PlatformConfig, _apply_env_overrides + + monkeypatch.setenv("TELEGRAM_FALLBACK_IPS", "149.154.167.220,149.154.167.221") + config = GatewayConfig(platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="tok")}) + _apply_env_overrides(config) + + assert config.platforms[Platform.TELEGRAM].extra["fallback_ips"] == [ + "149.154.167.220", "149.154.167.221", + ] + + def test_env_var_creates_platform_if_missing(self, monkeypatch): + from gateway.config import GatewayConfig, Platform, _apply_env_overrides + + monkeypatch.setenv("TELEGRAM_FALLBACK_IPS", "149.154.167.220") + config = GatewayConfig(platforms={}) + _apply_env_overrides(config) + + assert Platform.TELEGRAM in config.platforms + assert config.platforms[Platform.TELEGRAM].extra["fallback_ips"] == ["149.154.167.220"] + + def test_env_var_strips_whitespace(self, monkeypatch): + from gateway.config import GatewayConfig, Platform, PlatformConfig, _apply_env_overrides + + monkeypatch.setenv("TELEGRAM_FALLBACK_IPS", " 149.154.167.220 , 149.154.167.221 ") + config = GatewayConfig(platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="tok")}) + _apply_env_overrides(config) + + assert config.platforms[Platform.TELEGRAM].extra["fallback_ips"] == [ + "149.154.167.220", "149.154.167.221", + ] + + def test_empty_env_var_does_not_populate(self, monkeypatch): + from gateway.config import GatewayConfig, Platform, PlatformConfig, _apply_env_overrides + + monkeypatch.setenv("TELEGRAM_FALLBACK_IPS", "") + config = GatewayConfig(platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="tok")}) + _apply_env_overrides(config) + + assert "fallback_ips" not in config.platforms[Platform.TELEGRAM].extra + + +# ═══════════════════════════════════════════════════════════════════════════ +# Adapter layer – _fallback_ips() reads config correctly +# ═══════════════════════════════════════════════════════════════════════════ + +class TestAdapterFallbackIps: + def _make_adapter(self, extra=None): + import sys + from unittest.mock import MagicMock + + # Ensure telegram mock is in place + if "telegram" not in sys.modules or not hasattr(sys.modules["telegram"], "__file__"): + mod = MagicMock() + mod.ext.ContextTypes.DEFAULT_TYPE = type(None) + mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2" + mod.constants.ChatType.GROUP = "group" + mod.constants.ChatType.SUPERGROUP = "supergroup" + mod.constants.ChatType.CHANNEL = "channel" + mod.constants.ChatType.PRIVATE = "private" + for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"): + sys.modules.setdefault(name, mod) + + from gateway.config import PlatformConfig + from gateway.platforms.telegram import TelegramAdapter + + config = PlatformConfig(enabled=True, token="test-token") + if extra: + config.extra.update(extra) + return TelegramAdapter(config) + + def test_list_in_extra(self): + adapter = self._make_adapter(extra={"fallback_ips": ["149.154.167.220"]}) + assert adapter._fallback_ips() == ["149.154.167.220"] + + def test_csv_string_in_extra(self): + adapter = self._make_adapter(extra={"fallback_ips": "149.154.167.220,149.154.167.221"}) + assert adapter._fallback_ips() == ["149.154.167.220", "149.154.167.221"] + + def test_empty_extra(self): + adapter = self._make_adapter() + assert adapter._fallback_ips() == [] + + def test_no_extra_attr(self): + adapter = self._make_adapter() + adapter.config.extra = None + assert adapter._fallback_ips() == [] + + def test_invalid_ips_filtered(self): + adapter = self._make_adapter(extra={"fallback_ips": ["149.154.167.220", "not-valid"]}) + assert adapter._fallback_ips() == ["149.154.167.220"] + + +# ═══════════════════════════════════════════════════════════════════════════ +# DoH auto-discovery +# ═══════════════════════════════════════════════════════════════════════════ + +def _doh_answer(*ips: str) -> dict: + """Build a minimal DoH JSON response with A records.""" + return {"Answer": [{"type": 1, "data": ip} for ip in ips]} + + +class FakeDoHClient: + """Mock httpx.AsyncClient for DoH queries.""" + + def __init__(self, responses: dict): + # responses: URL prefix → (status, json_body) | Exception + self._responses = responses + self.requests_made: list[dict] = [] + + @staticmethod + def _make_response(status, body, url): + """Build an httpx.Response with a request attached (needed for raise_for_status).""" + request = httpx.Request("GET", url) + return httpx.Response(status, json=body, request=request) + + async def get(self, url, *, params=None, headers=None, **kwargs): + self.requests_made.append({"url": url, "params": params, "headers": headers}) + for prefix, action in self._responses.items(): + if url.startswith(prefix): + if isinstance(action, Exception): + raise action + status, body = action + return self._make_response(status, body, url) + return self._make_response(200, {}, url) + + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + pass + + +class TestDiscoverFallbackIps: + """Tests for discover_fallback_ips() — DoH-based auto-discovery.""" + + def _patch_doh(self, monkeypatch, responses, system_dns_ips=None): + """Wire up fake DoH client and system DNS.""" + client = FakeDoHClient(responses) + monkeypatch.setattr(tnet.httpx, "AsyncClient", lambda **kw: client) + + if system_dns_ips is not None: + addrs = [(None, None, None, None, (ip, 443)) for ip in system_dns_ips] + monkeypatch.setattr(tnet.socket, "getaddrinfo", lambda *a, **kw: addrs) + else: + def _fail(*a, **kw): + raise OSError("dns failed") + monkeypatch.setattr(tnet.socket, "getaddrinfo", _fail) + return client + + @pytest.mark.asyncio + async def test_google_and_cloudflare_ips_collected(self, monkeypatch): + self._patch_doh(monkeypatch, { + "https://dns.google": (200, _doh_answer("149.154.167.220")), + "https://cloudflare-dns.com": (200, _doh_answer("149.154.167.221")), + }, system_dns_ips=["149.154.166.110"]) + + ips = await tnet.discover_fallback_ips() + assert "149.154.167.220" in ips + assert "149.154.167.221" in ips + + @pytest.mark.asyncio + async def test_system_dns_ip_excluded(self, monkeypatch): + """The IP from system DNS is the one that doesn't work — exclude it.""" + self._patch_doh(monkeypatch, { + "https://dns.google": (200, _doh_answer("149.154.166.110", "149.154.167.220")), + "https://cloudflare-dns.com": (200, _doh_answer("149.154.166.110")), + }, system_dns_ips=["149.154.166.110"]) + + ips = await tnet.discover_fallback_ips() + assert ips == ["149.154.167.220"] + + @pytest.mark.asyncio + async def test_doh_results_deduplicated(self, monkeypatch): + self._patch_doh(monkeypatch, { + "https://dns.google": (200, _doh_answer("149.154.167.220")), + "https://cloudflare-dns.com": (200, _doh_answer("149.154.167.220")), + }, system_dns_ips=["149.154.166.110"]) + + ips = await tnet.discover_fallback_ips() + assert ips == ["149.154.167.220"] + + @pytest.mark.asyncio + async def test_doh_timeout_falls_back_to_seed(self, monkeypatch): + self._patch_doh(monkeypatch, { + "https://dns.google": httpx.TimeoutException("timeout"), + "https://cloudflare-dns.com": httpx.TimeoutException("timeout"), + }, system_dns_ips=["149.154.166.110"]) + + ips = await tnet.discover_fallback_ips() + assert ips == tnet._SEED_FALLBACK_IPS + + @pytest.mark.asyncio + async def test_doh_connect_error_falls_back_to_seed(self, monkeypatch): + self._patch_doh(monkeypatch, { + "https://dns.google": httpx.ConnectError("refused"), + "https://cloudflare-dns.com": httpx.ConnectError("refused"), + }, system_dns_ips=["149.154.166.110"]) + + ips = await tnet.discover_fallback_ips() + assert ips == tnet._SEED_FALLBACK_IPS + + @pytest.mark.asyncio + async def test_doh_malformed_json_falls_back_to_seed(self, monkeypatch): + self._patch_doh(monkeypatch, { + "https://dns.google": (200, {"Status": 0}), # no Answer key + "https://cloudflare-dns.com": (200, {"garbage": True}), + }, system_dns_ips=["149.154.166.110"]) + + ips = await tnet.discover_fallback_ips() + assert ips == tnet._SEED_FALLBACK_IPS + + @pytest.mark.asyncio + async def test_one_provider_fails_other_succeeds(self, monkeypatch): + self._patch_doh(monkeypatch, { + "https://dns.google": httpx.TimeoutException("timeout"), + "https://cloudflare-dns.com": (200, _doh_answer("149.154.167.220")), + }, system_dns_ips=["149.154.166.110"]) + + ips = await tnet.discover_fallback_ips() + assert ips == ["149.154.167.220"] + + @pytest.mark.asyncio + async def test_system_dns_failure_keeps_all_doh_ips(self, monkeypatch): + """If system DNS fails, nothing gets excluded — all DoH IPs kept.""" + self._patch_doh(monkeypatch, { + "https://dns.google": (200, _doh_answer("149.154.166.110", "149.154.167.220")), + "https://cloudflare-dns.com": (200, _doh_answer()), + }, system_dns_ips=None) # triggers OSError + + ips = await tnet.discover_fallback_ips() + assert "149.154.166.110" in ips + assert "149.154.167.220" in ips + + @pytest.mark.asyncio + async def test_all_doh_ips_same_as_system_dns_uses_seed(self, monkeypatch): + """DoH returns only the same blocked IP — seed list is the fallback.""" + self._patch_doh(monkeypatch, { + "https://dns.google": (200, _doh_answer("149.154.166.110")), + "https://cloudflare-dns.com": (200, _doh_answer("149.154.166.110")), + }, system_dns_ips=["149.154.166.110"]) + + ips = await tnet.discover_fallback_ips() + assert ips == tnet._SEED_FALLBACK_IPS + + @pytest.mark.asyncio + async def test_cloudflare_gets_accept_header(self, monkeypatch): + client = self._patch_doh(monkeypatch, { + "https://dns.google": (200, _doh_answer("149.154.167.220")), + "https://cloudflare-dns.com": (200, _doh_answer("149.154.167.221")), + }, system_dns_ips=["149.154.166.110"]) + + await tnet.discover_fallback_ips() + + cf_reqs = [r for r in client.requests_made if "cloudflare" in r["url"]] + assert cf_reqs + assert cf_reqs[0]["headers"]["Accept"] == "application/dns-json" + + @pytest.mark.asyncio + async def test_non_a_records_ignored(self, monkeypatch): + """AAAA records (type 28) and CNAME (type 5) should be skipped.""" + answer = { + "Answer": [ + {"type": 5, "data": "telegram.org"}, # CNAME + {"type": 28, "data": "2001:67c:4e8:f004::9"}, # AAAA + {"type": 1, "data": "149.154.167.220"}, # A ✓ + ] + } + self._patch_doh(monkeypatch, { + "https://dns.google": (200, answer), + "https://cloudflare-dns.com": (200, _doh_answer()), + }, system_dns_ips=["149.154.166.110"]) + + ips = await tnet.discover_fallback_ips() + assert ips == ["149.154.167.220"] + + @pytest.mark.asyncio + async def test_invalid_ip_in_doh_response_skipped(self, monkeypatch): + answer = {"Answer": [ + {"type": 1, "data": "not-an-ip"}, + {"type": 1, "data": "149.154.167.220"}, + ]} + self._patch_doh(monkeypatch, { + "https://dns.google": (200, answer), + "https://cloudflare-dns.com": (200, _doh_answer()), + }, system_dns_ips=["149.154.166.110"]) + + ips = await tnet.discover_fallback_ips() + assert ips == ["149.154.167.220"] diff --git a/tests/gateway/test_telegram_network_reconnect.py b/tests/gateway/test_telegram_network_reconnect.py index 8223823578..f78a7f2080 100644 --- a/tests/gateway/test_telegram_network_reconnect.py +++ b/tests/gateway/test_telegram_network_reconnect.py @@ -27,7 +27,7 @@ def _ensure_telegram_mock(): telegram_mod.constants.ChatType.CHANNEL = "channel" telegram_mod.constants.ChatType.PRIVATE = "private" - for name in ("telegram", "telegram.ext", "telegram.constants"): + for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"): sys.modules.setdefault(name, telegram_mod) @@ -36,6 +36,14 @@ _ensure_telegram_mock() from gateway.platforms.telegram import TelegramAdapter # noqa: E402 +@pytest.fixture(autouse=True) +def _no_auto_discovery(monkeypatch): + """Disable DoH auto-discovery so connect() uses the plain builder chain.""" + async def _noop(): + return [] + monkeypatch.setattr("gateway.platforms.telegram.discover_fallback_ips", _noop) + + def _make_adapter() -> TelegramAdapter: return TelegramAdapter(PlatformConfig(enabled=True, token="test-token")) diff --git a/tests/gateway/test_telegram_reply_mode.py b/tests/gateway/test_telegram_reply_mode.py index 1ec16b5123..1218afa0c1 100644 --- a/tests/gateway/test_telegram_reply_mode.py +++ b/tests/gateway/test_telegram_reply_mode.py @@ -25,7 +25,7 @@ def _ensure_telegram_mock(): mod.constants.ChatType.SUPERGROUP = "supergroup" mod.constants.ChatType.CHANNEL = "channel" mod.constants.ChatType.PRIVATE = "private" - for name in ("telegram", "telegram.ext", "telegram.constants"): + for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"): sys.modules.setdefault(name, mod)