mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-05 02:31:47 +00:00
fix(gateway): tighten httpx keepalive and close whatsapp typing-response leak (#18451)
Two mitigations for the CLOSE_WAIT accumulation reported against QQ Bot + Feishu on macOS behind Cloudflare Warp. 1. Shared httpx.Limits helper (gateway/platforms/_http_client_limits.py). Every long-lived platform adapter now constructs httpx.AsyncClient with max_keepalive_connections=10 and keepalive_expiry=2.0, vs httpx's default of unbounded keepalive pool and 5.0s expiry. On macOS/Warp the default 5s window let idle keepalive sockets sit in CLOSE_WAIT long enough for seven persistent adapters (QQ Bot, WeCom, DingTalk, Signal, BlueBubbles, WeCom-callback, plus the transient Feishu helper) to compound to the 256-fd ulimit. Tunable via HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY and HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE env vars. 2. whatsapp.send_typing aiohttp leak. The call was 'await self._http_session.post(...)' with no 'async with' and no variable capture — the ClientResponse went out of scope unclosed, holding its TCP socket in CLOSE_WAIT until GC. Fixed by wrapping in 'async with'. This was the only bare-await aiohttp leak in the gateway/tools/plugins tree per audit; all other aiohttp sites use the context-manager pattern correctly. The underlying reporter also saw Feishu SDK (lark-oapi) connections in CLOSE_WAIT — those are inside the SDK and out of our direct control, but tightening httpx keepalive across adapters reduces the aggregate pool pressure regardless of which individual adapter leaks.
This commit is contained in:
parent
38dd057e91
commit
762eb79f1e
9 changed files with 227 additions and 7 deletions
84
gateway/platforms/_http_client_limits.py
Normal file
84
gateway/platforms/_http_client_limits.py
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
"""Shared HTTP client factory for long-lived platform adapters.
|
||||
|
||||
Gateway messaging platforms (QQ Bot, Feishu, WeCom, DingTalk, Signal,
|
||||
BlueBubbles, WeCom-callback) keep a persistent ``httpx.AsyncClient``
|
||||
alive for the adapter's lifetime. That amortises TLS/connection setup
|
||||
across many API calls, but it also means the process's file-descriptor
|
||||
pressure is sensitive to how aggressively the pool recycles idle keep-
|
||||
alive connections.
|
||||
|
||||
httpx's default ``keepalive_expiry`` is 5 seconds. On macOS behind
|
||||
Cloudflare Warp (and other transparent proxies), peer-initiated FIN can
|
||||
sit in ``CLOSE_WAIT`` longer than that before the local socket actually
|
||||
drains — which, multiplied across 7 long-lived adapters plus the LLM
|
||||
client and MCP clients, walks straight into the default 256 fd limit.
|
||||
See #18451.
|
||||
|
||||
``platform_httpx_limits()`` returns a tighter ``httpx.Limits`` the
|
||||
adapter factories use instead of the httpx default. The values chosen:
|
||||
|
||||
* ``max_keepalive_connections=10`` — plenty for any single adapter;
|
||||
platform APIs rarely parallelise beyond this.
|
||||
* ``keepalive_expiry=2.0`` — close idle sockets aggressively so a
|
||||
proxy's lingering CLOSE_WAIT window can't starve the process.
|
||||
|
||||
Override via ``HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY`` /
|
||||
``HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE`` env vars when tuning under load.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
try:
|
||||
import httpx
|
||||
except ImportError: # pragma: no cover — optional dep
|
||||
httpx = None # type: ignore[assignment]
|
||||
|
||||
|
||||
_DEFAULT_KEEPALIVE_EXPIRY_S = 2.0
|
||||
_DEFAULT_MAX_KEEPALIVE = 10
|
||||
|
||||
|
||||
def platform_httpx_limits() -> "httpx.Limits | None":
|
||||
"""Return ``httpx.Limits`` tuned for persistent platform-adapter clients.
|
||||
|
||||
Returns ``None`` when httpx isn't importable, so callers can fall
|
||||
back to httpx's built-in default without a hard dependency on this
|
||||
helper being reachable.
|
||||
"""
|
||||
if httpx is None:
|
||||
return None
|
||||
|
||||
def _env_float(name: str, default: float) -> float:
|
||||
raw = os.environ.get(name, "").strip()
|
||||
if not raw:
|
||||
return default
|
||||
try:
|
||||
val = float(raw)
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
return val if val > 0 else default
|
||||
|
||||
def _env_int(name: str, default: int) -> int:
|
||||
raw = os.environ.get(name, "").strip()
|
||||
if not raw:
|
||||
return default
|
||||
try:
|
||||
val = int(raw)
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
return val if val > 0 else default
|
||||
|
||||
keepalive_expiry = _env_float(
|
||||
"HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", _DEFAULT_KEEPALIVE_EXPIRY_S
|
||||
)
|
||||
max_keepalive = _env_int(
|
||||
"HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", _DEFAULT_MAX_KEEPALIVE
|
||||
)
|
||||
|
||||
return httpx.Limits(
|
||||
max_keepalive_connections=max_keepalive,
|
||||
# Leave max_connections at httpx default (100) — plenty of headroom.
|
||||
keepalive_expiry=keepalive_expiry,
|
||||
)
|
||||
|
|
@ -162,7 +162,9 @@ class BlueBubblesAdapter(BasePlatformAdapter):
|
|||
return False
|
||||
from aiohttp import web
|
||||
|
||||
self.client = httpx.AsyncClient(timeout=30.0)
|
||||
# Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451).
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
self.client = httpx.AsyncClient(timeout=30.0, limits=platform_httpx_limits())
|
||||
try:
|
||||
await self._api_get("/api/v1/ping")
|
||||
info = await self._api_get("/api/v1/server/info")
|
||||
|
|
|
|||
|
|
@ -228,7 +228,11 @@ class DingTalkAdapter(BasePlatformAdapter):
|
|||
return False
|
||||
|
||||
try:
|
||||
self._http_client = httpx.AsyncClient(timeout=30.0)
|
||||
# Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451).
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
self._http_client = httpx.AsyncClient(
|
||||
timeout=30.0, limits=platform_httpx_limits(),
|
||||
)
|
||||
|
||||
credential = dingtalk_stream.Credential(
|
||||
self._client_id, self._client_secret
|
||||
|
|
|
|||
|
|
@ -243,10 +243,14 @@ class QQAdapter(BasePlatformAdapter):
|
|||
return False
|
||||
|
||||
try:
|
||||
# Tighter keepalive pool so idle CLOSE_WAIT sockets drain
|
||||
# faster behind proxies like Cloudflare Warp (#18451).
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
self._http_client = httpx.AsyncClient(
|
||||
timeout=30.0,
|
||||
follow_redirects=True,
|
||||
event_hooks={"response": [_ssrf_redirect_guard]},
|
||||
limits=platform_httpx_limits(),
|
||||
)
|
||||
|
||||
# 1. Get access token
|
||||
|
|
|
|||
|
|
@ -248,7 +248,9 @@ class SignalAdapter(BasePlatformAdapter):
|
|||
except Exception as e:
|
||||
logger.warning("Signal: Could not acquire phone lock (non-fatal): %s", e)
|
||||
|
||||
self.client = httpx.AsyncClient(timeout=30.0)
|
||||
# Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451).
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
self.client = httpx.AsyncClient(timeout=30.0, limits=platform_httpx_limits())
|
||||
try:
|
||||
# Health check — verify signal-cli daemon is reachable
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -206,7 +206,11 @@ class WeComAdapter(BasePlatformAdapter):
|
|||
return False
|
||||
|
||||
try:
|
||||
self._http_client = httpx.AsyncClient(timeout=30.0, follow_redirects=True)
|
||||
# Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451).
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
self._http_client = httpx.AsyncClient(
|
||||
timeout=30.0, follow_redirects=True, limits=platform_httpx_limits(),
|
||||
)
|
||||
await self._open_connection()
|
||||
self._mark_connected()
|
||||
self._listen_task = asyncio.create_task(self._listen_loop())
|
||||
|
|
|
|||
|
|
@ -119,7 +119,9 @@ class WecomCallbackAdapter(BasePlatformAdapter):
|
|||
pass
|
||||
|
||||
try:
|
||||
self._http_client = httpx.AsyncClient(timeout=20.0)
|
||||
# Tighter keepalive so idle CLOSE_WAIT drains promptly (#18451).
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
self._http_client = httpx.AsyncClient(timeout=20.0, limits=platform_httpx_limits())
|
||||
self._app = web.Application()
|
||||
self._app.router.add_get("/health", self._handle_health)
|
||||
self._app.router.add_get(self._path, self._handle_verify)
|
||||
|
|
|
|||
|
|
@ -902,11 +902,15 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
|||
try:
|
||||
import aiohttp
|
||||
|
||||
await self._http_session.post(
|
||||
# Must wrap in `async with` — a bare `await session.post(...)`
|
||||
# leaves the response object alive until GC, holding its TCP
|
||||
# socket in CLOSE_WAIT. See #18451.
|
||||
async with self._http_session.post(
|
||||
f"http://127.0.0.1:{self._bridge_port}/typing",
|
||||
json={"chatId": chat_id},
|
||||
timeout=aiohttp.ClientTimeout(total=5)
|
||||
)
|
||||
):
|
||||
pass
|
||||
except Exception:
|
||||
pass # Ignore typing indicator failures
|
||||
|
||||
|
|
|
|||
114
tests/gateway/test_platform_http_client_limits.py
Normal file
114
tests/gateway/test_platform_http_client_limits.py
Normal file
|
|
@ -0,0 +1,114 @@
|
|||
"""Tests for the shared httpx.Limits helper that all long-lived platform
|
||||
adapters use to tighten their keep-alive pool.
|
||||
|
||||
Context: #18451 — on macOS behind Cloudflare Warp, httpx's default
|
||||
keepalive_expiry=5s let idle CLOSE_WAIT sockets accumulate across
|
||||
multiple long-lived gateway adapters (QQ Bot, Feishu, WeCom, DingTalk,
|
||||
Signal, BlueBubbles, WeCom-callback) until the process hit the default
|
||||
256 fd limit. These tests just verify the helper returns sensibly
|
||||
tuned limits and respects env-var overrides; the actual fd-pressure
|
||||
behaviour is only observable at runtime under load.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clear_env(monkeypatch):
|
||||
monkeypatch.delenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", raising=False)
|
||||
|
||||
|
||||
def test_returns_none_when_httpx_unavailable(monkeypatch):
|
||||
"""If httpx can't be imported, the helper returns None so callers
|
||||
fall back to httpx's built-in Limits default without raising."""
|
||||
import gateway.platforms._http_client_limits as mod
|
||||
monkeypatch.setattr(mod, "httpx", None)
|
||||
assert mod.platform_httpx_limits() is None
|
||||
|
||||
|
||||
def test_default_limits_tighten_keepalive_below_httpx_default():
|
||||
import httpx
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
limits = platform_httpx_limits()
|
||||
assert isinstance(limits, httpx.Limits)
|
||||
# httpx default keepalive_expiry is 5.0 — ours must be shorter so
|
||||
# CLOSE_WAIT sockets drain promptly behind proxies like Warp.
|
||||
assert limits.keepalive_expiry is not None
|
||||
assert limits.keepalive_expiry < 5.0
|
||||
# max_keepalive_connections must be positive and reasonable for a
|
||||
# single adapter (platform APIs rarely parallelise beyond ~10).
|
||||
assert limits.max_keepalive_connections is not None
|
||||
assert 1 <= limits.max_keepalive_connections <= 50
|
||||
|
||||
|
||||
def test_env_override_keepalive_expiry(monkeypatch):
|
||||
monkeypatch.setenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", "7.5")
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
limits = platform_httpx_limits()
|
||||
assert limits.keepalive_expiry == 7.5
|
||||
|
||||
|
||||
def test_env_override_max_keepalive(monkeypatch):
|
||||
monkeypatch.setenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", "25")
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
limits = platform_httpx_limits()
|
||||
assert limits.max_keepalive_connections == 25
|
||||
|
||||
|
||||
def test_env_override_rejects_garbage(monkeypatch):
|
||||
"""Malformed env values fall back to defaults rather than raising."""
|
||||
monkeypatch.setenv("HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY", "not-a-number")
|
||||
monkeypatch.setenv("HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE", "-3")
|
||||
from gateway.platforms._http_client_limits import platform_httpx_limits
|
||||
limits = platform_httpx_limits()
|
||||
# Non-positive / non-numeric → fell back to defaults (not the override values)
|
||||
assert limits.keepalive_expiry is not None and limits.keepalive_expiry > 0
|
||||
assert limits.max_keepalive_connections is not None
|
||||
assert limits.max_keepalive_connections > 0
|
||||
|
||||
|
||||
def test_helper_is_importable_from_every_platform_that_uses_it():
|
||||
"""Every persistent-httpx-client platform adapter imports this helper.
|
||||
If any of those modules fails to import, this test surfaces it before
|
||||
the regression shows up as a runtime adapter-startup crash."""
|
||||
# Just importing exercises the helper's import path for each adapter.
|
||||
import gateway.platforms.qqbot.adapter # noqa: F401
|
||||
import gateway.platforms.wecom # noqa: F401
|
||||
import gateway.platforms.dingtalk # noqa: F401
|
||||
import gateway.platforms.signal # noqa: F401
|
||||
import gateway.platforms.bluebubbles # noqa: F401
|
||||
import gateway.platforms.wecom_callback # noqa: F401
|
||||
|
||||
|
||||
class TestWhatsappTypingLeakFix:
|
||||
"""#18451 — whatsapp.send_typing previously used a bare
|
||||
`await self._http_session.post(...)` which leaked the aiohttp
|
||||
response object until GC, holding its TCP socket in CLOSE_WAIT.
|
||||
Must now wrap the call in `async with` so the response is
|
||||
released immediately when the call returns.
|
||||
|
||||
We verify by inspecting the source text rather than exercising
|
||||
the coroutine — the test suite would otherwise need a live
|
||||
aiohttp server, and the contract we care about is structural.
|
||||
"""
|
||||
|
||||
def test_bare_await_removed(self):
|
||||
import inspect
|
||||
import gateway.platforms.whatsapp as mod
|
||||
|
||||
src = inspect.getsource(mod.WhatsAppAdapter.send_typing)
|
||||
# The fix must be structural: the post() call is inside an
|
||||
# `async with`, not a bare `await`.
|
||||
assert "async with self._http_session.post(" in src, (
|
||||
"send_typing must wrap self._http_session.post(...) in "
|
||||
"`async with` to release the aiohttp response socket "
|
||||
"(#18451). Otherwise the response sits in CLOSE_WAIT "
|
||||
"until GC."
|
||||
)
|
||||
# The old bare-await form must be gone.
|
||||
assert "await self._http_session.post(" not in src
|
||||
Loading…
Add table
Add a link
Reference in a new issue