diff --git a/gateway/run.py b/gateway/run.py index cd0834301d5..b3bd6520bfb 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -14201,20 +14201,57 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew agent = None _cache_lock = getattr(self, "_agent_cache_lock", None) _cache = getattr(self, "_agent_cache", None) + + # Detect cross-process writes: when another process (e.g. hermes + # dashboard) appends to the same session in the shared SessionDB, + # the cached agent's in-memory transcript becomes stale. Compare + # the session's current message_count against the count recorded + # when the agent was cached; on mismatch, invalidate the cache + # so a fresh agent re-reads from disk. (#45966) + _current_msg_count = None + if self._session_db is not None and session_id: + try: + _sess_row = self._session_db.get_session(session_id) + if _sess_row: + _current_msg_count = _sess_row.get("message_count", 0) + except Exception: + pass + if _cache_lock and _cache is not None: with _cache_lock: cached = _cache.get(session_key) if cached and cached[1] == _sig: - agent = cached[0] - # Refresh LRU order so the cap enforcement evicts - # truly-oldest entries, not the one we just used. - if hasattr(_cache, "move_to_end"): - try: - _cache.move_to_end(session_key) - except KeyError: - pass - self._init_cached_agent_for_turn(agent, _interrupt_depth) - logger.debug("Reusing cached agent for session %s", session_key) + # cached[2] is the message_count at cache time; + # stale when a second process appended rows. + _cached_mc = cached[2] if len(cached) > 2 else None + if ( + _cached_mc is not None + and _current_msg_count is not None + and _current_msg_count != _cached_mc + ): + # Cross-process write detected — discard stale + # agent so it rebuilds from fresh DB transcript. + logger.info( + "Agent cache invalidated for session %s: " + "message_count changed (%s -> %s), " + "possible cross-process write", + session_key, _cached_mc, _current_msg_count, + ) + evicted = self._agent_cache.pop(session_key, None) + _ev_agent = evicted[0] if isinstance(evicted, tuple) and evicted else None + if _ev_agent and _ev_agent is not _AGENT_PENDING_SENTINEL: + self._cleanup_agent_resources(_ev_agent) + else: + agent = cached[0] + # Refresh LRU order so the cap enforcement evicts + # truly-oldest entries, not the one we just used. + if hasattr(_cache, "move_to_end"): + try: + _cache.move_to_end(session_key) + except KeyError: + pass + self._init_cached_agent_for_turn(agent, _interrupt_depth) + logger.debug("Reusing cached agent for session %s", session_key) if agent is None: # Config changed or first message — create fresh agent @@ -14252,7 +14289,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew ) if _cache_lock and _cache is not None: with _cache_lock: - _cache[session_key] = (agent, _sig) + _cache[session_key] = (agent, _sig, _current_msg_count) self._enforce_agent_cache_cap() logger.debug("Created new agent for session %s (sig=%s)", session_key, _sig)