mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(hindsight): preserve shared event loop across provider shutdowns
The module-global `_loop` / `_loop_thread` pair is shared across every `HindsightMemoryProvider` instance in the process — the plugin loader creates one provider per `AIAgent`, and the gateway creates one `AIAgent` per concurrent chat session (Telegram/Discord/Slack/CLI). `HindsightMemoryProvider.shutdown()` stopped the shared loop when any one session ended. That stranded the aiohttp `ClientSession` and `TCPConnector` owned by every sibling provider on a now-dead loop — they were never reachable for close and surfaced as the `Unclosed client session` / `Unclosed connector` warnings reported in #11923. Fix: stop stopping the shared loop in `shutdown()`. Per-provider cleanup still closes that provider's own client via `self._client.aclose()`. The loop runs on a daemon thread and is reclaimed on process exit; keeping it alive between provider shutdowns means sibling providers can drain their own sessions cleanly. Regression tests in `tests/plugins/memory/test_hindsight_provider.py` (`TestSharedEventLoopLifecycle`): - `test_shutdown_does_not_stop_shared_event_loop` — two providers share the loop; shutting down one leaves the loop live for the other. This test reproduces the #11923 leak on `main` and passes with the fix. - `test_client_aclose_called_on_cloud_mode_shutdown` — each provider's own aiohttp session is still closed via `aclose()`. Fixes #11923.
This commit is contained in:
parent
b4c030025f
commit
93a74f74bf
2 changed files with 81 additions and 8 deletions
|
|
@ -1112,7 +1112,6 @@ class HindsightMemoryProvider(MemoryProvider):
|
||||||
|
|
||||||
def shutdown(self) -> None:
|
def shutdown(self) -> None:
|
||||||
logger.debug("Hindsight shutdown: waiting for background threads")
|
logger.debug("Hindsight shutdown: waiting for background threads")
|
||||||
global _loop, _loop_thread
|
|
||||||
for t in (self._prefetch_thread, self._sync_thread):
|
for t in (self._prefetch_thread, self._sync_thread):
|
||||||
if t and t.is_alive():
|
if t and t.is_alive():
|
||||||
t.join(timeout=5.0)
|
t.join(timeout=5.0)
|
||||||
|
|
@ -1131,13 +1130,17 @@ class HindsightMemoryProvider(MemoryProvider):
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
self._client = None
|
self._client = None
|
||||||
# Stop the background event loop so no tasks are pending at exit
|
# The module-global background event loop (_loop / _loop_thread)
|
||||||
if _loop is not None and _loop.is_running():
|
# is intentionally NOT stopped here. It is shared across every
|
||||||
_loop.call_soon_threadsafe(_loop.stop)
|
# HindsightMemoryProvider instance in the process — the plugin
|
||||||
if _loop_thread is not None:
|
# loader creates a new provider per AIAgent, and the gateway
|
||||||
_loop_thread.join(timeout=5.0)
|
# creates one AIAgent per concurrent chat session. Stopping the
|
||||||
_loop = None
|
# loop from one provider's shutdown() strands the aiohttp
|
||||||
_loop_thread = None
|
# ClientSession + TCPConnector owned by every sibling provider
|
||||||
|
# on a dead loop, which surfaces as the "Unclosed client session"
|
||||||
|
# / "Unclosed connector" warnings reported in #11923. The loop
|
||||||
|
# runs on a daemon thread and is reclaimed on process exit;
|
||||||
|
# per-session cleanup happens via self._client.aclose() above.
|
||||||
|
|
||||||
|
|
||||||
def register(ctx) -> None:
|
def register(ctx) -> None:
|
||||||
|
|
|
||||||
|
|
@ -767,3 +767,73 @@ class TestAvailability:
|
||||||
p = HindsightMemoryProvider()
|
p = HindsightMemoryProvider()
|
||||||
p.initialize(session_id="test-session", hermes_home=str(tmp_path), platform="cli")
|
p.initialize(session_id="test-session", hermes_home=str(tmp_path), platform="cli")
|
||||||
assert p._mode == "disabled"
|
assert p._mode == "disabled"
|
||||||
|
|
||||||
|
|
||||||
|
class TestSharedEventLoopLifecycle:
|
||||||
|
"""Regression tests for #11923 — Hindsight leaking aiohttp ClientSession /
|
||||||
|
TCPConnector objects in long-running gateway processes.
|
||||||
|
|
||||||
|
Root cause: the module-global ``_loop`` / ``_loop_thread`` pair is shared
|
||||||
|
across every HindsightMemoryProvider instance in the process (the plugin
|
||||||
|
loader builds one provider per AIAgent, and the gateway builds one AIAgent
|
||||||
|
per concurrent chat session). When a session ended, ``shutdown()`` stopped
|
||||||
|
the shared loop, which orphaned every *other* live provider's aiohttp
|
||||||
|
ClientSession on a dead loop. Those sessions were never closed and surfaced
|
||||||
|
as ``Unclosed client session`` / ``Unclosed connector`` errors.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_shutdown_does_not_stop_shared_event_loop(self, provider_with_config):
|
||||||
|
from plugins.memory import hindsight as hindsight_mod
|
||||||
|
|
||||||
|
async def _noop():
|
||||||
|
return 1
|
||||||
|
|
||||||
|
# Prime the shared loop by scheduling a trivial coroutine — mirrors
|
||||||
|
# the first time any real async call (arecall/aretain/areflect) runs.
|
||||||
|
assert hindsight_mod._run_sync(_noop()) == 1
|
||||||
|
|
||||||
|
loop_before = hindsight_mod._loop
|
||||||
|
thread_before = hindsight_mod._loop_thread
|
||||||
|
assert loop_before is not None and loop_before.is_running()
|
||||||
|
assert thread_before is not None and thread_before.is_alive()
|
||||||
|
|
||||||
|
# Build two independent providers (two concurrent chat sessions).
|
||||||
|
provider_a = provider_with_config()
|
||||||
|
provider_b = provider_with_config()
|
||||||
|
|
||||||
|
# End session A.
|
||||||
|
provider_a.shutdown()
|
||||||
|
|
||||||
|
# Module-global loop/thread must still be the same live objects —
|
||||||
|
# provider B (and any other sibling provider) is still relying on them.
|
||||||
|
assert hindsight_mod._loop is loop_before, (
|
||||||
|
"shutdown() swapped out the shared event loop — sibling providers "
|
||||||
|
"would have their aiohttp ClientSession orphaned (#11923)"
|
||||||
|
)
|
||||||
|
assert hindsight_mod._loop.is_running(), (
|
||||||
|
"shutdown() stopped the shared event loop — sibling providers' "
|
||||||
|
"aiohttp sessions would leak (#11923)"
|
||||||
|
)
|
||||||
|
assert hindsight_mod._loop_thread is thread_before
|
||||||
|
assert hindsight_mod._loop_thread.is_alive()
|
||||||
|
|
||||||
|
# Provider B can still dispatch async work on the shared loop.
|
||||||
|
async def _still_working():
|
||||||
|
return 42
|
||||||
|
|
||||||
|
assert hindsight_mod._run_sync(_still_working()) == 42
|
||||||
|
|
||||||
|
provider_b.shutdown()
|
||||||
|
|
||||||
|
def test_client_aclose_called_on_cloud_mode_shutdown(self, provider):
|
||||||
|
"""Per-provider session cleanup still runs even though the shared
|
||||||
|
loop is preserved. Each provider's own aiohttp session is closed
|
||||||
|
via ``self._client.aclose()``; only the (empty) shared loop survives.
|
||||||
|
"""
|
||||||
|
assert provider._client is not None
|
||||||
|
mock_client = provider._client
|
||||||
|
|
||||||
|
provider.shutdown()
|
||||||
|
|
||||||
|
mock_client.aclose.assert_called_once()
|
||||||
|
assert provider._client is None
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue