diff --git a/gateway/run.py b/gateway/run.py index 0110e8cadc..dcee18e518 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -8665,7 +8665,12 @@ class GatewayRunner: override = self._session_model_overrides.get(session_key) return override is not None and override.get("model") == agent_model - def _release_running_agent_state(self, session_key: str) -> None: + def _release_running_agent_state( + self, + session_key: str, + *, + run_generation: Optional[int] = None, + ) -> bool: """Pop ALL per-running-agent state entries for ``session_key``. Replaces ad-hoc ``del self._running_agents[key]`` calls scattered @@ -8681,13 +8686,25 @@ class GatewayRunner: across turns (``_session_model_overrides``, ``_voice_mode``, ``_pending_approvals``, ``_update_prompt_pending``) is NOT touched here — those have their own lifecycles. + + When ``run_generation`` is provided, only clear the slot if that + generation is still current for the session. This prevents an + older async run whose generation was bumped by /stop or /new from + clobbering a newer run's state during its own unwind. Returns + True when the slot was cleared, False when an ownership guard + blocked it. """ if not session_key: - return + return False + if run_generation is not None and not self._is_session_run_current( + session_key, run_generation + ): + return False self._running_agents.pop(session_key, None) self._running_agents_ts.pop(session_key, None) if hasattr(self, "_busy_ack_ts"): self._busy_ack_ts.pop(session_key, None) + return True def _clear_session_boundary_security_state(self, session_key: str) -> None: """Clear approval state that must not survive a real conversation switch.""" @@ -10249,10 +10266,24 @@ class GatewayRunner: # Wait for agent to be created while agent_holder[0] is None: await asyncio.sleep(0.05) - if session_key: - self._running_agents[session_key] = agent_holder[0] - if self._draining: - self._update_runtime_status("draining") + if not session_key: + return + # Only promote the sentinel to the real agent if this run is still + # current. If /stop or /new bumped the generation while we were + # spinning up, leave the newer run's slot alone — we'll be + # discarded by the stale-result check in _handle_message_with_agent. + if run_generation is not None and not self._is_session_run_current( + session_key, run_generation + ): + logger.info( + "Skipping stale agent promotion for %s — generation %s is no longer current", + (session_key or "")[:20], + run_generation, + ) + return + self._running_agents[session_key] = agent_holder[0] + if self._draining: + self._update_runtime_status("draining") tracking_task = asyncio.create_task(track_agent()) @@ -10758,7 +10789,14 @@ class GatewayRunner: # Clean up tracking tracking_task.cancel() if session_key: - self._release_running_agent_state(session_key) + # Only release the slot if this run's generation still owns + # it. A /stop or /new that bumped the generation while we + # were unwinding has already installed its own state; this + # guard prevents an old run from clobbering it on the way + # out. + self._release_running_agent_state( + session_key, run_generation=run_generation + ) if self._draining: self._update_runtime_status("draining")