diff --git a/plugins/platforms/telegram/adapter.py b/plugins/platforms/telegram/adapter.py
index 2de169ee092..b4458d0d7d5 100644
--- a/plugins/platforms/telegram/adapter.py
+++ b/plugins/platforms/telegram/adapter.py
@@ -2204,6 +2204,43 @@ class TelegramAdapter(BasePlatformAdapter):
                 "write_timeout": _env_float("HERMES_TELEGRAM_HTTP_WRITE_TIMEOUT", 20.0),
             }
 
+            # CLOSE_WAIT fd leak (#31599, same class as #18451): PTB's
+            # HTTPXRequest builds the underlying httpx.AsyncClient with
+            # `limits = httpx.Limits(max_connections=connection_pool_size)`
+            # and *no* keepalive tuning, so httpx's default
+            # keepalive_expiry=5.0 applies. Behind an HTTP proxy (Cloudflare
+            # Warp etc.) a peer-initiated FIN can sit in CLOSE_WAIT longer
+            # than that, leaking fds in the general request pool (_request[1])
+            # which _drain_polling_connections never resets. Wire the shared
+            # platform_httpx_limits() helper into the httpx client so idle
+            # keepalive sockets drain aggressively, while preserving PTB's
+            # max_connections (= connection_pool_size). httpx_kwargs is spread
+            # last into PTB's client kwargs, so `limits` here wins.
+            from gateway.platforms._http_client_limits import platform_httpx_limits
+
+            _base_limits = platform_httpx_limits()
+            if _base_limits is not None:
+                import httpx as _httpx
+
+                _pool_limits = _httpx.Limits(
+                    max_connections=request_kwargs["connection_pool_size"],
+                    max_keepalive_connections=_base_limits.max_keepalive_connections,
+                    keepalive_expiry=_base_limits.keepalive_expiry,
+                )
+            else:  # pragma: no cover — httpx always present alongside PTB
+                _pool_limits = None
+
+            def _with_limits(httpx_kwargs: Optional[dict] = None) -> dict:
+                """Merge tuned keepalive limits into httpx client kwargs.
+
+                A caller-supplied ``limits`` (none today) is left untouched;
+                otherwise the CLOSE_WAIT-safe limits are injected.
+                """
+                kwargs = dict(httpx_kwargs or {})
+                if _pool_limits is not None and "limits" not in kwargs:
+                    kwargs["limits"] = _pool_limits
+                return kwargs
+
             disable_fallback = (os.getenv("HERMES_TELEGRAM_DISABLE_FALLBACK_IPS", "").strip().lower() in {"1", "true", "yes", "on"})
             fallback_ips = self._fallback_ips()
             if not fallback_ips:
@@ -2226,21 +2263,31 @@ class TelegramAdapter(BasePlatformAdapter):
                 # polling reconnect + bot API bootstrap/delete_webhook calls.
                 request = HTTPXRequest(
                     **request_kwargs,
-                    httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
+                    httpx_kwargs=_with_limits(
+                        {"transport": TelegramFallbackTransport(fallback_ips)}
+                    ),
                 )
                 get_updates_request = HTTPXRequest(
                     **request_kwargs,
-                    httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
+                    httpx_kwargs=_with_limits(
+                        {"transport": TelegramFallbackTransport(fallback_ips)}
+                    ),
                 )
             elif proxy_url:
                 logger.info("[%s] Proxy detected; passing explicitly to HTTPXRequest: %s", self.name, proxy_url)
-                request = HTTPXRequest(**request_kwargs, proxy=proxy_url)
-                get_updates_request = HTTPXRequest(**request_kwargs, proxy=proxy_url)
+                request = HTTPXRequest(
+                    **request_kwargs, proxy=proxy_url, httpx_kwargs=_with_limits()
+                )
+                get_updates_request = HTTPXRequest(
+                    **request_kwargs, proxy=proxy_url, httpx_kwargs=_with_limits()
+                )
             else:
                 if disable_fallback:
                     logger.info("[%s] Telegram fallback-IP transport disabled via env", self.name)
