diff --git a/gateway/run.py b/gateway/run.py index 2bd4930059..469abe9ec0 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1465,7 +1465,18 @@ class GatewayRunner: logger.info("Recovered %s background process(es) from previous run", recovered) except Exception as e: logger.warning("Process checkpoint recovery: %s", e) - + + # Suspend sessions that were active when the gateway last exited. + # This prevents stuck sessions from being blindly resumed on restart, + # which can create an unrecoverable loop (#7536). Suspended sessions + # auto-reset on the next incoming message, giving the user a clean start. + try: + suspended = self.session_store.suspend_recently_active() + if suspended: + logger.info("Suspended %d in-flight session(s) from previous run", suspended) + except Exception as e: + logger.warning("Session suspension on startup failed: %s", e) + connected_count = 0 enabled_platform_count = 0 startup_nonretryable_errors: list[str] = [] @@ -2377,8 +2388,11 @@ class GatewayRunner: self._pending_messages.pop(_quick_key, None) if _quick_key in self._running_agents: del self._running_agents[_quick_key] - logger.info("HARD STOP for session %s — session lock released", _quick_key[:20]) - return "⚡ Force-stopped. The session is unlocked — you can send a new message." + # Mark session suspended so the next message starts fresh + # instead of resuming the stuck context (#7536). + self.session_store.suspend_session(_quick_key) + logger.info("HARD STOP for session %s — suspended, session lock released", _quick_key[:20]) + return "⚡ Force-stopped. The session is suspended — your next message will start fresh." # /reset and /new must bypass the running-agent guard so they # actually dispatch as commands instead of being queued as user @@ -2812,7 +2826,9 @@ class GatewayRunner: # so the agent knows this is a fresh conversation (not an intentional /reset). if getattr(session_entry, 'was_auto_reset', False): reset_reason = getattr(session_entry, 'auto_reset_reason', None) or 'idle' - if reset_reason == "daily": + if reset_reason == "suspended": + context_note = "[System note: The user's previous session was stopped and suspended. This is a fresh conversation with no prior context.]" + elif reset_reason == "daily": context_note = "[System note: The user's session was automatically reset by the daily schedule. This is a fresh conversation with no prior context.]" else: context_note = "[System note: The user's previous session expired due to inactivity. This is a fresh conversation with no prior context.]" @@ -2829,7 +2845,9 @@ class GatewayRunner: ) platform_name = source.platform.value if source.platform else "" had_activity = getattr(session_entry, 'reset_had_activity', False) - should_notify = ( + # Suspended sessions always notify (they were explicitly stopped + # or crashed mid-operation) — skip the policy check. + should_notify = reset_reason == "suspended" or ( policy.notify and had_activity and platform_name not in policy.notify_exclude_platforms @@ -2837,7 +2855,9 @@ class GatewayRunner: if should_notify: adapter = self.adapters.get(source.platform) if adapter: - if reset_reason == "daily": + if reset_reason == "suspended": + reason_text = "previous session was stopped or interrupted" + elif reset_reason == "daily": reason_text = f"daily schedule at {policy.at_hour}:00" else: hours = policy.idle_minutes // 60 @@ -3920,25 +3940,31 @@ class GatewayRunner: handles /stop before this method is reached. This handler fires only through normal command dispatch (no running agent) or as a fallback. Force-clean the session lock in all cases for safety. + + When there IS a running/pending agent, the session is also marked + as *suspended* so the next message starts a fresh session instead + of resuming the stuck context (#7536). """ source = event.source session_entry = self.session_store.get_or_create_session(source) session_key = session_entry.session_key - + agent = self._running_agents.get(session_key) if agent is _AGENT_PENDING_SENTINEL: # Force-clean the sentinel so the session is unlocked. if session_key in self._running_agents: del self._running_agents[session_key] - logger.info("HARD STOP (pending) for session %s — sentinel cleared", session_key[:20]) - return "⚡ Force-stopped. The agent was still starting — session unlocked." + self.session_store.suspend_session(session_key) + logger.info("HARD STOP (pending) for session %s — suspended, sentinel cleared", session_key[:20]) + return "⚡ Force-stopped. The agent was still starting — your next message will start fresh." if agent: agent.interrupt("Stop requested") # Force-clean the session lock so a truly hung agent doesn't # keep it locked forever. if session_key in self._running_agents: del self._running_agents[session_key] - return "⚡ Force-stopped. The session is unlocked — you can send a new message." + self.session_store.suspend_session(session_key) + return "⚡ Force-stopped. Your next message will start a fresh session." else: return "No active task to stop." diff --git a/gateway/session.py b/gateway/session.py index 2b32c18895..96013df513 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -368,6 +368,11 @@ class SessionEntry: # survives gateway restarts (the old in-memory _pre_flushed_sessions # set was lost on restart, causing redundant re-flushes). memory_flushed: bool = False + + # When True the next call to get_or_create_session() will auto-reset + # this session (create a new session_id) so the user starts fresh. + # Set by /stop to break stuck-resume loops (#7536). + suspended: bool = False def to_dict(self) -> Dict[str, Any]: result = { @@ -387,6 +392,7 @@ class SessionEntry: "estimated_cost_usd": self.estimated_cost_usd, "cost_status": self.cost_status, "memory_flushed": self.memory_flushed, + "suspended": self.suspended, } if self.origin: result["origin"] = self.origin.to_dict() @@ -423,6 +429,7 @@ class SessionEntry: estimated_cost_usd=data.get("estimated_cost_usd", 0.0), cost_status=data.get("cost_status", "unknown"), memory_flushed=data.get("memory_flushed", False), + suspended=data.get("suspended", False), ) @@ -698,7 +705,12 @@ class SessionStore: if session_key in self._entries and not force_new: entry = self._entries[session_key] - reset_reason = self._should_reset(entry, source) + # Auto-reset sessions marked as suspended (e.g. after /stop + # broke a stuck loop — #7536). + if entry.suspended: + reset_reason = "suspended" + else: + reset_reason = self._should_reset(entry, source) if not reset_reason: entry.updated_at = now self._save() @@ -771,6 +783,44 @@ class SessionStore: entry.last_prompt_tokens = last_prompt_tokens self._save() + def suspend_session(self, session_key: str) -> bool: + """Mark a session as suspended so it auto-resets on next access. + + Used by ``/stop`` to prevent stuck sessions from being resumed + after a gateway restart (#7536). Returns True if the session + existed and was marked. + """ + with self._lock: + self._ensure_loaded_locked() + if session_key in self._entries: + self._entries[session_key].suspended = True + self._save() + return True + return False + + def suspend_recently_active(self, max_age_seconds: int = 120) -> int: + """Mark recently-active sessions as suspended. + + Called on gateway startup to prevent sessions that were likely + in-flight when the gateway last exited from being blindly resumed + (#7536). Only suspends sessions updated within *max_age_seconds* + to avoid resetting long-idle sessions that are harmless to resume. + Returns the number of sessions that were suspended. + """ + import time as _time + + cutoff = _time.time() - max_age_seconds + count = 0 + with self._lock: + self._ensure_loaded_locked() + for entry in self._entries.values(): + if not entry.suspended and entry.updated_at >= cutoff: + entry.suspended = True + count += 1 + if count: + self._save() + return count + def reset_session(self, session_key: str) -> Optional[SessionEntry]: """Force reset a session, creating a new session ID.""" db_end_session_id = None