diff --git a/gateway/run.py b/gateway/run.py index dddba78440e..8555480d7b7 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -15829,6 +15829,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception: pass + _xproc_evicted_agent = None if _cache_lock and _cache is not None: with _cache_lock: cached = _cache.get(session_key) @@ -15852,7 +15853,20 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew evicted = self._agent_cache.pop(session_key, None) _ev_agent = evicted[0] if isinstance(evicted, tuple) and evicted else None if _ev_agent and _ev_agent is not _AGENT_PENDING_SENTINEL: - self._cleanup_agent_resources(_ev_agent) + # Defer cleanup until AFTER the lock is + # released — _cleanup_agent_resources / + # release_clients can block on memory-provider + # shutdown and socket teardown, and running it + # here would stall the gateway event loop while + # _sweep_idle_cached_agents (session-expiry + # watcher) waits on the same lock, blocking + # Discord heartbeats (#52197). The same session + # rebuilds a fresh agent immediately below, so + # use the SOFT release that preserves the + # session's terminal sandbox / browser / bg + # processes for the rebuilt agent to inherit — + # mirrors _evict_cached_agent / idle-sweep. + _xproc_evicted_agent = _ev_agent else: agent = cached[0] # Refresh LRU order so the cap enforcement evicts @@ -15868,6 +15882,26 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew agent.max_iterations = max_iterations logger.debug("Reusing cached agent for session %s", session_key) + # Lock released — now schedule cleanup of any cross-process-evicted + # agent on a daemon thread so memory-provider shutdown / socket + # teardown never blocks the gateway event loop or the cache lock + # the session-expiry watcher needs (#52197). + if _xproc_evicted_agent is not None: + try: + threading.Thread( + target=self._release_evicted_agent_soft, + args=(_xproc_evicted_agent,), + daemon=True, + name=f"agent-xproc-evict-{str(session_key)[:24]}", + ).start() + except Exception: + # Interpreter shutdown or thread-spawn failure — release + # inline as a best-effort fallback. + try: + self._release_evicted_agent_soft(_xproc_evicted_agent) + except Exception: + pass + if agent is None: # Config changed or first message — create fresh agent agent = AIAgent( diff --git a/tests/gateway/test_agent_cache.py b/tests/gateway/test_agent_cache.py index 559e1c0e96c..54b0fe08794 100644 --- a/tests/gateway/test_agent_cache.py +++ b/tests/gateway/test_agent_cache.py @@ -1706,3 +1706,117 @@ class TestAgentCacheMessageCountRebaseline: runner._refresh_agent_cache_message_count("telegram:s1", "s1") with runner._agent_cache_lock: assert runner._agent_cache["telegram:s1"][2] == 5 + + +class TestCrossProcessInvalidationDefersCleanup: + """#52197: cross-process cache invalidation must NOT run agent cleanup + while holding ``_agent_cache_lock``. + + The #45966 guard popped the stale cached agent and then called the + blocking ``_cleanup_agent_resources`` (memory-provider shutdown, socket + teardown) *inside* the ``with _agent_cache_lock:`` block, on the gateway + event-loop thread. While that ran, ``_sweep_idle_cached_agents`` (driven + by the session-expiry watcher) blocked acquiring the same lock and the + asyncio loop stalled, tripping Discord heartbeat-blocked warnings. + + The fix mirrors the cap-enforcer / idle-sweep paths: pop under the lock, + release it, then schedule the SOFT release (which preserves the session's + terminal sandbox / browser / bg processes for the immediately-rebuilt + agent) on a daemon thread. + + These tests replicate the exact eviction sequence the production guard now + performs and pin the invariant: the lock is free while cleanup runs, and + the hard-teardown path is never used here. + """ + + def _runner(self): + from collections import OrderedDict + from gateway.run import GatewayRunner + + runner = GatewayRunner.__new__(GatewayRunner) + runner._agent_cache = OrderedDict() + runner._agent_cache_lock = threading.Lock() + return runner + + @staticmethod + def _evict_like_production(runner, session_key): + """Run the post-#52197 cross-process eviction sequence verbatim: + pop the stale entry under the lock, then schedule the soft release + on a daemon thread AFTER the lock is released.""" + _xproc_evicted_agent = None + with runner._agent_cache_lock: + evicted = runner._agent_cache.pop(session_key, None) + _ev_agent = evicted[0] if isinstance(evicted, tuple) and evicted else None + if _ev_agent is not None: + _xproc_evicted_agent = _ev_agent + if _xproc_evicted_agent is not None: + threading.Thread( + target=runner._release_evicted_agent_soft, + args=(_xproc_evicted_agent,), + daemon=True, + name="agent-xproc-evict-test", + ).start() + + def test_cleanup_runs_with_lock_released(self): + """The cache lock must be acquirable WHILE the evicted agent's + cleanup is running — proving cleanup is off the locked path.""" + runner = self._runner() + + cleanup_started = threading.Event() + release_lock = threading.Event() + + def _soft(agent): + cleanup_started.set() + # Block here as if memory-provider shutdown / socket teardown is + # slow. If cleanup were still holding _agent_cache_lock, the + # assertion below could never acquire it. + release_lock.wait(timeout=2.0) + + runner._release_evicted_agent_soft = _soft + runner._cleanup_agent_resources = MagicMock() + + old_agent = MagicMock() + with runner._agent_cache_lock: + runner._agent_cache["telegram:s1"] = (old_agent, "sig", 3) + + self._evict_like_production(runner, "telegram:s1") + + # Wait until the (blocking) cleanup is mid-flight. + assert cleanup_started.wait(timeout=2.0) + + # The lock MUST be free right now — this is the heart of #52197. + # A 0.5s acquire timeout would fire if cleanup held the lock. + acquired = runner._agent_cache_lock.acquire(timeout=0.5) + assert acquired, "cache lock blocked during cross-process cleanup (#52197)" + runner._agent_cache_lock.release() + + # Let the cleanup thread finish. + release_lock.set() + + # Stale entry was popped, hard-teardown path never used. + assert "telegram:s1" not in runner._agent_cache + runner._cleanup_agent_resources.assert_not_called() + + def test_soft_release_scheduled_for_evicted_agent(self): + """The evicted agent is handed to the soft-release path, not the + hard ``_cleanup_agent_resources`` teardown.""" + runner = self._runner() + + release_calls: list = [] + runner._release_evicted_agent_soft = lambda agent: release_calls.append(agent) + runner._cleanup_agent_resources = MagicMock() + + old_agent = MagicMock() + with runner._agent_cache_lock: + runner._agent_cache["telegram:s1"] = (old_agent, "sig", 3) + + self._evict_like_production(runner, "telegram:s1") + + import time as _t + deadline = _t.time() + 2.0 + while _t.time() < deadline and not release_calls: + _t.sleep(0.02) + + assert release_calls == [old_agent] + runner._cleanup_agent_resources.assert_not_called() +