feat(telegram): auto-discover fallback IPs via DoH when api.telegram.org is unreachable (#3376)

* feat(telegram): auto-discover fallback IPs via DoH when api.telegram.org is unreachable

On some networks (university, corporate), api.telegram.org resolves to a
valid Telegram IP that is unreachable due to routing/firewall rules. A
different IP in the same Telegram-owned 149.154.160.0/20 block works fine.

This adds automatic fallback IP discovery at connect time:
1. Query Google and Cloudflare DNS-over-HTTPS for api.telegram.org A records
2. Exclude the system-DNS IP (the unreachable one), use the rest as fallbacks
3. If DoH is also blocked, fall back to a seed list (149.154.167.220)
4. TelegramFallbackTransport tries primary first, sticks to whichever works

No configuration needed — works automatically. TELEGRAM_FALLBACK_IPS env var
still available as manual override. Zero impact on healthy networks (primary
path succeeds on first attempt, fallback never exercised).

No new dependencies (uses httpx already in deps + stdlib socket).

* fix: share transport instance and downgrade seed fallback log to info

- Use single TelegramFallbackTransport shared between request and
  get_updates_request so sticky IP is shared across polling and API calls
- Keep separate HTTPXRequest instances (different timeout settings)
- Downgrade "using seed fallback IPs" from warning to info to avoid
  noisy logs on healthy networks

* fix: add telegram.request mock and discovery fixture to remaining test files

The original PR missed test_dm_topics.py and
test_telegram_network_reconnect.py — both need the telegram.request
mock module. The reconnect test also needs _no_auto_discovery since
_handle_polling_network_error calls connect() which now invokes
discover_fallback_ips().

---------

Co-authored-by: Mohan Qiao <Gavin-Qiao@users.noreply.github.com>
This commit is contained in:
Teknium 2026-03-27 04:03:13 -07:00 committed by GitHub
parent be416cdfa9
commit 75fcbc44ce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 925 additions and 9 deletions

View file

@ -601,6 +601,14 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
config.platforms[Platform.TELEGRAM] = PlatformConfig()
config.platforms[Platform.TELEGRAM].reply_to_mode = telegram_reply_mode
telegram_fallback_ips = os.getenv("TELEGRAM_FALLBACK_IPS", "")
if telegram_fallback_ips:
if Platform.TELEGRAM not in config.platforms:
config.platforms[Platform.TELEGRAM] = PlatformConfig()
config.platforms[Platform.TELEGRAM].extra["fallback_ips"] = [
ip.strip() for ip in telegram_fallback_ips.split(",") if ip.strip()
]
telegram_home = os.getenv("TELEGRAM_HOME_CHANNEL")
if telegram_home and Platform.TELEGRAM in config.platforms:
config.platforms[Platform.TELEGRAM].home_channel = HomeChannel(

View file

@ -11,7 +11,7 @@ import asyncio
import logging
import os
import re
from typing import Dict, Optional, Any
from typing import Dict, List, Optional, Any
logger = logging.getLogger(__name__)
@ -25,6 +25,7 @@ try:
filters,
)
from telegram.constants import ParseMode, ChatType
from telegram.request import HTTPXRequest
TELEGRAM_AVAILABLE = True
except ImportError:
TELEGRAM_AVAILABLE = False
@ -34,6 +35,7 @@ except ImportError:
Application = Any
CommandHandler = Any
TelegramMessageHandler = Any
HTTPXRequest = Any
filters = None
ParseMode = None
ChatType = None
@ -59,6 +61,11 @@ from gateway.platforms.base import (
cache_document_from_bytes,
SUPPORTED_DOCUMENT_TYPES,
)
from gateway.platforms.telegram_network import (
TelegramFallbackTransport,
discover_fallback_ips,
parse_fallback_ip_env,
)
def check_telegram_requirements() -> bool:
@ -138,6 +145,13 @@ class TelegramAdapter(BasePlatformAdapter):
# DM Topics config from extra.dm_topics
self._dm_topics_config: List[Dict[str, Any]] = self.config.extra.get("dm_topics", [])
def _fallback_ips(self) -> list[str]:
"""Return validated fallback IPs from config (populated by _apply_env_overrides)."""
configured = self.config.extra.get("fallback_ips", []) if getattr(self.config, "extra", None) else []
if isinstance(configured, str):
configured = configured.split(",")
return parse_fallback_ip_env(",".join(str(v) for v in configured) if configured else None)
@staticmethod
def _looks_like_polling_conflict(error: Exception) -> bool:
text = str(error).lower()
@ -474,7 +488,26 @@ class TelegramAdapter(BasePlatformAdapter):
return False
# Build the application
self._app = Application.builder().token(self.config.token).build()
builder = Application.builder().token(self.config.token)
fallback_ips = self._fallback_ips()
if not fallback_ips:
fallback_ips = await discover_fallback_ips()
logger.info(
"[%s] Auto-discovered Telegram fallback IPs: %s",
self.name,
", ".join(fallback_ips),
)
if fallback_ips:
logger.warning(
"[%s] Telegram fallback IPs active: %s",
self.name,
", ".join(fallback_ips),
)
transport = TelegramFallbackTransport(fallback_ips)
request = HTTPXRequest(httpx_kwargs={"transport": transport})
get_updates_request = HTTPXRequest(httpx_kwargs={"transport": transport})
builder = builder.request(request).get_updates_request(get_updates_request)
self._app = builder.build()
self._bot = self._app.bot
# Register handlers

View file

@ -0,0 +1,233 @@
"""Telegram-specific network helpers.
Provides a hostname-preserving fallback transport for networks where
api.telegram.org resolves to an endpoint that is unreachable from the current
host. The transport keeps the logical request host and TLS SNI as
api.telegram.org while retrying the TCP connection against one or more fallback
IPv4 addresses.
"""
from __future__ import annotations
import asyncio
import ipaddress
import logging
import socket
from typing import Iterable, Optional
import httpx
logger = logging.getLogger(__name__)
_TELEGRAM_API_HOST = "api.telegram.org"
# DNS-over-HTTPS providers used to discover Telegram API IPs that may differ
# from the (potentially unreachable) IP returned by the local system resolver.
_DOH_TIMEOUT = 4.0 # seconds — bounded so connect() isn't noticeably delayed
_DOH_PROVIDERS: list[dict] = [
{
"url": "https://dns.google/resolve",
"params": {"name": _TELEGRAM_API_HOST, "type": "A"},
"headers": {},
},
{
"url": "https://cloudflare-dns.com/dns-query",
"params": {"name": _TELEGRAM_API_HOST, "type": "A"},
"headers": {"Accept": "application/dns-json"},
},
]
# Last-resort IPs when DoH is also blocked. These are stable Telegram Bot API
# endpoints in the 149.154.160.0/20 block (same seed used by OpenClaw).
_SEED_FALLBACK_IPS: list[str] = ["149.154.167.220"]
class TelegramFallbackTransport(httpx.AsyncBaseTransport):
"""Retry Telegram Bot API requests via fallback IPs while preserving TLS/SNI.
Requests continue to target https://api.telegram.org/... logically, but on
connect failures the underlying TCP connection is retried against a known
reachable IP. This is effectively the programmatic equivalent of
``curl --resolve api.telegram.org:443:<ip>``.
"""
def __init__(self, fallback_ips: Iterable[str], **transport_kwargs):
self._fallback_ips = [ip for ip in dict.fromkeys(_normalize_fallback_ips(fallback_ips))]
self._primary = httpx.AsyncHTTPTransport(**transport_kwargs)
self._fallbacks = {
ip: httpx.AsyncHTTPTransport(**transport_kwargs) for ip in self._fallback_ips
}
self._sticky_ip: Optional[str] = None
self._sticky_lock = asyncio.Lock()
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
if request.url.host != _TELEGRAM_API_HOST or not self._fallback_ips:
return await self._primary.handle_async_request(request)
sticky_ip = self._sticky_ip
attempt_order: list[Optional[str]] = [sticky_ip] if sticky_ip else [None]
for ip in self._fallback_ips:
if ip != sticky_ip:
attempt_order.append(ip)
last_error: Exception | None = None
for ip in attempt_order:
candidate = request if ip is None else _rewrite_request_for_ip(request, ip)
transport = self._primary if ip is None else self._fallbacks[ip]
try:
response = await transport.handle_async_request(candidate)
if ip is not None and self._sticky_ip != ip:
async with self._sticky_lock:
if self._sticky_ip != ip:
self._sticky_ip = ip
logger.warning(
"[Telegram] Primary api.telegram.org path unreachable; using sticky fallback IP %s",
ip,
)
return response
except Exception as exc:
last_error = exc
if not _is_retryable_connect_error(exc):
raise
if ip is None:
logger.warning(
"[Telegram] Primary api.telegram.org connection failed (%s); trying fallback IPs %s",
exc,
", ".join(self._fallback_ips),
)
continue
logger.warning("[Telegram] Fallback IP %s failed: %s", ip, exc)
continue
assert last_error is not None
raise last_error
async def aclose(self) -> None:
await self._primary.aclose()
for transport in self._fallbacks.values():
await transport.aclose()
def _normalize_fallback_ips(values: Iterable[str]) -> list[str]:
normalized: list[str] = []
for value in values:
raw = str(value).strip()
if not raw:
continue
try:
addr = ipaddress.ip_address(raw)
except ValueError:
logger.warning("Ignoring invalid Telegram fallback IP: %r", raw)
continue
if addr.version != 4:
logger.warning("Ignoring non-IPv4 Telegram fallback IP: %s", raw)
continue
normalized.append(str(addr))
return normalized
def parse_fallback_ip_env(value: str | None) -> list[str]:
if not value:
return []
parts = [part.strip() for part in value.split(",")]
return _normalize_fallback_ips(parts)
def _resolve_system_dns() -> set[str]:
"""Return the IPv4 addresses that the OS resolver gives for api.telegram.org."""
try:
results = socket.getaddrinfo(_TELEGRAM_API_HOST, 443, socket.AF_INET)
return {addr[4][0] for addr in results}
except Exception:
return set()
async def _query_doh_provider(
client: httpx.AsyncClient, provider: dict
) -> list[str]:
"""Query one DoH provider and return A-record IPs."""
try:
resp = await client.get(
provider["url"], params=provider["params"], headers=provider["headers"]
)
resp.raise_for_status()
data = resp.json()
ips: list[str] = []
for answer in data.get("Answer", []):
if answer.get("type") != 1: # A record
continue
raw = answer.get("data", "").strip()
try:
ipaddress.ip_address(raw)
ips.append(raw)
except ValueError:
continue
return ips
except Exception as exc:
logger.debug("DoH query to %s failed: %s", provider["url"], exc)
return []
async def discover_fallback_ips() -> list[str]:
"""Auto-discover Telegram API IPs via DNS-over-HTTPS.
Resolves api.telegram.org through Google and Cloudflare DoH, collects all
unique IPs, and excludes the system-DNS-resolved IP (which is presumably
unreachable on this network). Falls back to a hardcoded seed list when DoH
is also unavailable.
"""
async with httpx.AsyncClient(timeout=httpx.Timeout(_DOH_TIMEOUT)) as client:
doh_tasks = [_query_doh_provider(client, p) for p in _DOH_PROVIDERS]
system_dns_task = asyncio.to_thread(_resolve_system_dns)
results = await asyncio.gather(system_dns_task, *doh_tasks, return_exceptions=True)
# results[0] = system DNS IPs (set), results[1:] = DoH IP lists
system_ips: set[str] = results[0] if isinstance(results[0], set) else set()
doh_ips: list[str] = []
for r in results[1:]:
if isinstance(r, list):
doh_ips.extend(r)
# Deduplicate preserving order, exclude system-DNS IPs
seen: set[str] = set()
candidates: list[str] = []
for ip in doh_ips:
if ip not in seen and ip not in system_ips:
seen.add(ip)
candidates.append(ip)
# Validate through existing normalization
validated = _normalize_fallback_ips(candidates)
if validated:
logger.debug("Discovered Telegram fallback IPs via DoH: %s", ", ".join(validated))
return validated
logger.info(
"DoH discovery yielded no new IPs (system DNS: %s); using seed fallback IPs %s",
", ".join(system_ips) or "unknown",
", ".join(_SEED_FALLBACK_IPS),
)
return list(_SEED_FALLBACK_IPS)
def _rewrite_request_for_ip(request: httpx.Request, ip: str) -> httpx.Request:
original_host = request.url.host or _TELEGRAM_API_HOST
url = request.url.copy_with(host=ip)
headers = request.headers.copy()
headers["host"] = original_host
extensions = dict(request.extensions)
extensions["sni_hostname"] = original_host
return httpx.Request(
method=request.method,
url=url,
headers=headers,
stream=request.stream,
extensions=extensions,
)
def _is_retryable_connect_error(exc: Exception) -> bool:
return isinstance(exc, (httpx.ConnectTimeout, httpx.ConnectError))

View file

@ -32,7 +32,7 @@ def _ensure_telegram_mock():
telegram_mod.constants.ChatType.CHANNEL = "channel"
telegram_mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants"):
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, telegram_mod)