-                request = HTTPXRequest(**request_kwargs)
-                get_updates_request = HTTPXRequest(**request_kwargs)
+                request = HTTPXRequest(**request_kwargs, httpx_kwargs=_with_limits())
+                get_updates_request = HTTPXRequest(
+                    **request_kwargs, httpx_kwargs=_with_limits()
+                )
 
             builder = builder.request(request).get_updates_request(get_updates_request)
             self._app = builder.build()
diff --git a/tests/gateway/test_telegram_closewait_limits_31599.py b/tests/gateway/test_telegram_closewait_limits_31599.py
new file mode 100644
index 00000000000..1cef73a120b
--- /dev/null
+++ b/tests/gateway/test_telegram_closewait_limits_31599.py
@@ -0,0 +1,174 @@
+"""Regression test for #31599 — Telegram general-pool CLOSE_WAIT fd leak.
+
+Background
+----------
+PTB's ``telegram.request.HTTPXRequest`` builds the underlying
+``httpx.AsyncClient`` with ``limits = httpx.Limits(max_connections=...)``
+and *no* keepalive tuning, so httpx's default ``keepalive_expiry=5.0``
+applies. Behind an HTTP proxy (Cloudflare Warp etc.) a peer-initiated
+FIN can sit in ``CLOSE_WAIT`` longer than that, leaking fds in the
+general request pool (``_request[1]`` — the pool that routes
+``bot.send_message`` / ``set_my_commands``), which
+``_drain_polling_connections`` never resets.
+
+The fix wires the shared ``gateway.platforms._http_client_limits``
+``platform_httpx_limits()`` helper into *every* HTTPXRequest the adapter
+builds — the fallback-transport branch, the proxy branch, and the plain
+branch — so idle keepalive sockets drain aggressively.
+
+Contract asserted here (mutation-killing)
+-----------------------------------------
+Every ``HTTPXRequest`` constructed by ``TelegramAdapter.connect()`` must
+receive ``httpx_kwargs["limits"]`` that is an ``httpx.Limits`` with a
+``keepalive_expiry`` strictly below httpx's 5.0 default and a positive,
+bounded ``max_keepalive_connections``. Reverting the limits wiring (so
+HTTPXRequest falls back to PTB's default 5.0s keepalive) fails this test.
+"""
+
+import asyncio
+import sys
+from unittest.mock import MagicMock, patch
+
+import httpx
+import pytest
+
+from gateway.config import PlatformConfig
+
+
+def _ensure_telegram_mock():
+    if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
+        return
+    telegram_mod = MagicMock()
+    telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
+    telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
+    telegram_mod.constants.ChatType.GROUP = "group"
+    telegram_mod.constants.ChatType.SUPERGROUP = "supergroup"
+    telegram_mod.constants.ChatType.CHANNEL = "channel"
+    telegram_mod.constants.ChatType.PRIVATE = "private"
+    for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
+        sys.modules.setdefault(name, telegram_mod)
+
+
+_ensure_telegram_mock()
+
+from plugins.platforms.telegram import adapter as tg_adapter  # noqa: E402
+from plugins.platforms.telegram.adapter import TelegramAdapter  # noqa: E402
+
+
+class _StopConnect(Exception):
+    """Sentinel raised to abort connect() once requests are built."""
+
+
+class _RecordingHTTPXRequest:
+    """Stand-in for PTB's HTTPXRequest that records constructor kwargs."""
+
+    instances: list = []
+
+    def __init__(self, *args, **kwargs):
+        self.args = args
+        self.kwargs = kwargs
+        _RecordingHTTPXRequest.instances.append(self)
+
+
+def _make_adapter() -> TelegramAdapter:
+    return TelegramAdapter(PlatformConfig(enabled=True, token="test-token"))
+
+
+def _drive_connect(monkeypatch, *, proxy_url):
+    """Run connect() far enough to build the HTTPXRequests, then abort.
+
+    Returns the list of recorded _RecordingHTTPXRequest instances.
+    """
+    _RecordingHTTPXRequest.instances = []
+
+    # No DoH auto-discovery → exercise the proxy / plain branches, not fallback.
+    async def _no_fallback():
+        return []
+
+    monkeypatch.setattr(tg_adapter, "discover_fallback_ips", _no_fallback)
+    monkeypatch.setattr(
+        tg_adapter, "resolve_proxy_url", lambda *a, **k: proxy_url
+    )
+    # Replace the real HTTPXRequest with our recorder.
+    monkeypatch.setattr(tg_adapter, "HTTPXRequest", _RecordingHTTPXRequest)
+
+    adapter = _make_adapter()
+    # Skip the cross-process token lock.
+    monkeypatch.setattr(adapter, "_acquire_platform_lock", lambda *a, **k: True)
+    # Ensure the adapter reports no statically-configured fallback IPs.
+    monkeypatch.setattr(adapter, "_fallback_ips", lambda: [])
+
+    # builder.request(...).get_updates_request(...).build() must be harmless;
+    # make build() raise our sentinel so connect() stops right after the
+    # HTTPXRequests are constructed (before any real network/init).
+    chainable = MagicMock()
+    chainable.token.return_value = chainable
+    chainable.base_url.return_value = chainable
+    chainable.base_file_url.return_value = chainable
+    chainable.local_mode.return_value = chainable
+    chainable.request.return_value = chainable
+    chainable.get_updates_request.return_value = chainable
+    chainable.build.side_effect = _StopConnect
+
+    builder_root = MagicMock()
+    builder_root.builder.return_value = chainable
+    monkeypatch.setattr(tg_adapter, "Application", builder_root)
+
+    try:
+        asyncio.run(adapter.connect())
+    except _StopConnect:
+        pass
+    except Exception:
+        # connect() wraps work in a try; if it swallows the sentinel and
+        # continues to real init, the recorded instances are still valid.
+        pass
+
+    return list(_RecordingHTTPXRequest.instances)
+
+
+def _assert_keepalive_tight(instances):
+    assert instances, "connect() built no HTTPXRequest — test setup is wrong"
+    for inst in instances:
+        limits = inst.kwargs.get("httpx_kwargs", {}).get("limits")
+        assert isinstance(limits, httpx.Limits), (
+            "HTTPXRequest must receive httpx_kwargs['limits'] = httpx.Limits "
+            "wired from platform_httpx_limits() (#31599). Missing → PTB falls "
+            "back to default keepalive_expiry=5.0 and leaks CLOSE_WAIT fds."
+        )
+        # The whole point: keepalive must be tighter than httpx's 5.0 default.
+        assert limits.keepalive_expiry is not None
+        assert limits.keepalive_expiry < 5.0, (
+            "keepalive_expiry must be < httpx default 5.0 so idle/CLOSE_WAIT "
+            "sockets drain promptly behind a proxy (#31599)."
+        )
+        assert limits.max_keepalive_connections is not None
+        assert 1 <= limits.max_keepalive_connections <= 50
+        # PTB's connection_pool_size (max_connections) must be preserved.
+        assert limits.max_connections == inst.kwargs["connection_pool_size"]
+
+
+def test_proxy_branch_general_pool_has_tight_keepalive(monkeypatch):
+    """The proxy path the #31599 reporter hit must wire tuned limits."""
+    instances = _drive_connect(monkeypatch, proxy_url="http://127.0.0.1:9/")
+    # Both the general request pool and the get_updates pool are built here.
+    assert len(instances) >= 2
+    _assert_keepalive_tight(instances)
+    # Sanity: the proxy was actually threaded through (we're on the proxy branch).
+    assert any(inst.kwargs.get("proxy") == "http://127.0.0.1:9/" for inst in instances)
+
+
+def test_plain_branch_general_pool_has_tight_keepalive(monkeypatch):
+    """No proxy / no fallback IPs → plain branch must also wire tuned limits."""
+    instances = _drive_connect(monkeypatch, proxy_url=None)
+    assert len(instances) >= 2
+    _assert_keepalive_tight(instances)
+
+
+def test_limits_keepalive_below_ptb_default_is_the_contract():
+    """Document the invariant independent of adapter wiring: the shared
+    helper itself must tighten keepalive below httpx's 5.0 default."""
+    from gateway.platforms._http_client_limits import platform_httpx_limits
+
+    limits = platform_httpx_limits()
+    assert isinstance(limits, httpx.Limits)
+    assert limits.keepalive_expiry is not None and limits.keepalive_expiry < 5.0