From e2a1a2bf13fedaf3012c6e5720e4b7dd8ce9d3a8 Mon Sep 17 00:00:00 2001 From: LifeJiggy <141562589+LifeJiggy@users.noreply.github.com> Date: Mon, 18 May 2026 23:59:25 -0700 Subject: [PATCH] fix(gateway): pre-mark sessions as resume_pending before drain to prevent data loss (#27856) Pre-mark all running agent sessions as resume_pending BEFORE the drain wait begins. If the service manager kills the process during the drain window, the durable marker is already written so the next gateway boot can recover in-flight sessions. On graceful drain completion, clear the early markers for sessions that finished successfully. --- gateway/run.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/gateway/run.py b/gateway/run.py index 73af453b128..7a6990c998e 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5574,6 +5574,24 @@ class GatewayRunner: ) timeout = self._restart_drain_timeout + + # Pre-mark sessions as resume_pending BEFORE the drain wait. + # If the process is killed by the service manager during the + # drain, the durable marker is already written so the next + # gateway boot can recover in-flight sessions (#27856). + _pre_drain_keys: list[str] = [] + for _sk, _agent in list(self._running_agents.items()): + if _agent is _AGENT_PENDING_SENTINEL: + continue + try: + self.session_store.mark_resume_pending( + _sk, + "restart_timeout" if self._restart_requested else "shutdown_timeout", + ) + _pre_drain_keys.append(_sk) + except Exception as _e: + logger.debug("pre-drain mark_resume_pending failed for %s: %s", _sk, _e) + _drain_started_at = time.monotonic() active_agents, timed_out = await self._drain_active_agents(timeout) logger.info( @@ -5585,6 +5603,21 @@ class GatewayRunner: len(active_agents), self._running_agent_count(), ) + + if not timed_out: + # Drain completed gracefully — all running sessions finished. 
+ # Clear the pre-drain resume_pending markers so sessions that + # completed during the drain window don't carry a stale flag. + for _sk in _pre_drain_keys: + if _sk not in self._running_agents: + try: + self.session_store.clear_resume_pending(_sk) + except Exception as _e: + logger.debug( + "clear_resume_pending after drain failed for %s: %s", + _sk, _e, + ) + if timed_out: logger.warning( "Gateway drain timed out after %.1fs with %d active agent(s); interrupting remaining work.",