View file

@ -76,7 +76,7 @@ def _ensure_telegram_mock():
telegram_mod.constants.ChatType.CHANNEL = "channel"
telegram_mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants"):
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, telegram_mod)

View file

@ -20,7 +20,7 @@ def _ensure_telegram_mock():
telegram_mod.constants.ChatType.CHANNEL = "channel"
telegram_mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants"):
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, telegram_mod)
@ -29,6 +29,14 @@ _ensure_telegram_mock()
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
@pytest.fixture(autouse=True)
def _no_auto_discovery(monkeypatch):
"""Disable DoH auto-discovery so connect() uses the plain builder chain."""
async def _noop():
return []
monkeypatch.setattr("gateway.platforms.telegram.discover_fallback_ips", _noop)
@pytest.mark.asyncio
async def test_connect_rejects_same_host_token_lock(monkeypatch):
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token"))

View file

@ -45,7 +45,7 @@ def _ensure_telegram_mock():
telegram_mod.constants.ChatType.CHANNEL = "channel"
telegram_mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants"):
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, telegram_mod)

View file

@ -28,7 +28,7 @@ def _ensure_telegram_mock():
mod.constants.ChatType.SUPERGROUP = "supergroup"
mod.constants.ChatType.CHANNEL = "channel"
mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants"):
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, mod)

