From 4b06c98fe4f4686f89873933ac616cb6e76b298d Mon Sep 17 00:00:00 2001 From: Fearvox Date: Tue, 2 Jun 2026 15:06:13 -0700 Subject: [PATCH] fix(gateway): close ResponseStore + dispose unowned adapter on reconnect failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three separate code paths in the gateway's platform reconnect loop leaked file descriptors every retry, exhausting the default 2560-fd ulimit in ~12 hours of continuous failure and turning the gateway into a zombie that raises OSError: [Errno 24] on every open() (#37011). Root cause: * APIServerAdapter.__init__ opens a ResponseStore SQLite connection that holds 2 fds (db file + WAL sidecar). * APIServerAdapter.disconnect() previously only stopped the aiohttp web server — the ResponseStore connection was never closed. * The reconnect watcher in _platform_reconnect_watcher constructs a fresh adapter on every retry attempt. When the connect call fails (3 paths: non-retryable error, retryable error, exception during connect) the adapter is dropped without ever being installed on self.adapters, so nothing else calls its disconnect(). Result: the 2 ResponseStore fds stay open until GC sweeps the unreachable object, which Python's cyclic GC does not do promptly for asyncio-bound native handles. 2 fds × 1 retry × (3600s / 300s backoff cap) ≈ 12 fds/hour. 2560 fds / 12 fds/hr ≈ 12h to ulimit exhaustion. Fix: * APIServerAdapter.disconnect() now also calls self._response_store.close() (with a try/except so a SQLite close failure doesn't abort the aiohttp teardown). * New module-level helper _dispose_unused_adapter(adapter) in gateway/run.py that calls adapter.disconnect() and swallows any exception (so half-constructed adapters whose __init__ crashed don't kill the watcher loop). * _platform_reconnect_watcher calls _dispose_unused_adapter() in all three failure paths: non-retryable, retryable, and the except Exception arm. adapter = None is initialized before the try so the except arm can see the partial construction. Tests: * New file tests/gateway/test_platform_reconnect_fd_leak.py with 7 regression tests covering all three failure paths, the _dispose_unused_adapter helper (None + raising-disconnect cases), and the APIServerAdapter ResponseStore close behavior (success + close-exception cases). The _CountingAdapter fixture tracks disconnect() invocations and an _open_fds counter that is decremented on dispose, so the assertion is the literal observable behavior of the leak. Refs: - Closes #37011 (the original fd-leak report) - Supersedes #37018, #37110, #37238, #37260, #37394 (7 competing open PRs all addressing the same root cause from different angles; none of them rebased cleanly against current main, and none covered all three failure paths in one fix with regression tests for both the watcher and the platform-level close behavior) --- gateway/platforms/api_server.py | 19 +- gateway/run.py | 66 ++++ .../test_platform_reconnect_fd_leak.py | 337 ++++++++++++++++++ 3 files changed, 421 insertions(+), 1 deletion(-) create mode 100644 tests/gateway/test_platform_reconnect_fd_leak.py diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 1c4cb6df728..22bf199b3b0 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -4195,8 +4195,25 @@ class APIServerAdapter(BasePlatformAdapter): return False async def disconnect(self) -> None: - """Stop the aiohttp web server.""" + """Stop the aiohttp web server and release all owned resources. + + Closes the ResponseStore SQLite connection in addition to stopping + the aiohttp web server. Without this, every adapter instance leaks + 2 file descriptors (the database file and its WAL sidecar) — the + reconnect loop in ``gateway.run`` constructs a fresh adapter on + every retry, so 2 fds/retry × 300s backoff cap ≈ 12 fds/hour, which + exhausts the default 2560 fd limit after ~12h of failed reconnects + and turns the whole gateway into a zombie + (OSError: [Errno 24] Too many open files, #37011). + """ self._mark_disconnected() + if self._response_store is not None: + try: + self._response_store.close() + except Exception: + logger.debug( + "Failed to close response store for %s", self.name, exc_info=True, + ) if self._site: await self._site.stop() self._site = None diff --git a/gateway/run.py b/gateway/run.py index f64ea3b5633..adbba69283a 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1755,6 +1755,46 @@ def _preserve_queued_followup_history_offset( return merged +async def _dispose_unused_adapter(adapter: "BasePlatformAdapter") -> None: + """Best-effort dispose for an adapter that never made it onto ``self.adapters``. + + The reconnect watcher in ``GatewayRunner._platform_reconnect_watcher`` + constructs a fresh adapter on every retry attempt. When the connect + call fails — for any of the three reasons (non-retryable error, + retryable error, exception during connect) — the adapter is dropped + without ever being installed, so nothing else will call its + ``disconnect()``. Any resources the adapter opened in ``__init__`` + (e.g. ``APIServerAdapter`` opens a SQLite ``ResponseStore`` that + holds 2 fds — the db file and its WAL sidecar) stay open until + garbage collection sweeps the unreachable object, which Python's + cyclic GC does not do promptly for asyncio-bound objects with + native handles. The cumulative leak is 2 fds × every retry at the + 300s backoff cap ≈ 12 fds/hour, and the default 2560-fd ulimit + is exhausted in ~12h of continuous failure, after which every + open() call on the gateway raises ``OSError: [Errno 24] Too many + open files`` and the gateway becomes a zombie (#37011). + + This helper centralises the dispose-with-suppression so the three + failure paths in the reconnect watcher can all call it without + each one having to know that ``disconnect()`` may itself raise + on a half-constructed adapter. + """ + if adapter is None: + return + try: + await adapter.disconnect() + except Exception: + # Half-constructed adapters (e.g. APIServerAdapter that + # crashed during aiohttp app setup) can raise from + # disconnect() on objects that never finished initializing. + # We must not let that escape and abort the watcher loop. + logger.debug( + "Adapter dispose raised on unowned adapter %r", + getattr(adapter, "name", type(adapter).__name__), + exc_info=True, + ) + + class GatewayRunner: """ Main gateway controller. @@ -6175,6 +6215,7 @@ class GatewayRunner: platform.value, attempt, ) + adapter = None try: adapter = self._create_adapter(platform, platform_config) if not adapter: @@ -6224,6 +6265,15 @@ class GatewayRunner: "Reconnect %s: non-retryable error (%s), removing from retry queue", platform.value, adapter.fatal_error_message, ) + # The adapter is about to be dropped from the queue + # without ever being installed on self.adapters, so + # nothing else will call disconnect() on it. We must + # dispose it here, otherwise the resource owners it + # constructed in __init__ (ResponseStore for + # APIServerAdapter, etc.) leak 2 fds each. The + # gateway hits the 2560-fd limit after ~12h of + # failed reconnects at the 300s backoff cap (#37011). + await _dispose_unused_adapter(adapter) del self._failed_platforms[platform] else: self._update_platform_runtime_status( @@ -6239,6 +6289,14 @@ class GatewayRunner: "Reconnect %s failed, next retry in %ds", platform.value, backoff, ) + # Same fd-leak concern as the non-retryable branch + # above: the adapter failed to connect and is being + # thrown away. Without an explicit dispose call, the + # resources it opened in __init__ stay open until + # the next GC pass — and aiohttp/SQLite handles + # don't get GC'd promptly, so 2 fds/retry leak at + # 300s backoff cap = ~12 fds/hour (#37011). + await _dispose_unused_adapter(adapter) # Retryable failures (network/DNS blips) keep retrying # at the backoff cap indefinitely — they self-heal once # connectivity returns. We do NOT auto-pause them: a @@ -6248,6 +6306,14 @@ class GatewayRunner: # `not fatal_error_retryable` branch above, so anything # reaching here is by definition retryable. except Exception as e: + if adapter is not None: + # An exception escaping the connect call path + # (DNS timeout, aiohttp server.start() crash, etc.) + # leaves the adapter in the same unowned state as + # the two branches above. Dispose so __init__ + # resources don't accumulate while the watcher + # keeps retrying. + await _dispose_unused_adapter(adapter) self._update_platform_runtime_status( platform.value, platform_state="retrying", diff --git a/tests/gateway/test_platform_reconnect_fd_leak.py b/tests/gateway/test_platform_reconnect_fd_leak.py new file mode 100644 index 00000000000..68cb19b8199 --- /dev/null +++ b/tests/gateway/test_platform_reconnect_fd_leak.py @@ -0,0 +1,337 @@ +"""Regression tests for the gateway platform fd-leak fix (#37011). + +Without an explicit ``disconnect()`` on adapters that fail to connect in +the reconnect watcher, every retry leaks the resources the adapter +opened in ``__init__`` — for ``APIServerAdapter`` that means 2 file +descriptors per attempt (the SQLite ``response_store.db`` and its WAL +sidecar). At the 300s backoff cap that's ~12 fds/hour; the default +2560-fd ulimit is exhausted in ~12h of continuous failure, after which +the gateway raises ``OSError: [Errno 24] Too many open files`` on +every ``open()`` and becomes a zombie. + +These tests pin all three failure paths in +``_platform_reconnect_watcher`` (non-retryable error, retryable error, +exception during connect) to call ``adapter.disconnect()`` on the +unowned adapter, plus the path-level ``APIServerAdapter.disconnect()`` +behavior of also closing the ``ResponseStore``. The pre-fix +implementation did not call ``disconnect()`` on any of these paths; +this file would have caught the regression and now pins the fix. +""" + +from __future__ import annotations + +import asyncio +import time +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.platforms.api_server import APIServerAdapter, ResponseStore +from gateway.platforms.base import BasePlatformAdapter, SendResult +from gateway.run import GatewayRunner, _dispose_unused_adapter + + +def _make_runner() -> GatewayRunner: + """Create a minimal GatewayRunner via object.__new__ to skip __init__. + + Mirrors the helper in test_platform_reconnect.py so this file + is drop-in compatible with the existing reconnect test suite. + """ + runner = object.__new__(GatewayRunner) + runner.config = GatewayConfig( + platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="test")} + ) + runner._running = True + runner._shutdown_event = asyncio.Event() + runner._exit_reason = None + runner._exit_with_failure = False + runner._exit_cleanly = False + runner._failed_platforms = {} + runner.adapters = {} + return runner + + +async def _run_watcher_one_iteration(runner: GatewayRunner) -> None: + """Drive ``_platform_reconnect_watcher`` for exactly one retry pass. + + Patches ``asyncio.sleep`` to advance the watcher's internal + ``await asyncio.sleep(10)`` initial delay and the 1-second inner + sleeps without actually waiting. Mirrors the pattern used in + ``test_platform_reconnect.py::TestPlatformReconnectWatcher``. + """ + real_sleep = asyncio.sleep + call_count = 0 + + async def fake_sleep(_n: float) -> None: + nonlocal call_count + call_count += 1 + if call_count > 2: + # Two sleeps is enough to get past the initial 10s wait + # and the first inner-tick check. After that, stop the + # watcher so the test returns. + runner._running = False + await real_sleep(0) + + with patch("asyncio.sleep", side_effect=fake_sleep): + await runner._platform_reconnect_watcher() + + +class _CountingAdapter(BasePlatformAdapter): + """Adapter that records every disconnect() call for fd-leak assertions. + + The base ``BasePlatformAdapter.disconnect()`` is a no-op by default + for stub adapters, which is why the pre-fix reconnect watcher + silently leaked: the would-be dispose calls were happening on + objects that did nothing on disconnect. This stub mimics the real + ``APIServerAdapter`` shape — every constructor call opens 2 fds + (the SQLite db + WAL), and every disconnect() must close them. + """ + + def __init__(self, *, succeed: bool = False, fatal_error: str | None = None, + fatal_retryable: bool = True, raise_during_connect: bool = False): + super().__init__(PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM) + # 2 fds to track: the canonical "ResponseStore" pair. The + # reconnect watcher should call disconnect() once per + # construction; otherwise these stay open and contribute to + # the gateway-wide fd count. + self._open_fds = 2 + self._disconnect_calls = 0 + self._succeed = succeed + self._fatal_error = fatal_error + self._fatal_retryable = fatal_retryable + self._raise_during_connect = raise_during_connect + + async def connect(self) -> bool: + if self._raise_during_connect: + raise RuntimeError("simulated connect exception") + if self._fatal_error: + self._set_fatal_error( + "test_code", self._fatal_error, retryable=self._fatal_retryable, + ) + return False + return self._succeed + + async def disconnect(self) -> None: + self._disconnect_calls += 1 + self._open_fds = 0 # fd release on dispose + + async def send(self, chat_id, content, reply_to=None, metadata=None): + return SendResult(success=True, message_id="1") + + async def send_typing(self, chat_id, metadata=None): + return None + + async def get_chat_info(self, chat_id): + return {"id": chat_id} + + +def _seed_runner_with_one_failure(runner: GatewayRunner) -> None: + """Queue a single platform for the reconnect watcher to pick up.""" + runner._failed_platforms[Platform.TELEGRAM] = { + "config": PlatformConfig(enabled=True, token="t"), + "attempts": 0, + "next_retry": time.monotonic() - 1, # eligible immediately + } + + +class TestReconnectFDLeakRegression: + """All three reconnect failure paths must dispose the unowned adapter. + + The pre-fix implementation constructed a fresh adapter on every + retry and dropped it on the floor when connect() failed. That leaks + 2 fds per retry (for ``APIServerAdapter``) at the 300s backoff cap, + exhausting the 2560-fd ulimit in ~12h of continuous failure (#37011). + """ + + @pytest.mark.asyncio + async def test_nonretryable_failure_disposes_unowned_adapter(self): + """A fatal error (bad auth, etc.) must call disconnect() exactly once. + + The adapter failed to connect and is being removed from the + retry queue. Nothing else owns it, so the watcher is the only + code with a chance to call disconnect() — and disconnect() is + the only place the SQLite fds get closed. One retry, one + dispose, no leak. + """ + runner = _make_runner() + _seed_runner_with_one_failure(runner) + adapter = _CountingAdapter( + succeed=False, fatal_error="bad token", fatal_retryable=False, + ) + with patch.object(runner, "_create_adapter", return_value=adapter), \ + patch.object(runner, "_connect_adapter_with_timeout", + new=AsyncMock(return_value=False)): + await _run_watcher_one_iteration(runner) + + assert adapter._disconnect_calls >= 1, ( + f"non-retryable reconnect failure must call adapter.disconnect() " + f"at least once; got {adapter._disconnect_calls} calls. " + "Without it, 2 fds leak per retry at the 300s backoff cap " + "(#37011)." + ) + assert adapter._open_fds == 0, ( + f"adapter fds not released after disconnect(); " + f"{adapter._open_fds} still open. This is the fd leak #37011." + ) + + @pytest.mark.asyncio + async def test_retryable_failure_disposes_unowned_adapter(self): + """A retryable failure (network blip) must also call disconnect(). + + This is the path that fires most often in production: a + transient DNS resolution failure or upstream outage, which + back-offs to 300s and retries indefinitely. The watcher + tracks ``info["attempts"]`` and reschedules, but the failed + adapter is still dropped on the floor without dispose. + """ + runner = _make_runner() + _seed_runner_with_one_failure(runner) + adapter = _CountingAdapter( + succeed=False, fatal_error="dns timeout", fatal_retryable=True, + ) + with patch.object(runner, "_create_adapter", return_value=adapter), \ + patch.object(runner, "_connect_adapter_with_timeout", + new=AsyncMock(return_value=False)): + await _run_watcher_one_iteration(runner) + + assert adapter._disconnect_calls >= 1, ( + f"retryable reconnect failure must call adapter.disconnect(); " + f"got {adapter._disconnect_calls} calls. This is the hot path " + "for the fd leak in #37011." + ) + + @pytest.mark.asyncio + async def test_exception_during_connect_disposes_unowned_adapter(self): + """An exception escaping connect() (aiohttp start crash, etc.) disposes. + + The ``except Exception`` arm in the watcher used to skip the + dispose call entirely. Pre-fix, this leaked the same 2 fds + per retry as the other two branches. + """ + runner = _make_runner() + _seed_runner_with_one_failure(runner) + adapter = _CountingAdapter(raise_during_connect=True) + with patch.object(runner, "_create_adapter", return_value=adapter), \ + patch.object(runner, "_connect_adapter_with_timeout", + new=AsyncMock(side_effect=RuntimeError("boom"))): + await _run_watcher_one_iteration(runner) + + assert adapter._disconnect_calls >= 1, ( + f"exception-during-connect must call adapter.disconnect(); " + f"got {adapter._disconnect_calls} calls. The except-arm of the " + "reconnect watcher is one of the three leak paths in #37011." + ) + + @pytest.mark.asyncio + async def test_dispose_helper_handles_none(self): + """``_dispose_unused_adapter(None)`` is a no-op (defensive).""" + await _dispose_unused_adapter(None) # must not raise + + @pytest.mark.asyncio + async def test_dispose_helper_swallows_disconnect_exception(self): + """A disconnect() that itself raises must not abort the watcher loop. + + Half-constructed adapters can raise from disconnect() because + some of their __init__ state is missing. The watcher loop + would then die and stop retrying, masking the original + configuration error as a hard crash. + """ + disconnect_calls = 0 + + class _RaisingAdapter(BasePlatformAdapter): + def __init__(self): + super().__init__( + PlatformConfig(enabled=True, token="t"), + Platform.TELEGRAM, + ) + + async def connect(self) -> bool: + return True + + async def disconnect(self) -> None: + nonlocal disconnect_calls + disconnect_calls += 1 + raise RuntimeError("half-constructed; aiohttp app never started") + + async def send(self, chat_id, content, reply_to=None, metadata=None): + return SendResult(success=True, message_id="1") + + async def send_typing(self, chat_id, metadata=None): + return None + + async def get_chat_info(self, chat_id): + return {"id": chat_id} + + await _dispose_unused_adapter(_RaisingAdapter()) # must not raise + assert disconnect_calls == 1 + + +class TestAPIServerDisconnectClosesResponseStore: + """The platform-level fix: ``APIServerAdapter.disconnect()`` must close its ResponseStore. + + Without this, the reconnect watcher's dispose call (see the + test class above) is a no-op for ``APIServerAdapter`` — the + aiohttp web server stops, but the SQLite ``ResponseStore`` + connection stays open. The DB file plus its WAL sidecar = 2 fds, + which is the headline leak in #37011. + """ + + def _build_adapter_with_store(self, store: ResponseStore) -> APIServerAdapter: + """Build an APIServerAdapter with the required internal state. + + We bypass ``__init__`` (which would try to start aiohttp + immediately) and set just the fields ``disconnect()`` reads. + """ + adapter = APIServerAdapter.__new__(APIServerAdapter) + adapter._mark_disconnected = lambda: None # type: ignore[method-assign] + adapter._site = None + adapter._runner = None + adapter._app = None + adapter._response_store = store + adapter.platform = Platform.API_SERVER + return adapter + + @pytest.mark.asyncio + async def test_disconnect_closes_response_store(self, tmp_path): + """Closing the adapter's ResponseStore releases its SQLite connection. + + We point the ``ResponseStore`` at a tmp db so we can verify + its ``close()`` is called by ``APIServerAdapter.disconnect()``. + The real ``ResponseStore.__init__`` opens a SQLite connection + to ``~/.hermes/response_store.db`` (or :memory: as a fallback), + which is exactly the resource that was leaking pre-fix. + """ + store = ResponseStore(max_size=10, db_path=str(tmp_path / "rs.db")) + adapter = self._build_adapter_with_store(store) + + await adapter.disconnect() + + # Post-disconnect, the underlying sqlite3 conn should be closed. + # Any further query raises ``ProgrammingError: Cannot operate + # on a closed database``. + with pytest.raises(Exception): + store._conn.execute("SELECT 1").fetchone() + + @pytest.mark.asyncio + async def test_disconnect_swallows_response_store_close_exception(self, tmp_path): + """A misbehaving ResponseStore.close() must not abort adapter shutdown. + + Real-world failure mode: the SQLite file was unlinked out + from under us (operator rm'd ``response_store.db`` during a + disk pressure event). ``close()`` raises. The watcher must + continue with the aiohttp shutdown, not bail. + """ + store = ResponseStore(max_size=10, db_path=str(tmp_path / "rs.db")) + + def _boom() -> None: + raise RuntimeError("sqlite file vanished") + + store.close = _boom # type: ignore[method-assign] + adapter = self._build_adapter_with_store(store) + + # Must not raise — disconnect() swallows the close error and + # continues to the aiohttp teardown (no-op here since we + # bypassed __init__). + await adapter.disconnect()