fix(gateway): invalidate agent cache on cross-process session writes (#45966)

This commit is contained in:
kyssta-exe 2026-06-14 10:43:24 +00:00
parent 5105c3651a
commit 6d0f79defe

View file

@ -14201,20 +14201,57 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
agent = None
_cache_lock = getattr(self, "_agent_cache_lock", None)
_cache = getattr(self, "_agent_cache", None)
# Detect cross-process writes: when another process (e.g. hermes
# dashboard) appends to the same session in the shared SessionDB,
# the cached agent's in-memory transcript becomes stale. Compare
# the session's current message_count against the count recorded
# when the agent was cached; on mismatch, invalidate the cache
# so a fresh agent re-reads from disk. (#45966)
_current_msg_count = None
if self._session_db is not None and session_id:
try:
_sess_row = self._session_db.get_session(session_id)
if _sess_row:
_current_msg_count = _sess_row.get("message_count", 0)
except Exception:
pass
if _cache_lock and _cache is not None:
with _cache_lock:
cached = _cache.get(session_key)
if cached and cached[1] == _sig:
agent = cached[0]
# Refresh LRU order so the cap enforcement evicts
# truly-oldest entries, not the one we just used.
if hasattr(_cache, "move_to_end"):
try:
_cache.move_to_end(session_key)
except KeyError:
pass
self._init_cached_agent_for_turn(agent, _interrupt_depth)
logger.debug("Reusing cached agent for session %s", session_key)
# cached[2] is the message_count at cache time;
# stale when a second process appended rows.
_cached_mc = cached[2] if len(cached) > 2 else None
if (
_cached_mc is not None
and _current_msg_count is not None
and _current_msg_count != _cached_mc
):
# Cross-process write detected — discard stale
# agent so it rebuilds from fresh DB transcript.
logger.info(
"Agent cache invalidated for session %s: "
"message_count changed (%s -> %s), "
"possible cross-process write",
session_key, _cached_mc, _current_msg_count,
)
evicted = self._agent_cache.pop(session_key, None)
_ev_agent = evicted[0] if isinstance(evicted, tuple) and evicted else None
if _ev_agent and _ev_agent is not _AGENT_PENDING_SENTINEL:
self._cleanup_agent_resources(_ev_agent)
else:
agent = cached[0]
# Refresh LRU order so the cap enforcement evicts
# truly-oldest entries, not the one we just used.
if hasattr(_cache, "move_to_end"):
try:
_cache.move_to_end(session_key)
except KeyError:
pass
self._init_cached_agent_for_turn(agent, _interrupt_depth)
logger.debug("Reusing cached agent for session %s", session_key)
if agent is None:
# Config changed or first message — create fresh agent
@ -14252,7 +14289,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
)
if _cache_lock and _cache is not None:
with _cache_lock:
_cache[session_key] = (agent, _sig)
_cache[session_key] = (agent, _sig, _current_msg_count)
self._enforce_agent_cache_cap()
logger.debug("Created new agent for session %s (sig=%s)", session_key, _sig)