mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-14 04:02:26 +00:00
fix(gateway): cap cached session sources with LRU eviction
Follow-up on top of Zyproth's session-source cache: swap the unbounded dict for an OrderedDict with a 512-entry LRU cap so long-running gateways can't accumulate stale entries for dead sessions forever.
- self._session_sources is now an OrderedDict
- _cache_session_source() calls move_to_end on write, then popitem(last=False) while the cache is above the cap
- _get_cached_session_source() calls move_to_end on hit (LRU read bump)
- restart_test_helpers.py wires OrderedDict + _session_sources_max
This commit is contained in:
parent
176b93575a
commit
333598cb0e
2 changed files with 27 additions and 4 deletions
|
|
@ -1086,7 +1086,13 @@ class GatewayRunner:
|
||||||
self._pending_native_image_paths_by_session: Dict[str, List[str]] = {}
|
self._pending_native_image_paths_by_session: Dict[str, List[str]] = {}
|
||||||
self._busy_ack_ts: Dict[str, float] = {} # last busy-ack timestamp per session (debounce)
|
self._busy_ack_ts: Dict[str, float] = {} # last busy-ack timestamp per session (debounce)
|
||||||
self._session_run_generation: Dict[str, int] = {}
|
self._session_run_generation: Dict[str, int] = {}
|
||||||
self._session_sources: Dict[str, "SessionSource"] = {}
|
# LRU cache of live SessionSources keyed by session_key. Used by
|
||||||
|
# fallback routing paths (shutdown notifications, synthetic
|
||||||
|
# background-process events) when the persisted origin is missing
|
||||||
|
# and _parse_session_key can't recover thread_id. Capped so it
|
||||||
|
# cannot grow unbounded over a long-running gateway lifetime.
|
||||||
|
self._session_sources: "OrderedDict[str, SessionSource]" = OrderedDict()
|
||||||
|
self._session_sources_max = 512
|
||||||
|
|
||||||
# Cache AIAgent instances per session to preserve prompt caching.
|
# Cache AIAgent instances per session to preserve prompt caching.
|
||||||
# Without this, a new AIAgent is created per message, rebuilding the
|
# Without this, a new AIAgent is created per message, rebuilding the
|
||||||
|
|
@ -6015,12 +6021,21 @@ class GatewayRunner:
|
||||||
return
|
return
|
||||||
cached_sources = getattr(self, "_session_sources", None)
|
cached_sources = getattr(self, "_session_sources", None)
|
||||||
if cached_sources is None:
|
if cached_sources is None:
|
||||||
cached_sources = {}
|
cached_sources = OrderedDict()
|
||||||
self._session_sources = cached_sources
|
self._session_sources = cached_sources
|
||||||
try:
|
try:
|
||||||
cached_sources[session_key] = dataclasses.replace(source)
|
cached_sources[session_key] = dataclasses.replace(source)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.debug("Failed to cache live session source for %s", session_key, exc_info=True)
|
logger.debug("Failed to cache live session source for %s", session_key, exc_info=True)
|
||||||
|
return
|
||||||
|
# LRU: mark as most-recently-used and trim to max size.
|
||||||
|
try:
|
||||||
|
cached_sources.move_to_end(session_key)
|
||||||
|
max_size = getattr(self, "_session_sources_max", 512)
|
||||||
|
while len(cached_sources) > max_size:
|
||||||
|
cached_sources.popitem(last=False)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
def _get_cached_session_source(self, session_key: str):
|
def _get_cached_session_source(self, session_key: str):
|
||||||
if not session_key:
|
if not session_key:
|
||||||
|
|
@ -6028,7 +6043,13 @@ class GatewayRunner:
|
||||||
cached_sources = getattr(self, "_session_sources", None)
|
cached_sources = getattr(self, "_session_sources", None)
|
||||||
if not cached_sources:
|
if not cached_sources:
|
||||||
return None
|
return None
|
||||||
return cached_sources.get(session_key)
|
source = cached_sources.get(session_key)
|
||||||
|
if source is not None:
|
||||||
|
try:
|
||||||
|
cached_sources.move_to_end(session_key)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return source
|
||||||
|
|
||||||
async def _handle_message_with_agent(self, event, source, _quick_key: str, run_generation: int):
|
async def _handle_message_with_agent(self, event, source, _quick_key: str, run_generation: int):
|
||||||
"""Inner handler that runs under the _running_agents sentinel guard."""
|
"""Inner handler that runs under the _running_agents sentinel guard."""
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from collections import OrderedDict
|
||||||
from unittest.mock import AsyncMock, MagicMock
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
||||||
|
|
@ -74,7 +75,8 @@ def make_restart_runner(
|
||||||
runner._update_prompt_pending = {}
|
runner._update_prompt_pending = {}
|
||||||
runner._voice_mode = {}
|
runner._voice_mode = {}
|
||||||
runner._session_model_overrides = {}
|
runner._session_model_overrides = {}
|
||||||
runner._session_sources = {}
|
runner._session_sources = OrderedDict()
|
||||||
|
runner._session_sources_max = 512
|
||||||
runner._shutdown_all_gateway_honcho = lambda: None
|
runner._shutdown_all_gateway_honcho = lambda: None
|
||||||
runner._update_runtime_status = MagicMock()
|
runner._update_runtime_status = MagicMock()
|
||||||
runner._queue_or_replace_pending_event = GatewayRunner._queue_or_replace_pending_event.__get__(
|
runner._queue_or_replace_pending_event = GatewayRunner._queue_or_replace_pending_event.__get__(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue