diff --git a/gateway/platforms/yuanbao.py b/gateway/platforms/yuanbao.py
index 6dc54dbcd50..7c34f1453cb 100644
--- a/gateway/platforms/yuanbao.py
+++ b/gateway/platforms/yuanbao.py
@@ -120,6 +120,16 @@ AUTH_TIMEOUT_SECONDS = 10.0
 MAX_RECONNECT_ATTEMPTS = 100
 DEFAULT_SEND_TIMEOUT = 30.0  # WS biz request timeout
 
+# Upper bound on the WS close handshake during teardown (#40383). The
+# websockets connection's own close_timeout (5s) blocks until the server
+# echoes the close frame; an idle/unresponsive server never replies, stalling
+# gateway shutdown by the full timeout. Bounding the close await here keeps
+# teardown fast — a responsive server completes the handshake in well under a
+# second, so this only caps the pathological hang. Also bounds the reconnect /
+# connect-failure cleanup paths that reuse _cleanup_ws(), where a graceful
+# close is unnecessary anyway (the socket is being discarded to redial).
+WS_CLOSE_TIMEOUT_S = 1.0
+
 # Close codes that indicate permanent errors — do NOT reconnect.
 NO_RECONNECT_CLOSE_CODES = {4012, 4013, 4014, 4018, 4019, 4021}
 
@@ -3445,12 +3455,27 @@ class ConnectionManager:
         return False
 
     async def _cleanup_ws(self) -> None:
-        """Close and clear the WebSocket connection."""
+        """Close and clear the WebSocket connection, bounded by
+        ``WS_CLOSE_TIMEOUT_S`` so an unresponsive server can't stall teardown
+        (see the constant's definition for the full rationale)."""
         ws = self._ws
         self._ws = None
         if ws is not None:
             try:
-                await ws.close()
+                await asyncio.wait_for(ws.close(), timeout=WS_CLOSE_TIMEOUT_S)
+            except asyncio.TimeoutError:
+                # Server never echoed the close frame within the bound; drop the
+                # connection. websockets force-closes the transport on cancel,
+                # and at shutdown the loop is tearing down anyway.
+                logger.debug(
+                    "[%s] WS close handshake exceeded %.1fs — dropping connection",
+                    self._adapter.name, WS_CLOSE_TIMEOUT_S,
+                )
             except Exception:
-                pass
+                # Best-effort teardown: a failed close must never propagate,
+                # but leave a trace so unexpected errors stay diagnosable.
+                logger.debug(
+                    "[%s] WS close failed during cleanup — ignoring",
+                    self._adapter.name, exc_info=True,
+                )
 
diff --git a/tests/test_yuanbao_shutdown.py b/tests/test_yuanbao_shutdown.py
new file mode 100644
index 00000000000..be535f46c70
--- /dev/null
+++ b/tests/test_yuanbao_shutdown.py
@@ -0,0 +1,119 @@
+"""test_yuanbao_shutdown.py - Yuanbao adapter shutdown teardown timing.
+
+Regression coverage for #40383: a non-responsive Yuanbao WS server must not
+stall gateway shutdown. ``websockets`` ``ws.close()`` blocks up to the
+connection's ``close_timeout`` (5s) waiting for the server's close-frame echo;
+on an idle shutdown the server never replies, so ``_cleanup_ws`` used to wait
+the full ~5s. The cleanup path now bounds the close await so a hung server
+cannot stall teardown.
+
+These tests assert the *bounding/timing* contract of ``_cleanup_ws`` using
+lightweight fakes; force-closing the underlying TCP transport on cancellation
+is ``websockets``' responsibility (and harmless at shutdown, where the loop is
+tearing down regardless), so it is intentionally out of scope here.
+"""
+
+import sys
+import os
+import asyncio
+
+_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+if _REPO_ROOT not in sys.path:
+    sys.path.insert(0, _REPO_ROOT)
+
+import pytest
+from gateway.config import PlatformConfig
+from gateway.platforms.yuanbao import (
+    YuanbaoAdapter,
+    ConnectionManager,
+    WS_CLOSE_TIMEOUT_S,
+)
+
+
+def make_config(**kwargs):
+    extra = kwargs.pop("extra", {})
+    extra.setdefault("app_id", "test_key")
+    extra.setdefault("app_secret", "test_secret")
+    extra.setdefault("ws_url", "wss://test.example.com/ws")
+    extra.setdefault("api_domain", "https://test.example.com")
+    return PlatformConfig(extra=extra, **kwargs)
+
+
+class _HangingWS:
+    """Fake WS whose close() never gets a server echo — sleeps past the bound."""
+
+    def __init__(self, sleep_s: float):
+        self._sleep_s = sleep_s
+        self.close_called = False
+
+    async def close(self):
+        self.close_called = True
+        await asyncio.sleep(self._sleep_s)
+
+
+class _FastWS:
+    """Fake WS whose close() returns promptly (responsive server)."""
+
+    def __init__(self):
+        self.close_called = False
+
+    async def close(self):
+        self.close_called = True
+
+
+class _RaisingWS:
+    """Fake WS whose close() raises, as on an already-reset connection."""
+
+    async def close(self):
+        raise RuntimeError("connection already reset")
+
+
+def _connection() -> ConnectionManager:
+    return YuanbaoAdapter(make_config())._connection
+
+
+@pytest.mark.asyncio
+async def test_cleanup_ws_does_not_stall_on_hung_server():
+    """A server that never echoes the close frame must not stall teardown."""
+    cm = _connection()
+    hung = _HangingWS(sleep_s=WS_CLOSE_TIMEOUT_S + 4.0)
+    cm._ws = hung
+
+    loop = asyncio.get_running_loop()
+    start = loop.time()
+    await cm._cleanup_ws()
+    elapsed = loop.time() - start
+
+    assert hung.close_called
+    assert cm._ws is None
+    # Bounded by WS_CLOSE_TIMEOUT_S (+ small scheduling slack), not the 5s
+    # close_timeout the server would otherwise hold us to.
+    assert elapsed < WS_CLOSE_TIMEOUT_S + 1.0
+
+
+@pytest.mark.asyncio
+async def test_cleanup_ws_fast_path_returns_immediately():
+    """A responsive server completes the handshake well under the bound."""
+    cm = _connection()
+    fast = _FastWS()
+    cm._ws = fast
+
+    loop = asyncio.get_running_loop()
+    start = loop.time()
+    await cm._cleanup_ws()
+    elapsed = loop.time() - start
+
+    assert fast.close_called
+    assert cm._ws is None
+    assert elapsed < WS_CLOSE_TIMEOUT_S
+
+
+@pytest.mark.asyncio
+async def test_cleanup_ws_swallows_close_errors():
+    """A close() that raises must still clear the ws reference."""
+    cm = _connection()
+    cm._ws = _RaisingWS()
+
+    await cm._cleanup_ws()
+
+    assert cm._ws is None