fix(gateway): defer cross-process cache cleanup off the cache lock (#52197) (#52761)

The #45966 cross-process coherence guard popped the stale cached agent
and then called the blocking _cleanup_agent_resources (memory-provider
shutdown, tool-resource teardown, async-client teardown) while still
holding _agent_cache_lock, on the gateway event-loop thread. While that
ran, _sweep_idle_cached_agents (driven by _session_expiry_watcher)
blocked acquiring the same lock and the asyncio loop stalled for minutes,
tripping repeated Discord 'heartbeat blocked' warnings.

Fix mirrors the cap-enforcer / idle-sweep paths: pop the stale entry
under the lock, release it, then schedule the SOFT release on a daemon
thread. The soft path (_release_evicted_agent_soft) is also more correct
here than the hard teardown the regression used — the same session
rebuilds a fresh agent immediately after invalidation, so its terminal
sandbox / browser / bg processes (keyed on task_id) must be preserved
for the rebuilt agent to inherit, not torn down.

Verified the cross-process site was the only cleanup-under-lock instance;
the other _cleanup_agent_resources call sites run outside the lock.
This commit is contained in:
Teknium 2026-06-25 18:58:47 -07:00 committed by GitHub
parent e29823f1e8
commit 811df74a10
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 149 additions and 1 deletions

View file

@ -15829,6 +15829,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception:
pass
_xproc_evicted_agent = None
if _cache_lock and _cache is not None:
with _cache_lock:
cached = _cache.get(session_key)
@ -15852,7 +15853,20 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
evicted = self._agent_cache.pop(session_key, None)
_ev_agent = evicted[0] if isinstance(evicted, tuple) and evicted else None
if _ev_agent and _ev_agent is not _AGENT_PENDING_SENTINEL:
self._cleanup_agent_resources(_ev_agent)
# Defer cleanup until AFTER the lock is
# released — _cleanup_agent_resources /
# release_clients can block on memory-provider
# shutdown and socket teardown, and running it
# here would stall the gateway event loop while
# _sweep_idle_cached_agents (session-expiry
# watcher) waits on the same lock, blocking
# Discord heartbeats (#52197). The same session
# rebuilds a fresh agent immediately below, so
# use the SOFT release that preserves the
# session's terminal sandbox / browser / bg
# processes for the rebuilt agent to inherit —
# mirrors _evict_cached_agent / idle-sweep.
_xproc_evicted_agent = _ev_agent
else:
agent = cached[0]
# Refresh LRU order so the cap enforcement evicts
@ -15868,6 +15882,26 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
agent.max_iterations = max_iterations
logger.debug("Reusing cached agent for session %s", session_key)
# Lock released — now schedule cleanup of any cross-process-evicted
# agent on a daemon thread so memory-provider shutdown / socket
# teardown never blocks the gateway event loop or the cache lock
# the session-expiry watcher needs (#52197).
if _xproc_evicted_agent is not None:
try:
threading.Thread(
target=self._release_evicted_agent_soft,
args=(_xproc_evicted_agent,),
daemon=True,
name=f"agent-xproc-evict-{str(session_key)[:24]}",
).start()
except Exception:
# Interpreter shutdown or thread-spawn failure — release
# inline as a best-effort fallback.
try:
self._release_evicted_agent_soft(_xproc_evicted_agent)
except Exception:
pass
if agent is None:
# Config changed or first message — create fresh agent
agent = AIAgent(

View file

@ -1706,3 +1706,117 @@ class TestAgentCacheMessageCountRebaseline:
runner._refresh_agent_cache_message_count("telegram:s1", "s1")
with runner._agent_cache_lock:
assert runner._agent_cache["telegram:s1"][2] == 5
class TestCrossProcessInvalidationDefersCleanup:
    """#52197: cross-process cache invalidation must NOT run agent cleanup
    while holding ``_agent_cache_lock``.

    The #45966 guard popped the stale cached agent and then called the
    blocking ``_cleanup_agent_resources`` (memory-provider shutdown, socket
    teardown) *inside* the ``with _agent_cache_lock:`` block, on the gateway
    event-loop thread. While that ran, ``_sweep_idle_cached_agents`` (driven
    by the session-expiry watcher) blocked acquiring the same lock and the
    asyncio loop stalled, tripping Discord heartbeat-blocked warnings.

    The fix mirrors the cap-enforcer / idle-sweep paths: pop under the lock,
    release it, then schedule the SOFT release (which preserves the session's
    terminal sandbox / browser / bg processes for the immediately-rebuilt
    agent) on a daemon thread.

    These tests replay the eviction sequence the production guard now
    performs and pin the invariant: the lock is free while cleanup runs, and
    the hard-teardown path is never used here.
    """

    def _runner(self):
        """Return a bare ``GatewayRunner`` with an empty cache and a new lock.

        ``__new__`` is used so ``GatewayRunner.__init__`` never runs; only the
        two attributes the eviction sequence touches are populated.
        """
        from collections import OrderedDict

        from gateway.run import GatewayRunner

        runner = GatewayRunner.__new__(GatewayRunner)
        runner._agent_cache = OrderedDict()
        runner._agent_cache_lock = threading.Lock()
        return runner

    @staticmethod
    def _evict_like_production(runner, session_key):
        """Replay the post-#52197 cross-process eviction sequence.

        Pops the stale entry under ``runner._agent_cache_lock`` and then,
        AFTER the lock is released, hands the evicted agent to
        ``runner._release_evicted_agent_soft`` on a daemon thread.

        This is a simplified copy of the production guard: it does not skip
        the pending-agent sentinel and has no inline fallback for a failed
        thread start, because neither matters for the lock invariant pinned
        by these tests.

        Returns:
            The started ``threading.Thread`` so callers can join it, or
            ``None`` when nothing was evicted for ``session_key``.
        """
        _xproc_evicted_agent = None
        with runner._agent_cache_lock:
            evicted = runner._agent_cache.pop(session_key, None)
            _ev_agent = evicted[0] if isinstance(evicted, tuple) and evicted else None
            if _ev_agent is not None:
                _xproc_evicted_agent = _ev_agent
        if _xproc_evicted_agent is None:
            return None
        worker = threading.Thread(
            target=runner._release_evicted_agent_soft,
            args=(_xproc_evicted_agent,),
            daemon=True,
            name="agent-xproc-evict-test",
        )
        worker.start()
        return worker

    def test_cleanup_runs_with_lock_released(self):
        """The cache lock must be acquirable WHILE the evicted agent's
        cleanup is running, proving cleanup is off the locked path."""
        runner = self._runner()
        cleanup_started = threading.Event()
        release_lock = threading.Event()

        def _soft(agent):
            cleanup_started.set()
            # Block here as if memory-provider shutdown / socket teardown is
            # slow. If cleanup were still holding _agent_cache_lock, the
            # acquire below could never succeed.
            release_lock.wait(timeout=2.0)

        runner._release_evicted_agent_soft = _soft
        runner._cleanup_agent_resources = MagicMock()
        old_agent = MagicMock()
        with runner._agent_cache_lock:
            runner._agent_cache["telegram:s1"] = (old_agent, "sig", 3)

        worker = self._evict_like_production(runner, "telegram:s1")
        assert worker is not None
        try:
            # Wait until the (blocking) cleanup is mid-flight.
            assert cleanup_started.wait(timeout=2.0)
            # The lock MUST be free right now — this is the heart of #52197.
            # A 0.5s acquire timeout would fire if cleanup held the lock.
            acquired = runner._agent_cache_lock.acquire(timeout=0.5)
            assert acquired, "cache lock blocked during cross-process cleanup (#52197)"
            runner._agent_cache_lock.release()
        finally:
            # Always unblock and reap the worker, even when an assertion
            # above fails, so no thread outlives the test.
            release_lock.set()
            worker.join(timeout=2.0)
        assert not worker.is_alive()

        # Stale entry was popped, hard-teardown path never used.
        assert "telegram:s1" not in runner._agent_cache
        runner._cleanup_agent_resources.assert_not_called()

    def test_soft_release_scheduled_for_evicted_agent(self):
        """The evicted agent is handed to the soft-release path, not the
        hard ``_cleanup_agent_resources`` teardown."""
        runner = self._runner()
        release_calls: list = []
        runner._release_evicted_agent_soft = lambda agent: release_calls.append(agent)
        runner._cleanup_agent_resources = MagicMock()
        old_agent = MagicMock()
        with runner._agent_cache_lock:
            runner._agent_cache["telegram:s1"] = (old_agent, "sig", 3)

        worker = self._evict_like_production(runner, "telegram:s1")
        assert worker is not None
        # Join the worker instead of sleep-polling: deterministic, and not
        # sensitive to wall-clock adjustments.
        worker.join(timeout=2.0)
        assert not worker.is_alive()

        assert release_calls == [old_agent]
        runner._cleanup_agent_resources.assert_not_called()