diff --git a/plugins/memory/hindsight/__init__.py b/plugins/memory/hindsight/__init__.py index c9ebb95f8..b46d3e6ed 100644 --- a/plugins/memory/hindsight/__init__.py +++ b/plugins/memory/hindsight/__init__.py @@ -1112,7 +1112,6 @@ class HindsightMemoryProvider(MemoryProvider): def shutdown(self) -> None: logger.debug("Hindsight shutdown: waiting for background threads") - global _loop, _loop_thread for t in (self._prefetch_thread, self._sync_thread): if t and t.is_alive(): t.join(timeout=5.0) @@ -1131,13 +1130,17 @@ class HindsightMemoryProvider(MemoryProvider): except Exception: pass self._client = None - # Stop the background event loop so no tasks are pending at exit - if _loop is not None and _loop.is_running(): - _loop.call_soon_threadsafe(_loop.stop) - if _loop_thread is not None: - _loop_thread.join(timeout=5.0) - _loop = None - _loop_thread = None + # The module-global background event loop (_loop / _loop_thread) + # is intentionally NOT stopped here. It is shared across every + # HindsightMemoryProvider instance in the process — the plugin + # loader creates a new provider per AIAgent, and the gateway + # creates one AIAgent per concurrent chat session. Stopping the + # loop from one provider's shutdown() strands the aiohttp + # ClientSession + TCPConnector owned by every sibling provider + # on a dead loop, which surfaces as the "Unclosed client session" + # / "Unclosed connector" warnings reported in #11923. The loop + # runs on a daemon thread and is reclaimed on process exit; + # per-session cleanup happens via self._client.aclose() above. def register(ctx) -> None: diff --git a/tests/plugins/memory/test_hindsight_provider.py b/tests/plugins/memory/test_hindsight_provider.py index aed3a2349..64e5fa39e 100644 --- a/tests/plugins/memory/test_hindsight_provider.py +++ b/tests/plugins/memory/test_hindsight_provider.py @@ -767,3 +767,73 @@ class TestAvailability: p = HindsightMemoryProvider() p.initialize(session_id="test-session", hermes_home=str(tmp_path), platform="cli") assert p._mode == "disabled" + + +class TestSharedEventLoopLifecycle: + """Regression tests for #11923 — Hindsight leaking aiohttp ClientSession / + TCPConnector objects in long-running gateway processes. + + Root cause: the module-global ``_loop`` / ``_loop_thread`` pair is shared + across every HindsightMemoryProvider instance in the process (the plugin + loader builds one provider per AIAgent, and the gateway builds one AIAgent + per concurrent chat session). When a session ended, ``shutdown()`` stopped + the shared loop, which orphaned every *other* live provider's aiohttp + ClientSession on a dead loop. Those sessions were never closed and surfaced + as ``Unclosed client session`` / ``Unclosed connector`` errors. + """ + + def test_shutdown_does_not_stop_shared_event_loop(self, provider_with_config): + from plugins.memory import hindsight as hindsight_mod + + async def _noop(): + return 1 + + # Prime the shared loop by scheduling a trivial coroutine — mirrors + # the first time any real async call (arecall/aretain/areflect) runs. + assert hindsight_mod._run_sync(_noop()) == 1 + + loop_before = hindsight_mod._loop + thread_before = hindsight_mod._loop_thread + assert loop_before is not None and loop_before.is_running() + assert thread_before is not None and thread_before.is_alive() + + # Build two independent providers (two concurrent chat sessions). + provider_a = provider_with_config() + provider_b = provider_with_config() + + # End session A. + provider_a.shutdown() + + # Module-global loop/thread must still be the same live objects — + # provider B (and any other sibling provider) is still relying on them. + assert hindsight_mod._loop is loop_before, ( + "shutdown() swapped out the shared event loop — sibling providers " + "would have their aiohttp ClientSession orphaned (#11923)" + ) + assert hindsight_mod._loop.is_running(), ( + "shutdown() stopped the shared event loop — sibling providers' " + "aiohttp sessions would leak (#11923)" + ) + assert hindsight_mod._loop_thread is thread_before + assert hindsight_mod._loop_thread.is_alive() + + # Provider B can still dispatch async work on the shared loop. + async def _still_working(): + return 42 + + assert hindsight_mod._run_sync(_still_working()) == 42 + + provider_b.shutdown() + + def test_client_aclose_called_on_cloud_mode_shutdown(self, provider): + """Per-provider session cleanup still runs even though the shared + loop is preserved. Each provider's own aiohttp session is closed + via ``self._client.aclose()``; only the (empty) shared loop survives. + """ + assert provider._client is not None + mock_client = provider._client + + provider.shutdown() + + mock_client.aclose.assert_called_once() + assert provider._client is None