diff --git a/gateway/platforms/_http_client_limits.py b/gateway/platforms/_http_client_limits.py new file mode 100644 index 0000000000..4d8a7c86e9 --- /dev/null +++ b/gateway/platforms/_http_client_limits.py @@ -0,0 +1,84 @@ +"""Shared HTTP client factory for long-lived platform adapters. + +Gateway messaging platforms (QQ Bot, Feishu, WeCom, DingTalk, Signal, +BlueBubbles, WeCom-callback) keep a persistent ``httpx.AsyncClient`` +alive for the adapter's lifetime. That amortises TLS/connection setup +across many API calls, but it also means the process's file-descriptor +pressure is sensitive to how aggressively the pool recycles idle keep- +alive connections. + +httpx's default ``keepalive_expiry`` is 5 seconds. On macOS behind +Cloudflare Warp (and other transparent proxies), peer-initiated FIN can +sit in ``CLOSE_WAIT`` longer than that before the local socket actually +drains — which, multiplied across 7 long-lived adapters plus the LLM +client and MCP clients, walks straight into the default 256 fd limit. +See #18451. + +``platform_httpx_limits()`` returns a tighter ``httpx.Limits`` the +adapter factories use instead of the httpx default. The values chosen: + +* ``max_keepalive_connections=10`` — plenty for any single adapter; + platform APIs rarely parallelise beyond this. +* ``keepalive_expiry=2.0`` — close idle sockets aggressively so a + proxy's lingering CLOSE_WAIT window can't starve the process. + +Override via ``HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY`` / +``HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE`` env vars when tuning under load. +""" + +from __future__ import annotations + +import os + +try: + import httpx +except ImportError: # pragma: no cover — optional dep + httpx = None # type: ignore[assignment] + + +_DEFAULT_KEEPALIVE_EXPIRY_S = 2.0 +_DEFAULT_MAX_KEEPALIVE = 10 + + +def platform_httpx_limits() -> "httpx.Limits | None": + """Return ``httpx.Limits`` tuned for persistent platform-adapter clients. + + Returns ``None`` when httpx isn't importable, so callers can fall + back to httpx's built-in default without a hard dependency on this + helper being reachable. + """ + if httpx is None: + return None + + def _env_float(name: str, default: float) -> float: + raw = os.environ.get(name, "").strip() + if not raw: + return default + try: + val = float(raw) + except (TypeError, ValueError): + return default + return val if val > 0 else default + + def _env_int(name: str, default: int) -> int: + raw = os.environ.get(name, "").strip() + if not raw: + return default + try: + val = int(raw) + except (TypeError, ValueError): + return default + return val if val > 0 else default + + keepalive_expiry = _env_float( + "HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", _DEFAULT_KEEPALIVE_EXPIRY_S + ) + max_keepalive = _env_int( + "HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", _DEFAULT_MAX_KEEPALIVE + ) + + return httpx.Limits( + max_keepalive_connections=max_keepalive, + # Leave max_connections at httpx default (100) — plenty of headroom. + keepalive_expiry=keepalive_expiry, + ) diff --git a/gateway/platforms/bluebubbles.py b/gateway/platforms/bluebubbles.py index afcbf1a7e4..31120785c0 100644 --- a/gateway/platforms/bluebubbles.py +++ b/gateway/platforms/bluebubbles.py @@ -162,7 +162,9 @@ class BlueBubblesAdapter(BasePlatformAdapter): return False from aiohttp import web - self.client = httpx.AsyncClient(timeout=30.0) + # Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits + self.client = httpx.AsyncClient(timeout=30.0, limits=platform_httpx_limits()) try: await self._api_get("/api/v1/ping") info = await self._api_get("/api/v1/server/info") diff --git a/gateway/platforms/dingtalk.py b/gateway/platforms/dingtalk.py index 3037e402b2..f1520e22c6 100644 --- a/gateway/platforms/dingtalk.py +++ b/gateway/platforms/dingtalk.py @@ -228,7 +228,11 @@ class DingTalkAdapter(BasePlatformAdapter): return False try: - self._http_client = httpx.AsyncClient(timeout=30.0) + # Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits + self._http_client = httpx.AsyncClient( + timeout=30.0, limits=platform_httpx_limits(), + ) credential = dingtalk_stream.Credential( self._client_id, self._client_secret diff --git a/gateway/platforms/qqbot/adapter.py b/gateway/platforms/qqbot/adapter.py index 10e1f62e72..c6e5d428c6 100644 --- a/gateway/platforms/qqbot/adapter.py +++ b/gateway/platforms/qqbot/adapter.py @@ -243,10 +243,14 @@ class QQAdapter(BasePlatformAdapter): return False try: + # Tighter keepalive pool so idle CLOSE_WAIT sockets drain + # faster behind proxies like Cloudflare Warp (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits self._http_client = httpx.AsyncClient( timeout=30.0, follow_redirects=True, event_hooks={"response": [_ssrf_redirect_guard]}, + limits=platform_httpx_limits(), ) # 1. Get access token diff --git a/gateway/platforms/signal.py b/gateway/platforms/signal.py index 225430600d..77d3c18cb6 100644 --- a/gateway/platforms/signal.py +++ b/gateway/platforms/signal.py @@ -248,7 +248,9 @@ class SignalAdapter(BasePlatformAdapter): except Exception as e: logger.warning("Signal: Could not acquire phone lock (non-fatal): %s", e) - self.client = httpx.AsyncClient(timeout=30.0) + # Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits + self.client = httpx.AsyncClient(timeout=30.0, limits=platform_httpx_limits()) try: # Health check — verify signal-cli daemon is reachable try: diff --git a/gateway/platforms/wecom.py b/gateway/platforms/wecom.py index 7ba0fa21b9..453b95a717 100644 --- a/gateway/platforms/wecom.py +++ b/gateway/platforms/wecom.py @@ -206,7 +206,11 @@ class WeComAdapter(BasePlatformAdapter): return False try: - self._http_client = httpx.AsyncClient(timeout=30.0, follow_redirects=True) + # Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits + self._http_client = httpx.AsyncClient( + timeout=30.0, follow_redirects=True, limits=platform_httpx_limits(), + ) await self._open_connection() self._mark_connected() self._listen_task = asyncio.create_task(self._listen_loop()) diff --git a/gateway/platforms/wecom_callback.py b/gateway/platforms/wecom_callback.py index 5440792dea..139c67fe7c 100644 --- a/gateway/platforms/wecom_callback.py +++ b/gateway/platforms/wecom_callback.py @@ -119,7 +119,9 @@ class WecomCallbackAdapter(BasePlatformAdapter): pass try: - self._http_client = httpx.AsyncClient(timeout=20.0) + # Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451). + from gateway.platforms._http_client_limits import platform_httpx_limits + self._http_client = httpx.AsyncClient(timeout=20.0, limits=platform_httpx_limits()) self._app = web.Application() self._app.router.add_get("/health", self._handle_health) self._app.router.add_get(self._path, self._handle_verify) diff --git a/gateway/platforms/whatsapp.py b/gateway/platforms/whatsapp.py index b3e655a51b..921dd70d72 100644 --- a/gateway/platforms/whatsapp.py +++ b/gateway/platforms/whatsapp.py @@ -902,11 +902,15 @@ class WhatsAppAdapter(BasePlatformAdapter): try: import aiohttp - await self._http_session.post( + # Must wrap in `async with` — a bare `await session.post(...)` + # leaves the response object alive until GC, holding its TCP + # socket in CLOSE_WAIT. See #18451. + async with self._http_session.post( f"http://127.0.0.1:{self._bridge_port}/typing", json={"chatId": chat_id}, timeout=aiohttp.ClientTimeout(total=5) - ) + ): + pass except Exception: pass # Ignore typing indicator failures diff --git a/tests/gateway/test_platform_http_client_limits.py b/tests/gateway/test_platform_http_client_limits.py new file mode 100644 index 0000000000..fe613fb1f0 --- /dev/null +++ b/tests/gateway/test_platform_http_client_limits.py @@ -0,0 +1,114 @@ +"""Tests for the shared httpx.Limits helper that all long-lived platform +adapters use to tighten their keep-alive pool. + +Context: #18451 — on macOS behind Cloudflare Warp, httpx's default +keepalive_expiry=5s let idle CLOSE_WAIT sockets accumulate across +multiple long-lived gateway adapters (QQ Bot, Feishu, WeCom, DingTalk, +Signal, BlueBubbles, WeCom-callback) until the process hit the default +256 fd limit. These tests just verify the helper returns sensibly +tuned limits and respects env-var overrides; the actual fd-pressure +behaviour is only observable at runtime under load. +""" + +from __future__ import annotations + +import os + +import pytest + + +@pytest.fixture(autouse=True) +def _clear_env(monkeypatch): + monkeypatch.delenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", raising=False) + + +def test_returns_none_when_httpx_unavailable(monkeypatch): + """If httpx can't be imported, the helper returns None so callers + fall back to httpx's built-in Limits default without raising.""" + import gateway.platforms._http_client_limits as mod + monkeypatch.setattr(mod, "httpx", None) + assert mod.platform_httpx_limits() is None + + +def test_default_limits_tighten_keepalive_below_httpx_default(): + import httpx + from gateway.platforms._http_client_limits import platform_httpx_limits + limits = platform_httpx_limits() + assert isinstance(limits, httpx.Limits) + # httpx default keepalive_expiry is 5.0 — ours must be shorter so + # CLOSE_WAIT sockets drain promptly behind proxies like Warp. + assert limits.keepalive_expiry is not None + assert limits.keepalive_expiry < 5.0 + # max_keepalive_connections must be positive and reasonable for a + # single adapter (platform APIs rarely parallelise beyond ~10). + assert limits.max_keepalive_connections is not None + assert 1 <= limits.max_keepalive_connections <= 50 + + +def test_env_override_keepalive_expiry(monkeypatch): + monkeypatch.setenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", "7.5") + from gateway.platforms._http_client_limits import platform_httpx_limits + limits = platform_httpx_limits() + assert limits.keepalive_expiry == 7.5 + + +def test_env_override_max_keepalive(monkeypatch): + monkeypatch.setenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", "25") + from gateway.platforms._http_client_limits import platform_httpx_limits + limits = platform_httpx_limits() + assert limits.max_keepalive_connections == 25 + + +def test_env_override_rejects_garbage(monkeypatch): + """Malformed env values fall back to defaults rather than raising.""" + monkeypatch.setenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", "not-a-number") + monkeypatch.setenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", "-3") + from gateway.platforms._http_client_limits import platform_httpx_limits + limits = platform_httpx_limits() + # Non-positive / non-numeric → fell back to defaults (not the override values) + assert limits.keepalive_expiry is not None and limits.keepalive_expiry > 0 + assert limits.max_keepalive_connections is not None + assert limits.max_keepalive_connections > 0 + + +def test_helper_is_importable_from_every_platform_that_uses_it(): + """Every persistent-httpx-client platform adapter imports this helper. + If any of those modules fails to import, this test surfaces it before + the regression shows up as a runtime adapter-startup crash.""" + # Just importing exercises the helper's import path for each adapter. + import gateway.platforms.qqbot.adapter # noqa: F401 + import gateway.platforms.wecom # noqa: F401 + import gateway.platforms.dingtalk # noqa: F401 + import gateway.platforms.signal # noqa: F401 + import gateway.platforms.bluebubbles # noqa: F401 + import gateway.platforms.wecom_callback # noqa: F401 + + +class TestWhatsappTypingLeakFix: + """#18451 — whatsapp.send_typing previously used a bare + `await self._http_session.post(...)` which leaked the aiohttp + response object until GC, holding its TCP socket in CLOSE_WAIT. + Must now wrap the call in `async with` so the response is + released immediately when the call returns. + + We verify by inspecting the source text rather than exercising + the coroutine — the test suite would otherwise need a live + aiohttp server, and the contract we care about is structural. + """ + + def test_bare_await_removed(self): + import inspect + import gateway.platforms.whatsapp as mod + + src = inspect.getsource(mod.WhatsAppAdapter.send_typing) + # The fix must be structural: the post() call is inside an + # `async with`, not a bare `await`. + assert "async with self._http_session.post(" in src, ( + "send_typing must wrap self._http_session.post(...) in " + "`async with` to release the aiohttp response socket " + "(#18451). Otherwise the response sits in CLOSE_WAIT " + "until GC." + ) + # The old bare-await form must be gone. + assert "await self._http_session.post(" not in src