View file

@ -0,0 +1,626 @@
"""Tests for gateway.platforms.telegram_network fallback transport layer.
Background
----------
api.telegram.org resolves to an IP (e.g. 149.154.166.110) that is unreachable
from some networks. The workaround: route TCP through a different IP in the
same Telegram-owned 149.154.160.0/20 block (e.g. 149.154.167.220) while
keeping TLS SNI and the Host header as api.telegram.org so Telegram's edge
servers still accept the request. This is the programmatic equivalent of:
curl --resolve api.telegram.org:443:149.154.167.220 https://api.telegram.org/bot<token>/getMe
The TelegramFallbackTransport implements this: try the primary (DNS-resolved)
path first, and on ConnectTimeout / ConnectError fall through to configured
fallback IPs in order, then "stick" to whichever IP works.
"""
import httpx
import pytest
from gateway.platforms import telegram_network as tnet
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
class FakeTransport(httpx.AsyncBaseTransport):
"""Records calls and raises / returns based on a host→action mapping."""
def __init__(self, calls, behavior):
self.calls = calls
self.behavior = behavior
self.closed = False
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
self.calls.append(
{
"url_host": request.url.host,
"host_header": request.headers.get("host"),
"sni_hostname": request.extensions.get("sni_hostname"),
"path": request.url.path,
}
)
action = self.behavior.get(request.url.host, "ok")
if action == "timeout":
raise httpx.ConnectTimeout("timed out")
if action == "connect_error":
raise httpx.ConnectError("connect error")
if isinstance(action, Exception):
raise action
return httpx.Response(200, request=request, text="ok")
async def aclose(self) -> None:
self.closed = True
def _fake_transport_factory(calls, behavior):
"""Returns a factory that creates FakeTransport instances."""
instances = []
def factory(**kwargs):
t = FakeTransport(calls, behavior)
instances.append(t)
return t
factory.instances = instances
return factory
def _telegram_request(path="/botTOKEN/getMe"):
return httpx.Request("GET", f"https://api.telegram.org{path}")
# ═══════════════════════════════════════════════════════════════════════════
# IP parsing & validation
# ═══════════════════════════════════════════════════════════════════════════
class TestParseFallbackIpEnv:
def test_filters_invalid_and_ipv6(self, caplog):
ips = tnet.parse_fallback_ip_env("149.154.167.220, bad, 2001:67c:4e8:f004::9,149.154.167.220")
assert ips == ["149.154.167.220", "149.154.167.220"]
assert "Ignoring invalid Telegram fallback IP" in caplog.text
assert "Ignoring non-IPv4 Telegram fallback IP" in caplog.text
def test_none_returns_empty(self):
assert tnet.parse_fallback_ip_env(None) == []
def test_empty_string_returns_empty(self):
assert tnet.parse_fallback_ip_env("") == []
def test_whitespace_only_returns_empty(self):
assert tnet.parse_fallback_ip_env(" , , ") == []
def test_single_valid_ip(self):
assert tnet.parse_fallback_ip_env("149.154.167.220") == ["149.154.167.220"]
def test_multiple_valid_ips(self):
ips = tnet.parse_fallback_ip_env("149.154.167.220, 149.154.167.221")
assert ips == ["149.154.167.220", "149.154.167.221"]
def test_rejects_leading_zeros(self, caplog):
"""Leading zeros are ambiguous (octal?) so ipaddress rejects them."""
ips = tnet.parse_fallback_ip_env("149.154.167.010")
assert ips == []
assert "Ignoring invalid" in caplog.text
class TestNormalizeFallbackIps:
def test_deduplication_happens_at_transport_level(self):
"""_normalize does not dedup; TelegramFallbackTransport.__init__ does."""
raw = ["149.154.167.220", "149.154.167.220"]
assert tnet._normalize_fallback_ips(raw) == ["149.154.167.220", "149.154.167.220"]
def test_empty_strings_skipped(self):
assert tnet._normalize_fallback_ips(["", " ", "149.154.167.220"]) == ["149.154.167.220"]
# ═══════════════════════════════════════════════════════════════════════════
# Request rewriting
# ═══════════════════════════════════════════════════════════════════════════
class TestRewriteRequestForIp:
def test_preserves_host_and_sni(self):
request = _telegram_request()
rewritten = tnet._rewrite_request_for_ip(request, "149.154.167.220")
assert rewritten.url.host == "149.154.167.220"
assert rewritten.headers["host"] == "api.telegram.org"
assert rewritten.extensions["sni_hostname"] == "api.telegram.org"
assert rewritten.url.path == "/botTOKEN/getMe"
def test_preserves_method_and_path(self):
request = httpx.Request("POST", "https://api.telegram.org/botTOKEN/sendMessage")
rewritten = tnet._rewrite_request_for_ip(request, "149.154.167.220")
assert rewritten.method == "POST"
assert rewritten.url.path == "/botTOKEN/sendMessage"
# ═══════════════════════════════════════════════════════════════════════════
# Fallback transport core behavior
# ═══════════════════════════════════════════════════════════════════════════
class TestFallbackTransport:
"""Primary path fails → try fallback IPs → stick to whichever works."""
@pytest.mark.asyncio
async def test_falls_back_on_connect_timeout_and_becomes_sticky(self, monkeypatch):
calls = []
behavior = {"api.telegram.org": "timeout", "149.154.167.220": "ok"}
monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior))
transport = tnet.TelegramFallbackTransport(["149.154.167.220"])
resp = await transport.handle_async_request(_telegram_request())
assert resp.status_code == 200
assert transport._sticky_ip == "149.154.167.220"
# First attempt was primary (api.telegram.org), second was fallback
assert calls[0]["url_host"] == "api.telegram.org"
assert calls[1]["url_host"] == "149.154.167.220"
assert calls[1]["host_header"] == "api.telegram.org"
assert calls[1]["sni_hostname"] == "api.telegram.org"
# Second request goes straight to sticky IP
calls.clear()
resp2 = await transport.handle_async_request(_telegram_request())
assert resp2.status_code == 200
assert calls[0]["url_host"] == "149.154.167.220"
@pytest.mark.asyncio
async def test_falls_back_on_connect_error(self, monkeypatch):
calls = []
behavior = {"api.telegram.org": "connect_error", "149.154.167.220": "ok"}
monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior))
transport = tnet.TelegramFallbackTransport(["149.154.167.220"])
resp = await transport.handle_async_request(_telegram_request())
assert resp.status_code == 200
assert transport._sticky_ip == "149.154.167.220"
@pytest.mark.asyncio
async def test_does_not_fallback_on_non_connect_error(self, monkeypatch):
"""Errors like ReadTimeout are not connection issues — don't retry."""
calls = []
behavior = {"api.telegram.org": httpx.ReadTimeout("read timeout"), "149.154.167.220": "ok"}
monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior))
transport = tnet.TelegramFallbackTransport(["149.154.167.220"])
with pytest.raises(httpx.ReadTimeout):
await transport.handle_async_request(_telegram_request())
assert [c["url_host"] for c in calls] == ["api.telegram.org"]
@pytest.mark.asyncio
async def test_all_ips_fail_raises_last_error(self, monkeypatch):
calls = []
behavior = {"api.telegram.org": "timeout", "149.154.167.220": "timeout"}
monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior))
transport = tnet.TelegramFallbackTransport(["149.154.167.220"])
with pytest.raises(httpx.ConnectTimeout):
await transport.handle_async_request(_telegram_request())
assert [c["url_host"] for c in calls] == ["api.telegram.org", "149.154.167.220"]
assert transport._sticky_ip is None
@pytest.mark.asyncio
async def test_multiple_fallback_ips_tried_in_order(self, monkeypatch):
calls = []
behavior = {
"api.telegram.org": "timeout",
"149.154.167.220": "timeout",
"149.154.167.221": "ok",
}
monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior))
transport = tnet.TelegramFallbackTransport(["149.154.167.220", "149.154.167.221"])
resp = await transport.handle_async_request(_telegram_request())
assert resp.status_code == 200
assert transport._sticky_ip == "149.154.167.221"
assert [c["url_host"] for c in calls] == [
"api.telegram.org",
"149.154.167.220",
"149.154.167.221",
]
@pytest.mark.asyncio
async def test_sticky_ip_tried_first_but_falls_through_if_stale(self, monkeypatch):
"""If the sticky IP stops working, the transport retries others."""
calls = []
behavior = {
"api.telegram.org": "timeout",
"149.154.167.220": "ok",
"149.154.167.221": "ok",
}
monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior))
transport = tnet.TelegramFallbackTransport(["149.154.167.220", "149.154.167.221"])
# First request: primary fails → .220 works → becomes sticky
await transport.handle_async_request(_telegram_request())
assert transport._sticky_ip == "149.154.167.220"
# Now .220 goes bad too
calls.clear()
behavior["149.154.167.220"] = "timeout"
resp = await transport.handle_async_request(_telegram_request())
assert resp.status_code == 200
# Tried sticky (.220) first, then fell through to .221
assert [c["url_host"] for c in calls] == ["149.154.167.220", "149.154.167.221"]
assert transport._sticky_ip == "149.154.167.221"
class TestFallbackTransportPassthrough:
"""Requests that don't need fallback behavior."""
@pytest.mark.asyncio
async def test_non_telegram_host_bypasses_fallback(self, monkeypatch):
calls = []
behavior = {}
monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior))
transport = tnet.TelegramFallbackTransport(["149.154.167.220"])
request = httpx.Request("GET", "https://example.com/path")
resp = await transport.handle_async_request(request)
assert resp.status_code == 200
assert calls[0]["url_host"] == "example.com"
assert transport._sticky_ip is None
@pytest.mark.asyncio
async def test_empty_fallback_list_uses_primary_only(self, monkeypatch):
calls = []
behavior = {}
monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior))
transport = tnet.TelegramFallbackTransport([])
resp = await transport.handle_async_request(_telegram_request())
assert resp.status_code == 200
assert calls[0]["url_host"] == "api.telegram.org"
@pytest.mark.asyncio
async def test_primary_succeeds_no_fallback_needed(self, monkeypatch):
calls = []
behavior = {"api.telegram.org": "ok"}
monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", _fake_transport_factory(calls, behavior))
transport = tnet.TelegramFallbackTransport(["149.154.167.220"])
resp = await transport.handle_async_request(_telegram_request())
assert resp.status_code == 200
assert transport._sticky_ip is None
assert len(calls) == 1
class TestFallbackTransportInit:
def test_deduplicates_fallback_ips(self, monkeypatch):
monkeypatch.setattr(
tnet.httpx, "AsyncHTTPTransport", lambda **kw: FakeTransport([], {})
)
transport = tnet.TelegramFallbackTransport(["149.154.167.220", "149.154.167.220"])
assert transport._fallback_ips == ["149.154.167.220"]
def test_filters_invalid_ips_at_init(self, monkeypatch):
monkeypatch.setattr(
tnet.httpx, "AsyncHTTPTransport", lambda **kw: FakeTransport([], {})
)
transport = tnet.TelegramFallbackTransport(["149.154.167.220", "not-an-ip"])
assert transport._fallback_ips == ["149.154.167.220"]
class TestFallbackTransportClose:
@pytest.mark.asyncio
async def test_aclose_closes_all_transports(self, monkeypatch):
factory = _fake_transport_factory([], {})
monkeypatch.setattr(tnet.httpx, "AsyncHTTPTransport", factory)
transport = tnet.TelegramFallbackTransport(["149.154.167.220", "149.154.167.221"])
await transport.aclose()
# 1 primary + 2 fallback transports
assert len(factory.instances) == 3
assert all(t.closed for t in factory.instances)
# ═══════════════════════════════════════════════════════════════════════════
# Config layer TELEGRAM_FALLBACK_IPS env → config.extra
# ═══════════════════════════════════════════════════════════════════════════
class TestConfigFallbackIps:
def test_env_var_populates_config_extra(self, monkeypatch):
from gateway.config import GatewayConfig, Platform, PlatformConfig, _apply_env_overrides
monkeypatch.setenv("TELEGRAM_FALLBACK_IPS", "149.154.167.220,149.154.167.221")
config = GatewayConfig(platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="tok")})
_apply_env_overrides(config)
assert config.platforms[Platform.TELEGRAM].extra["fallback_ips"] == [
"149.154.167.220", "149.154.167.221",
]
def test_env_var_creates_platform_if_missing(self, monkeypatch):
from gateway.config import GatewayConfig, Platform, _apply_env_overrides
monkeypatch.setenv("TELEGRAM_FALLBACK_IPS", "149.154.167.220")
config = GatewayConfig(platforms={})
_apply_env_overrides(config)
assert Platform.TELEGRAM in config.platforms
assert config.platforms[Platform.TELEGRAM].extra["fallback_ips"] == ["149.154.167.220"]
def test_env_var_strips_whitespace(self, monkeypatch):
from gateway.config import GatewayConfig, Platform, PlatformConfig, _apply_env_overrides
monkeypatch.setenv("TELEGRAM_FALLBACK_IPS", " 149.154.167.220 , 149.154.167.221 ")
config = GatewayConfig(platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="tok")})
_apply_env_overrides(config)
assert config.platforms[Platform.TELEGRAM].extra["fallback_ips"] == [
"149.154.167.220", "149.154.167.221",
]
def test_empty_env_var_does_not_populate(self, monkeypatch):
from gateway.config import GatewayConfig, Platform, PlatformConfig, _apply_env_overrides
monkeypatch.setenv("TELEGRAM_FALLBACK_IPS", "")
config = GatewayConfig(platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="tok")})
_apply_env_overrides(config)
assert "fallback_ips" not in config.platforms[Platform.TELEGRAM].extra
# ═══════════════════════════════════════════════════════════════════════════
# Adapter layer _fallback_ips() reads config correctly
# ═══════════════════════════════════════════════════════════════════════════
class TestAdapterFallbackIps:
def _make_adapter(self, extra=None):
import sys
from unittest.mock import MagicMock
# Ensure telegram mock is in place
if "telegram" not in sys.modules or not hasattr(sys.modules["telegram"], "__file__"):
mod = MagicMock()
mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
mod.constants.ChatType.GROUP = "group"
mod.constants.ChatType.SUPERGROUP = "supergroup"
mod.constants.ChatType.CHANNEL = "channel"
mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, mod)
from gateway.config import PlatformConfig
from gateway.platforms.telegram import TelegramAdapter
config = PlatformConfig(enabled=True, token="test-token")
if extra:
config.extra.update(extra)
return TelegramAdapter(config)
def test_list_in_extra(self):
adapter = self._make_adapter(extra={"fallback_ips": ["149.154.167.220"]})
assert adapter._fallback_ips() == ["149.154.167.220"]
def test_csv_string_in_extra(self):
adapter = self._make_adapter(extra={"fallback_ips": "149.154.167.220,149.154.167.221"})
assert adapter._fallback_ips() == ["149.154.167.220", "149.154.167.221"]
def test_empty_extra(self):
adapter = self._make_adapter()
assert adapter._fallback_ips() == []
def test_no_extra_attr(self):
adapter = self._make_adapter()
adapter.config.extra = None
assert adapter._fallback_ips() == []
def test_invalid_ips_filtered(self):
adapter = self._make_adapter(extra={"fallback_ips": ["149.154.167.220", "not-valid"]})
assert adapter._fallback_ips() == ["149.154.167.220"]
# ═══════════════════════════════════════════════════════════════════════════
# DoH auto-discovery
# ═══════════════════════════════════════════════════════════════════════════
def _doh_answer(*ips: str) -> dict:
"""Build a minimal DoH JSON response with A records."""
return {"Answer": [{"type": 1, "data": ip} for ip in ips]}
class FakeDoHClient:
"""Mock httpx.AsyncClient for DoH queries."""
def __init__(self, responses: dict):
# responses: URL prefix → (status, json_body) | Exception
self._responses = responses
self.requests_made: list[dict] = []
@staticmethod
def _make_response(status, body, url):
"""Build an httpx.Response with a request attached (needed for raise_for_status)."""
request = httpx.Request("GET", url)
return httpx.Response(status, json=body, request=request)
async def get(self, url, *, params=None, headers=None, **kwargs):
self.requests_made.append({"url": url, "params": params, "headers": headers})
for prefix, action in self._responses.items():
if url.startswith(prefix):
if isinstance(action, Exception):
raise action
status, body = action
return self._make_response(status, body, url)
return self._make_response(200, {}, url)
async def __aenter__(self):
return self
async def __aexit__(self, *args):
pass
class TestDiscoverFallbackIps:
"""Tests for discover_fallback_ips() — DoH-based auto-discovery."""
def _patch_doh(self, monkeypatch, responses, system_dns_ips=None):
"""Wire up fake DoH client and system DNS."""
client = FakeDoHClient(responses)
monkeypatch.setattr(tnet.httpx, "AsyncClient", lambda **kw: client)
if system_dns_ips is not None:
addrs = [(None, None, None, None, (ip, 443)) for ip in system_dns_ips]
monkeypatch.setattr(tnet.socket, "getaddrinfo", lambda *a, **kw: addrs)
else:
def _fail(*a, **kw):
raise OSError("dns failed")
monkeypatch.setattr(tnet.socket, "getaddrinfo", _fail)
return client
@pytest.mark.asyncio
async def test_google_and_cloudflare_ips_collected(self, monkeypatch):
self._patch_doh(monkeypatch, {
"https://dns.google": (200, _doh_answer("149.154.167.220")),
"https://cloudflare-dns.com": (200, _doh_answer("149.154.167.221")),
}, system_dns_ips=["149.154.166.110"])
ips = await tnet.discover_fallback_ips()
assert "149.154.167.220" in ips
assert "149.154.167.221" in ips
@pytest.mark.asyncio
async def test_system_dns_ip_excluded(self, monkeypatch):
"""The IP from system DNS is the one that doesn't work — exclude it."""
self._patch_doh(monkeypatch, {
"https://dns.google": (200, _doh_answer("149.154.166.110", "149.154.167.220")),
"https://cloudflare-dns.com": (200, _doh_answer("149.154.166.110")),
}, system_dns_ips=["149.154.166.110"])
ips = await tnet.discover_fallback_ips()
assert ips == ["149.154.167.220"]
@pytest.mark.asyncio
async def test_doh_results_deduplicated(self, monkeypatch):
self._patch_doh(monkeypatch, {
"https://dns.google": (200, _doh_answer("149.154.167.220")),
"https://cloudflare-dns.com": (200, _doh_answer("149.154.167.220")),
}, system_dns_ips=["149.154.166.110"])
ips = await tnet.discover_fallback_ips()
assert ips == ["149.154.167.220"]
@pytest.mark.asyncio
async def test_doh_timeout_falls_back_to_seed(self, monkeypatch):
self._patch_doh(monkeypatch, {
"https://dns.google": httpx.TimeoutException("timeout"),
"https://cloudflare-dns.com": httpx.TimeoutException("timeout"),
}, system_dns_ips=["149.154.166.110"])
ips = await tnet.discover_fallback_ips()
assert ips == tnet._SEED_FALLBACK_IPS
@pytest.mark.asyncio
async def test_doh_connect_error_falls_back_to_seed(self, monkeypatch):
self._patch_doh(monkeypatch, {
"https://dns.google": httpx.ConnectError("refused"),
"https://cloudflare-dns.com": httpx.ConnectError("refused"),
}, system_dns_ips=["149.154.166.110"])
ips = await tnet.discover_fallback_ips()
assert ips == tnet._SEED_FALLBACK_IPS
@pytest.mark.asyncio
async def test_doh_malformed_json_falls_back_to_seed(self, monkeypatch):
self._patch_doh(monkeypatch, {
"https://dns.google": (200, {"Status": 0}), # no Answer key
"https://cloudflare-dns.com": (200, {"garbage": True}),
}, system_dns_ips=["149.154.166.110"])
ips = await tnet.discover_fallback_ips()
assert ips == tnet._SEED_FALLBACK_IPS
@pytest.mark.asyncio
async def test_one_provider_fails_other_succeeds(self, monkeypatch):
self._patch_doh(monkeypatch, {
"https://dns.google": httpx.TimeoutException("timeout"),
"https://cloudflare-dns.com": (200, _doh_answer("149.154.167.220")),
}, system_dns_ips=["149.154.166.110"])
ips = await tnet.discover_fallback_ips()
assert ips == ["149.154.167.220"]
@pytest.mark.asyncio
async def test_system_dns_failure_keeps_all_doh_ips(self, monkeypatch):
"""If system DNS fails, nothing gets excluded — all DoH IPs kept."""
self._patch_doh(monkeypatch, {
"https://dns.google": (200, _doh_answer("149.154.166.110", "149.154.167.220")),
"https://cloudflare-dns.com": (200, _doh_answer()),
}, system_dns_ips=None) # triggers OSError
ips = await tnet.discover_fallback_ips()
assert "149.154.166.110" in ips
assert "149.154.167.220" in ips
@pytest.mark.asyncio
async def test_all_doh_ips_same_as_system_dns_uses_seed(self, monkeypatch):
"""DoH returns only the same blocked IP — seed list is the fallback."""
self._patch_doh(monkeypatch, {
"https://dns.google": (200, _doh_answer("149.154.166.110")),
"https://cloudflare-dns.com": (200, _doh_answer("149.154.166.110")),
}, system_dns_ips=["149.154.166.110"])
ips = await tnet.discover_fallback_ips()
assert ips == tnet._SEED_FALLBACK_IPS
@pytest.mark.asyncio
async def test_cloudflare_gets_accept_header(self, monkeypatch):
client = self._patch_doh(monkeypatch, {
"https://dns.google": (200, _doh_answer("149.154.167.220")),
"https://cloudflare-dns.com": (200, _doh_answer("149.154.167.221")),
}, system_dns_ips=["149.154.166.110"])
await tnet.discover_fallback_ips()
cf_reqs = [r for r in client.requests_made if "cloudflare" in r["url"]]
assert cf_reqs
assert cf_reqs[0]["headers"]["Accept"] == "application/dns-json"
@pytest.mark.asyncio
async def test_non_a_records_ignored(self, monkeypatch):
"""AAAA records (type 28) and CNAME (type 5) should be skipped."""
answer = {
"Answer": [
{"type": 5, "data": "telegram.org"}, # CNAME
{"type": 28, "data": "2001:67c:4e8:f004::9"}, # AAAA
{"type": 1, "data": "149.154.167.220"}, # A ✓
]
}
self._patch_doh(monkeypatch, {
"https://dns.google": (200, answer),
"https://cloudflare-dns.com": (200, _doh_answer()),
}, system_dns_ips=["149.154.166.110"])
ips = await tnet.discover_fallback_ips()
assert ips == ["149.154.167.220"]
@pytest.mark.asyncio
async def test_invalid_ip_in_doh_response_skipped(self, monkeypatch):
answer = {"Answer": [
{"type": 1, "data": "not-an-ip"},
{"type": 1, "data": "149.154.167.220"},
]}
self._patch_doh(monkeypatch, {
"https://dns.google": (200, answer),
"https://cloudflare-dns.com": (200, _doh_answer()),
}, system_dns_ips=["149.154.166.110"])
ips = await tnet.discover_fallback_ips()
assert ips == ["149.154.167.220"]

View file

@ -27,7 +27,7 @@ def _ensure_telegram_mock():
telegram_mod.constants.ChatType.CHANNEL = "channel"
telegram_mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants"):
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, telegram_mod)
@ -36,6 +36,14 @@ _ensure_telegram_mock()
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
@pytest.fixture(autouse=True)
def _no_auto_discovery(monkeypatch):
"""Disable DoH auto-discovery so connect() uses the plain builder chain."""
async def _noop():
return []
monkeypatch.setattr("gateway.platforms.telegram.discover_fallback_ips", _noop)
def _make_adapter() -> TelegramAdapter:
return TelegramAdapter(PlatformConfig(enabled=True, token="test-token"))

View file

@ -25,7 +25,7 @@ def _ensure_telegram_mock():
mod.constants.ChatType.SUPERGROUP = "supergroup"
mod.constants.ChatType.CHANNEL = "channel"
mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants"):
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, mod)