# Patch summary: bound GatewayRunner._session_sources with LRU eviction.
#
# gateway/run.py
#   * __init__: _session_sources becomes an OrderedDict and gains a companion
#     _session_sources_max cap (512 entries).
#   * session-source caching path: after a copy of the source is stored, the
#     key is moved to the most-recently-used end and the oldest entries are
#     popped until the cap is respected.
#   * _get_cached_session_source: a cache hit refreshes the key's LRU position.
# tests/gateway/restart_test_helpers.py
#   * make_restart_runner seeds the same OrderedDict / cap attributes.
#
# Review change: the two LRU bookkeeping handlers previously ended in
# `except Exception: pass`. They still never raise (best-effort is kept), but
# now log at debug level with the traceback, matching the existing
# "Failed to cache live session source" handler in the same method.
#
# NOTE(review): gateway/run.py is assumed to already import OrderedDict (no
# import hunk is part of this patch) -- confirm.
# NOTE(review): line breaks and indentation were restored from a
# whitespace-collapsed copy; confirm context-line whitespace against the target
# files or apply with --ignore-whitespace. The post-image blob id on the
# `index` line predates the review change above.
diff --git a/gateway/run.py b/gateway/run.py
index 576a84342b..91b80d6741 100644
--- a/gateway/run.py
+++ b/gateway/run.py
@@ -1086,7 +1086,13 @@ class GatewayRunner:
         self._pending_native_image_paths_by_session: Dict[str, List[str]] = {}
         self._busy_ack_ts: Dict[str, float] = {}  # last busy-ack timestamp per session (debounce)
         self._session_run_generation: Dict[str, int] = {}
-        self._session_sources: Dict[str, "SessionSource"] = {}
+        # LRU cache of live SessionSources keyed by session_key. Used by
+        # fallback routing paths (shutdown notifications, synthetic
+        # background-process events) when the persisted origin is missing
+        # and _parse_session_key can't recover thread_id. Capped so it
+        # cannot grow unbounded over a long-running gateway lifetime.
+        self._session_sources: "OrderedDict[str, SessionSource]" = OrderedDict()
+        self._session_sources_max = 512
 
         # Cache AIAgent instances per session to preserve prompt caching.
         # Without this, a new AIAgent is created per message, rebuilding the
@@ -6015,12 +6021,21 @@ class GatewayRunner:
             return
         cached_sources = getattr(self, "_session_sources", None)
         if cached_sources is None:
-            cached_sources = {}
+            cached_sources = OrderedDict()
             self._session_sources = cached_sources
         try:
             cached_sources[session_key] = dataclasses.replace(source)
         except Exception:
             logger.debug("Failed to cache live session source for %s", session_key, exc_info=True)
+            return
+        # LRU: mark as most-recently-used and trim to max size.
+        try:
+            cached_sources.move_to_end(session_key)
+            max_size = getattr(self, "_session_sources_max", 512)
+            while len(cached_sources) > max_size:
+                cached_sources.popitem(last=False)
+        except Exception:
+            logger.debug("Failed to update LRU order for %s", session_key, exc_info=True)
 
     def _get_cached_session_source(self, session_key: str):
         if not session_key:
@@ -6028,7 +6043,13 @@ class GatewayRunner:
         cached_sources = getattr(self, "_session_sources", None)
         if not cached_sources:
             return None
-        return cached_sources.get(session_key)
+        source = cached_sources.get(session_key)
+        if source is not None:
+            try:
+                cached_sources.move_to_end(session_key)
+            except Exception:
+                logger.debug("Failed to refresh LRU position for %s", session_key, exc_info=True)
+        return source
 
     async def _handle_message_with_agent(self, event, source, _quick_key: str, run_generation: int):
         """Inner handler that runs under the _running_agents sentinel guard."""
diff --git a/tests/gateway/restart_test_helpers.py b/tests/gateway/restart_test_helpers.py
index cdc88902c9..213c46cbad 100644
--- a/tests/gateway/restart_test_helpers.py
+++ b/tests/gateway/restart_test_helpers.py
@@ -1,4 +1,5 @@
 import asyncio
+from collections import OrderedDict
 from unittest.mock import AsyncMock, MagicMock
 
 from gateway.config import GatewayConfig, Platform, PlatformConfig
@@ -74,7 +75,8 @@ def make_restart_runner(
     runner._update_prompt_pending = {}
     runner._voice_mode = {}
     runner._session_model_overrides = {}
-    runner._session_sources = {}
+    runner._session_sources = OrderedDict()
+    runner._session_sources_max = 512
     runner._shutdown_all_gateway_honcho = lambda: None
     runner._update_runtime_status = MagicMock()
     runner._queue_or_replace_pending_event = GatewayRunner._queue_or_replace_pending_event.__get__(