From 93a74f74bf9341a2a3d88c9e1067ded12f102bc4 Mon Sep 17 00:00:00 2001 From: Jason Perlow Date: Wed, 22 Apr 2026 14:42:19 -0400 Subject: [PATCH] fix(hindsight): preserve shared event loop across provider shutdowns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The module-global `_loop` / `_loop_thread` pair is shared across every `HindsightMemoryProvider` instance in the process — the plugin loader creates one provider per `AIAgent`, and the gateway creates one `AIAgent` per concurrent chat session (Telegram/Discord/Slack/CLI). `HindsightMemoryProvider.shutdown()` stopped the shared loop when any one session ended. That stranded the aiohttp `ClientSession` and `TCPConnector` owned by every sibling provider on a now-dead loop — they were never reachable for close and surfaced as the `Unclosed client session` / `Unclosed connector` warnings reported in #11923. Fix: stop stopping the shared loop in `shutdown()`. Per-provider cleanup still closes that provider's own client via `self._client.aclose()`. The loop runs on a daemon thread and is reclaimed on process exit; keeping it alive between provider shutdowns means sibling providers can drain their own sessions cleanly. Regression tests in `tests/plugins/memory/test_hindsight_provider.py` (`TestSharedEventLoopLifecycle`): - `test_shutdown_does_not_stop_shared_event_loop` — two providers share the loop; shutting down one leaves the loop live for the other. This test reproduces the #11923 leak on `main` and passes with the fix. - `test_client_aclose_called_on_cloud_mode_shutdown` — each provider's own aiohttp session is still closed via `aclose()`. Fixes #11923. --- plugins/memory/hindsight/__init__.py | 19 ++--- .../plugins/memory/test_hindsight_provider.py | 70 +++++++++++++++++++ 2 files changed, 81 insertions(+), 8 deletions(-) diff --git a/plugins/memory/hindsight/__init__.py b/plugins/memory/hindsight/__init__.py index c9ebb95f8..b46d3e6ed 100644 --- a/plugins/memory/hindsight/__init__.py +++ b/plugins/memory/hindsight/__init__.py @@ -1112,7 +1112,6 @@ class HindsightMemoryProvider(MemoryProvider): def shutdown(self) -> None: logger.debug("Hindsight shutdown: waiting for background threads") - global _loop, _loop_thread for t in (self._prefetch_thread, self._sync_thread): if t and t.is_alive(): t.join(timeout=5.0) @@ -1131,13 +1130,17 @@ class HindsightMemoryProvider(MemoryProvider): except Exception: pass self._client = None - # Stop the background event loop so no tasks are pending at exit - if _loop is not None and _loop.is_running(): - _loop.call_soon_threadsafe(_loop.stop) - if _loop_thread is not None: - _loop_thread.join(timeout=5.0) - _loop = None - _loop_thread = None + # The module-global background event loop (_loop / _loop_thread) + # is intentionally NOT stopped here. It is shared across every + # HindsightMemoryProvider instance in the process — the plugin + # loader creates a new provider per AIAgent, and the gateway + # creates one AIAgent per concurrent chat session. Stopping the + # loop from one provider's shutdown() strands the aiohttp + # ClientSession + TCPConnector owned by every sibling provider + # on a dead loop, which surfaces as the "Unclosed client session" + # / "Unclosed connector" warnings reported in #11923. The loop + # runs on a daemon thread and is reclaimed on process exit; + # per-session cleanup happens via self._client.aclose() above. def register(ctx) -> None: diff --git a/tests/plugins/memory/test_hindsight_provider.py b/tests/plugins/memory/test_hindsight_provider.py index aed3a2349..64e5fa39e 100644 --- a/tests/plugins/memory/test_hindsight_provider.py +++ b/tests/plugins/memory/test_hindsight_provider.py @@ -767,3 +767,73 @@ class TestAvailability: p = HindsightMemoryProvider() p.initialize(session_id="test-session", hermes_home=str(tmp_path), platform="cli") assert p._mode == "disabled" + + +class TestSharedEventLoopLifecycle: + """Regression tests for #11923 — Hindsight leaking aiohttp ClientSession / + TCPConnector objects in long-running gateway processes. + + Root cause: the module-global ``_loop`` / ``_loop_thread`` pair is shared + across every HindsightMemoryProvider instance in the process (the plugin + loader builds one provider per AIAgent, and the gateway builds one AIAgent + per concurrent chat session). When a session ended, ``shutdown()`` stopped + the shared loop, which orphaned every *other* live provider's aiohttp + ClientSession on a dead loop. Those sessions were never closed and surfaced + as ``Unclosed client session`` / ``Unclosed connector`` errors. + """ + + def test_shutdown_does_not_stop_shared_event_loop(self, provider_with_config): + from plugins.memory import hindsight as hindsight_mod + + async def _noop(): + return 1 + + # Prime the shared loop by scheduling a trivial coroutine — mirrors + # the first time any real async call (arecall/aretain/areflect) runs. + assert hindsight_mod._run_sync(_noop()) == 1 + + loop_before = hindsight_mod._loop + thread_before = hindsight_mod._loop_thread + assert loop_before is not None and loop_before.is_running() + assert thread_before is not None and thread_before.is_alive() + + # Build two independent providers (two concurrent chat sessions). + provider_a = provider_with_config() + provider_b = provider_with_config() + + # End session A. + provider_a.shutdown() + + # Module-global loop/thread must still be the same live objects — + # provider B (and any other sibling provider) is still relying on them. + assert hindsight_mod._loop is loop_before, ( + "shutdown() swapped out the shared event loop — sibling providers " + "would have their aiohttp ClientSession orphaned (#11923)" + ) + assert hindsight_mod._loop.is_running(), ( + "shutdown() stopped the shared event loop — sibling providers' " + "aiohttp sessions would leak (#11923)" + ) + assert hindsight_mod._loop_thread is thread_before + assert hindsight_mod._loop_thread.is_alive() + + # Provider B can still dispatch async work on the shared loop. + async def _still_working(): + return 42 + + assert hindsight_mod._run_sync(_still_working()) == 42 + + provider_b.shutdown() + + def test_client_aclose_called_on_cloud_mode_shutdown(self, provider): + """Per-provider session cleanup still runs even though the shared + loop is preserved. Each provider's own aiohttp session is closed + via ``self._client.aclose()``; only the (empty) shared loop survives. + """ + assert provider._client is not None + mock_client = provider._client + + provider.shutdown() + + mock_client.aclose.assert_called_once() + assert provider._client is None