mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-27 11:22:03 +00:00
fix(telegram): wire platform_httpx_limits into general-pool HTTPXRequest (#31599)
PTB's HTTPXRequest builds its httpx.AsyncClient with `limits = httpx.Limits(max_connections=connection_pool_size)` and no keepalive tuning, so httpx's default keepalive_expiry=5.0 applies. Behind an HTTP proxy (Cloudflare Warp etc.) a peer-initiated FIN can sit in CLOSE_WAIT longer than that, leaking fds in the general request pool (_request[1], which routes bot.send_message/set_my_commands) — the pool _drain_polling_connections never resets. Telegram was the lone holdout adapter not using the shared #18451 CLOSE_WAIT helper. Wire gateway.platforms._http_client_limits.platform_httpx_limits() into the httpx client across ALL THREE request-construction branches — fallback-transport, proxy, and plain — via httpx_kwargs["limits"], which PTB spreads last into its client kwargs so our tuned limits win. PTB's connection_pool_size (max_connections) is preserved; only keepalive behaviour is tightened (max_keepalive_connections + keepalive_expiry<5.0). The fix is macOS-import-safe: no Linux-only socket TCP_KEEPIDLE/INTVL/CNT constants at module scope (unlike the broken candidate which crashed on import on the reporter's OS), and it patches the actual proxy path the repro hits rather than TelegramFallbackTransport, which the proxy repro never instantiates. Adds a mutation-survivable behavior-contract test asserting every HTTPXRequest built by connect() receives httpx_kwargs["limits"] with keepalive_expiry < httpx's 5.0 default, across both the proxy and plain branches. Reverting the limits wiring fails the test. Co-authored-by: indigokarasu <mx.indigo.karasu@gmail.com>
This commit is contained in:
parent
aaa2e2cb88
commit
4b7f3826c2
2 changed files with 230 additions and 6 deletions
|
|
@ -2204,6 +2204,43 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
"write_timeout": _env_float("HERMES_TELEGRAM_HTTP_WRITE_TIMEOUT", 20.0),
|
||||
}
|
||||
|
||||
# CLOSE_WAIT fd leak (#31599, same class as #18451): PTB's
|
||||
# HTTPXRequest builds the underlying httpx.AsyncClient with
|
||||
# `limits = httpx.Limits(max_connections=connection_pool_size)`
|
||||
# and *no* keepalive tuning, so httpx's default
|
||||
# keepalive_expiry=5.0 applies. Behind an HTTP proxy (Cloudflare
|
||||
# Warp etc.) a peer-initiated FIN can sit in CLOSE_WAIT longer
|
||||
# than that, leaking fds in the general request pool (_request[1])
|
||||
# which _drain_polling_connections never resets. Wire the shared
|
||||
# platform_httpx_limits() helper into the httpx client so idle
|
||||
# keepalive sockets drain aggressively, while preserving PTB's
|
||||
# max_connections (= connection_pool_size). httpx_kwargs is spread
|
||||
# last into PTB's client kwargs, so `limits` here wins.
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
|
||||
_base_limits = platform_httpx_limits()
|
||||
if _base_limits is not None:
|
||||
import httpx as _httpx
|
||||
|
||||
_pool_limits = _httpx.Limits(
|
||||
max_connections=request_kwargs["connection_pool_size"],
|
||||
max_keepalive_connections=_base_limits.max_keepalive_connections,
|
||||
keepalive_expiry=_base_limits.keepalive_expiry,
|
||||
)
|
||||
else: # pragma: no cover — httpx always present alongside PTB
|
||||
_pool_limits = None
|
||||
|
||||
def _with_limits(httpx_kwargs: Optional[dict] = None) -> dict:
|
||||
"""Merge tuned keepalive limits into httpx client kwargs.
|
||||
|
||||
A caller-supplied ``limits`` (none today) is left untouched;
|
||||
otherwise the CLOSE_WAIT-safe limits are injected.
|
||||
"""
|
||||
kwargs = dict(httpx_kwargs or {})
|
||||
if _pool_limits is not None and "limits" not in kwargs:
|
||||
kwargs["limits"] = _pool_limits
|
||||
return kwargs
|
||||
|
||||
disable_fallback = (os.getenv("HERMES_TELEGRAM_DISABLE_FALLBACK_IPS", "").strip().lower() in {"1", "true", "yes", "on"})
|
||||
fallback_ips = self._fallback_ips()
|
||||
if not fallback_ips:
|
||||
|
|
@ -2226,21 +2263,31 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
# polling reconnect + bot API bootstrap/delete_webhook calls.
|
||||
request = HTTPXRequest(
|
||||
**request_kwargs,
|
||||
httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
|
||||
httpx_kwargs=_with_limits(
|
||||
{"transport": TelegramFallbackTransport(fallback_ips)}
|
||||
),
|
||||
)
|
||||
get_updates_request = HTTPXRequest(
|
||||
**request_kwargs,
|
||||
httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
|
||||
httpx_kwargs=_with_limits(
|
||||
{"transport": TelegramFallbackTransport(fallback_ips)}
|
||||
),
|
||||
)
|
||||
elif proxy_url:
|
||||
logger.info("[%s] Proxy detected; passing explicitly to HTTPXRequest: %s", self.name, proxy_url)
|
||||
request = HTTPXRequest(**request_kwargs, proxy=proxy_url)
|
||||
get_updates_request = HTTPXRequest(**request_kwargs, proxy=proxy_url)
|
||||
request = HTTPXRequest(
|
||||
**request_kwargs, proxy=proxy_url, httpx_kwargs=_with_limits()
|
||||
)
|
||||
get_updates_request = HTTPXRequest(
|
||||
**request_kwargs, proxy=proxy_url, httpx_kwargs=_with_limits()
|
||||
)
|
||||
else:
|
||||
if disable_fallback:
|
||||
logger.info("[%s] Telegram fallback-IP transport disabled via env", self.name)
|
||||
request = HTTPXRequest(**request_kwargs)
|
||||
get_updates_request = HTTPXRequest(**request_kwargs)
|
||||
request = HTTPXRequest(**request_kwargs, httpx_kwargs=_with_limits())
|
||||
get_updates_request = HTTPXRequest(
|
||||
**request_kwargs, httpx_kwargs=_with_limits()
|
||||
)
|
||||
|
||||
builder = builder.request(request).get_updates_request(get_updates_request)
|
||||
self._app = builder.build()
|
||||
|
|
|
|||
177
tests/gateway/test_telegram_closewait_limits_31599.py
Normal file
177
tests/gateway/test_telegram_closewait_limits_31599.py
Normal file
|
|
@ -0,0 +1,177 @@
|
|||
"""Regression test for #31599 — Telegram general-pool CLOSE_WAIT fd leak.
|
||||
|
||||
Background
|
||||
----------
|
||||
PTB's ``telegram.request.HTTPXRequest`` builds the underlying
|
||||
``httpx.AsyncClient`` with ``limits = httpx.Limits(max_connections=...)``
|
||||
and *no* keepalive tuning, so httpx's default ``keepalive_expiry=5.0``
|
||||
applies. Behind an HTTP proxy (Cloudflare Warp etc.) a peer-initiated
|
||||
FIN can sit in ``CLOSE_WAIT`` longer than that, leaking fds in the
|
||||
general request pool (``_request[1]`` — the pool that routes
|
||||
``bot.send_message`` / ``set_my_commands``), which
|
||||
``_drain_polling_connections`` never resets.
|
||||
|
||||
The fix wires the shared ``gateway.platforms._http_client_limits``
|
||||
``platform_httpx_limits()`` helper into *every* HTTPXRequest the adapter
|
||||
builds — the fallback-transport branch, the proxy branch, and the plain
|
||||
branch — so idle keepalive sockets drain aggressively.
|
||||
|
||||
Contract asserted here (mutation-survivable)
|
||||
---------------------------------------------
|
||||
Every ``HTTPXRequest`` constructed by ``TelegramAdapter.connect()`` must
|
||||
receive ``httpx_kwargs["limits"]`` that is an ``httpx.Limits`` with a
|
||||
``keepalive_expiry`` strictly below httpx's 5.0 default and a positive,
|
||||
bounded ``max_keepalive_connections``. Reverting the limits wiring (so
|
||||
HTTPXRequest falls back to PTB's default 5.0s keepalive) fails this test.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
|
||||
|
||||
def _ensure_telegram_mock():
|
||||
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
|
||||
return
|
||||
telegram_mod = MagicMock()
|
||||
telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
|
||||
telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
|
||||
telegram_mod.constants.ChatType.GROUP = "group"
|
||||
telegram_mod.constants.ChatType.SUPERGROUP = "supergroup"
|
||||
telegram_mod.constants.ChatType.CHANNEL = "channel"
|
||||
telegram_mod.constants.ChatType.PRIVATE = "private"
|
||||
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
|
||||
sys.modules.setdefault(name, telegram_mod)
|
||||
|
||||
|
||||
_ensure_telegram_mock()
|
||||
|
||||
from plugins.platforms.telegram import adapter as tg_adapter # noqa: E402
|
||||
from plugins.platforms.telegram.adapter import TelegramAdapter # noqa: E402
|
||||
|
||||
|
||||
class _StopConnect(Exception):
|
||||
"""Sentinel raised to abort connect() once requests are built."""
|
||||
|
||||
|
||||
class _RecordingHTTPXRequest:
|
||||
"""Stand-in for PTB's HTTPXRequest that records constructor kwargs."""
|
||||
|
||||
instances: list = []
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
_RecordingHTTPXRequest.instances.append(self)
|
||||
|
||||
|
||||
def _make_adapter() -> TelegramAdapter:
|
||||
return TelegramAdapter(PlatformConfig(enabled=True, token="test-token"))
|
||||
|
||||
|
||||
def _drive_connect(monkeypatch, *, proxy_url):
|
||||
"""Run connect() far enough to build the HTTPXRequests, then abort.
|
||||
|
||||
Returns the list of recorded _RecordingHTTPXRequest instances.
|
||||
"""
|
||||
_RecordingHTTPXRequest.instances = []
|
||||
|
||||
# No DoH auto-discovery → exercise the proxy / plain branches, not fallback.
|
||||
async def _no_fallback():
|
||||
return []
|
||||
|
||||
monkeypatch.setattr(tg_adapter, "discover_fallback_ips", _no_fallback)
|
||||
monkeypatch.setattr(
|
||||
tg_adapter, "resolve_proxy_url", lambda *a, **k: proxy_url
|
||||
)
|
||||
# Replace the real HTTPXRequest with our recorder.
|
||||
monkeypatch.setattr(tg_adapter, "HTTPXRequest", _RecordingHTTPXRequest)
|
||||
|
||||
adapter = _make_adapter()
|
||||
# Skip the cross-process token lock.
|
||||
monkeypatch.setattr(adapter, "_acquire_platform_lock", lambda *a, **k: True)
|
||||
# Ensure the adapter reports no statically-configured fallback IPs.
|
||||
monkeypatch.setattr(adapter, "_fallback_ips", lambda: [])
|
||||
|
||||
# builder.request(...).get_updates_request(...).build() must be harmless;
|
||||
# make build() raise our sentinel so connect() stops right after the
|
||||
# HTTPXRequests are constructed (before any real network/init).
|
||||
fake_built_app = MagicMock()
|
||||
fake_built_app.initialize = MagicMock(side_effect=_StopConnect)
|
||||
|
||||
chainable = MagicMock()
|
||||
chainable.token.return_value = chainable
|
||||
chainable.base_url.return_value = chainable
|
||||
chainable.base_file_url.return_value = chainable
|
||||
chainable.local_mode.return_value = chainable
|
||||
chainable.request.return_value = chainable
|
||||
chainable.get_updates_request.return_value = chainable
|
||||
chainable.build.side_effect = _StopConnect
|
||||
|
||||
builder_root = MagicMock()
|
||||
builder_root.builder.return_value = chainable
|
||||
monkeypatch.setattr(tg_adapter, "Application", builder_root)
|
||||
|
||||
try:
|
||||
asyncio.run(adapter.connect())
|
||||
except _StopConnect:
|
||||
pass
|
||||
except Exception:
|
||||
# connect() wraps work in a try; if it swallows the sentinel and
|
||||
# continues to real init, the recorded instances are still valid.
|
||||
pass
|
||||
|
||||
return list(_RecordingHTTPXRequest.instances)
|
||||
|
||||
|
||||
def _assert_keepalive_tight(instances):
|
||||
assert instances, "connect() built no HTTPXRequest — test setup is wrong"
|
||||
for inst in instances:
|
||||
limits = inst.kwargs.get("httpx_kwargs", {}).get("limits")
|
||||
assert isinstance(limits, httpx.Limits), (
|
||||
"HTTPXRequest must receive httpx_kwargs['limits'] = httpx.Limits "
|
||||
"wired from platform_httpx_limits() (#31599). Missing → PTB falls "
|
||||
"back to default keepalive_expiry=5.0 and leaks CLOSE_WAIT fds."
|
||||
)
|
||||
# The whole point: keepalive must be tighter than httpx's 5.0 default.
|
||||
assert limits.keepalive_expiry is not None
|
||||
assert limits.keepalive_expiry < 5.0, (
|
||||
"keepalive_expiry must be < httpx default 5.0 so idle/CLOSE_WAIT "
|
||||
"sockets drain promptly behind a proxy (#31599)."
|
||||
)
|
||||
assert limits.max_keepalive_connections is not None
|
||||
assert 1 <= limits.max_keepalive_connections <= 50
|
||||
# PTB's connection_pool_size (max_connections) must be preserved.
|
||||
assert limits.max_connections is not None and limits.max_connections > 0
|
||||
|
||||
|
||||
def test_proxy_branch_general_pool_has_tight_keepalive(monkeypatch):
|
||||
"""The proxy path the #31599 reporter hit must wire tuned limits."""
|
||||
instances = _drive_connect(monkeypatch, proxy_url="http://127.0.0.1:9/")
|
||||
# Both the general request pool and the get_updates pool are built here.
|
||||
assert len(instances) >= 2
|
||||
_assert_keepalive_tight(instances)
|
||||
# Sanity: the proxy was actually threaded through (we're on the proxy branch).
|
||||
assert any(inst.kwargs.get("proxy") == "http://127.0.0.1:9/" for inst in instances)
|
||||
|
||||
|
||||
def test_plain_branch_general_pool_has_tight_keepalive(monkeypatch):
|
||||
"""No proxy / no fallback IPs → plain branch must also wire tuned limits."""
|
||||
instances = _drive_connect(monkeypatch, proxy_url=None)
|
||||
assert len(instances) >= 2
|
||||
_assert_keepalive_tight(instances)
|
||||
|
||||
|
||||
def test_limits_keepalive_below_ptb_default_is_the_contract():
|
||||
"""Document the invariant independent of adapter wiring: the shared
|
||||
helper itself must tighten keepalive below httpx's 5.0 default."""
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
|
||||
limits = platform_httpx_limits()
|
||||
assert isinstance(limits, httpx.Limits)
|
||||
assert limits.keepalive_expiry is not None and limits.keepalive_expiry < 5.0
|
||||
Loading…
Add table
Add a link
Reference in a new issue