diff --git a/gateway/run.py b/gateway/run.py index 339d63c67c..e6ba607c5a 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2759,44 +2759,67 @@ class GatewayRunner: task.add_done_callback(self._background_tasks.discard) return True + # Drain-timeout reasons set by _stop_impl() when a still-running turn is + # force-interrupted; "restart_interrupted" is set by + # SessionStore.suspend_recently_active() on crash recovery (no + # .clean_shutdown marker). All three mean "the agent was mid-turn and + # we killed it" — eligible for startup auto-resume. + _AUTO_RESUME_REASONS = frozenset( + {"restart_timeout", "shutdown_timeout", "restart_interrupted"} + ) + def _schedule_resume_pending_sessions(self) -> int: """Auto-continue fresh restart-interrupted sessions after startup. - ``resume_pending`` already preserves the transcript and injects the - recovery system note on the next user message. This method closes the - restart UX gap by synthesizing that next message once adapters are back - online, so users do not have to send a placeholder ping after restart. + ``resume_pending`` already preserves the transcript AND the existing + ``_is_resume_pending`` branch in ``_handle_message_with_agent`` + injects a reason-aware recovery system note on the next turn. This + method closes the UX gap by synthesizing that next turn once + adapters are back online — the event text is empty so the existing + injection path owns the wording and we never double up. + + Adapters that are not yet ready (adapter missing from + ``self.adapters``) are skipped silently; their sessions stay + ``resume_pending`` and will auto-resume on the next real user + message, or on the next gateway startup. """ + window = _auto_continue_freshness_window() try: - entries = self.session_store.list_resume_pending( - window_secs=_auto_continue_freshness_window(), - allowed_reasons={"restart_timeout", "shutdown_timeout"}, - ) + with self.session_store._lock: # noqa: SLF001 — snapshot under lock + self.session_store._ensure_loaded_locked() # noqa: SLF001 + candidates = [ + entry for entry in self.session_store._entries.values() # noqa: SLF001 + if entry.resume_pending + and not entry.suspended + and entry.origin is not None + and entry.resume_reason in self._AUTO_RESUME_REASONS + ] except Exception as exc: - logger.warning("Failed to list resume-pending sessions: %s", exc) + logger.warning("Failed to enumerate resume-pending sessions: %s", exc) return 0 + now = datetime.now() scheduled = 0 - for entry in entries: - source = getattr(entry, "origin", None) - platform = getattr(source, "platform", None) - adapter = self.adapters.get(platform) if platform is not None else None - if source is None or adapter is None: + for entry in candidates: + marker = entry.last_resume_marked_at or entry.updated_at + if marker is not None and (now - marker).total_seconds() > window: + continue + + source = entry.origin + adapter = self.adapters.get(source.platform) + if adapter is None: logger.debug( - "Skipping auto-resume for %s: adapter unavailable for %s", - getattr(entry, "session_key", "?"), - getattr(platform, "value", platform), + "Skipping auto-resume for %s: adapter not ready for %s", + entry.session_key, + getattr(source.platform, "value", source.platform), ) continue + # Empty-text internal event — the _is_resume_pending branch in + # _handle_message_with_agent prepends the proper reason-aware + # system note before the turn runs. event = MessageEvent( - text=( - "[System note: The gateway restarted after interrupting " - "this session. Resume the previous turn now. Reconcile " - "the transcript first: if tool results are already present, " - "process them before taking new action; never claim work " - "completed unless it is visible in the transcript/tool output.]" - ), + text="", message_type=MessageType.TEXT, source=source, internal=True, @@ -2807,7 +2830,10 @@ class GatewayRunner: scheduled += 1 if scheduled: - logger.info("Scheduled auto-resume for %d restart-interrupted session(s)", scheduled) + logger.info( + "Scheduled auto-resume for %d restart-interrupted session(s)", + scheduled, + ) return scheduled async def start(self) -> bool: diff --git a/gateway/session.py b/gateway/session.py index 8602fd54f6..be393e48e6 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -1028,42 +1028,6 @@ class SessionStore: self._save() return True - def list_resume_pending( - self, - *, - window_secs: Optional[float] = None, - now: Optional[float] = None, - allowed_reasons: Optional[set[str]] = None, - ) -> List[SessionEntry]: - """Return fresh restart-interrupted sessions eligible for resume. - - Only entries that still have an origin are returned; the gateway needs - that origin to route continuation back through the original - platform/chat/thread. ``suspended`` entries are excluded because - explicit suspension/stuck-loop escalation must win over resume. - """ - current = datetime.fromtimestamp(now) if now is not None else _now() - window = float(window_secs) if window_secs is not None else None - - with self._lock: - self._ensure_loaded_locked() - entries = list(self._entries.values()) - - pending: List[SessionEntry] = [] - for entry in entries: - if not entry.resume_pending or entry.suspended or entry.origin is None: - continue - if allowed_reasons is not None and entry.resume_reason not in allowed_reasons: - continue - if window is not None and window > 0: - marker = entry.last_resume_marked_at or entry.updated_at - if marker is not None and (current - marker).total_seconds() > window: - continue - pending.append(entry) - - pending.sort(key=lambda entry: entry.last_resume_marked_at or entry.updated_at) - return pending - def prune_old_entries(self, max_age_days: int) -> int: """Drop SessionEntry records older than max_age_days. diff --git a/scripts/release.py b/scripts/release.py index 1929d9dd8f..f62f755770 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -54,6 +54,7 @@ AUTHOR_MAP = { "ngusev@astralinux.ru": "NikolayGusev-astra", "liuguangyong201@hellobike.com": "liuguangyong93", "2093036+exiao@users.noreply.github.com": "exiao", + "kevyan1998@gmail.com": "kyan12", "rylen.anil@gmail.com": "rylena", "godnanijatin@gmail.com": "jatingodnani", "252811164+adybag14-cyber@users.noreply.github.com": "adybag14-cyber", diff --git a/tests/gateway/test_restart_resume_pending.py b/tests/gateway/test_restart_resume_pending.py index 64778c4469..13ef2f6f99 100644 --- a/tests/gateway/test_restart_resume_pending.py +++ b/tests/gateway/test_restart_resume_pending.py @@ -245,30 +245,6 @@ class TestSessionEntryResumeFields: class TestMarkResumePending: - def test_list_resume_pending_returns_fresh_entries_with_origins(self, tmp_path): - store = _make_store(tmp_path) - fresh = store.get_or_create_session(_make_source(chat_id="fresh")) - stale = store.get_or_create_session(_make_source(chat_id="stale")) - missing_origin = store.get_or_create_session(_make_source(chat_id="missing-origin")) - suspended = store.get_or_create_session(_make_source(chat_id="suspended")) - - store.mark_resume_pending(fresh.session_key, reason="restart_timeout") - store.mark_resume_pending(stale.session_key, reason="restart_timeout") - store.mark_resume_pending(missing_origin.session_key, reason="restart_timeout") - store.mark_resume_pending(suspended.session_key, reason="restart_timeout") - old = datetime.now() - timedelta(hours=3) - store._entries[stale.session_key].last_resume_marked_at = old - store._entries[missing_origin.session_key].origin = None - store._entries[suspended.session_key].suspended = True - - pending = store.list_resume_pending( - window_secs=3600, - now=datetime.now().timestamp(), - allowed_reasons={"restart_timeout"}, - ) - - assert [entry.session_key for entry in pending] == [fresh.session_key] - def test_marks_existing_session(self, tmp_path): store = _make_store(tmp_path) source = _make_source() @@ -978,24 +954,158 @@ async def test_startup_auto_resume_schedules_fresh_pending_sessions(): resume_reason="restart_timeout", last_resume_marked_at=datetime.now(), ) - runner.session_store.list_resume_pending = MagicMock(return_value=[pending_entry]) + runner.session_store._entries = {pending_entry.session_key: pending_entry} adapter.handle_message = AsyncMock() scheduled = runner._schedule_resume_pending_sessions() await asyncio.sleep(0) assert scheduled == 1 - runner.session_store.list_resume_pending.assert_called_once_with( - window_secs=_auto_continue_freshness_window(), - allowed_reasons={"restart_timeout", "shutdown_timeout"}, - ) adapter.handle_message.assert_awaited_once() event = adapter.handle_message.await_args.args[0] assert isinstance(event, MessageEvent) assert event.internal is True assert event.message_type == MessageType.TEXT assert event.source == source - assert event.text.startswith("[System note: The gateway restarted") + # Text is empty — the existing _is_resume_pending branch in + # _handle_message_with_agent owns the system-note injection so we don't + # double it up. + assert event.text == "" + + +@pytest.mark.asyncio +async def test_startup_auto_resume_includes_crash_recovery(): + """Crash-recovered sessions (reason=restart_interrupted) are also auto-resumed. + + suspend_recently_active() marks in-flight sessions with resume_reason + "restart_interrupted" when the previous gateway exit was not clean + (crash/SIGKILL/OOM). These should get the same magic continuation as + drain-timeout interruptions. + """ + runner, adapter = make_restart_runner() + source = make_restart_source(chat_id="crash-chat") + pending_entry = SessionEntry( + session_key="agent:main:telegram:dm:crash-chat", + session_id="sid", + created_at=datetime.now(), + updated_at=datetime.now(), + origin=source, + platform=Platform.TELEGRAM, + chat_type="dm", + resume_pending=True, + resume_reason="restart_interrupted", + last_resume_marked_at=datetime.now(), + ) + runner.session_store._entries = {pending_entry.session_key: pending_entry} + adapter.handle_message = AsyncMock() + + scheduled = runner._schedule_resume_pending_sessions() + await asyncio.sleep(0) + + assert scheduled == 1 + adapter.handle_message.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_startup_auto_resume_skips_stale_entries(): + """Entries older than the freshness window must not be auto-resumed.""" + runner, adapter = make_restart_runner() + source = make_restart_source(chat_id="stale-chat") + stale_marker = datetime.now() - timedelta( + seconds=_auto_continue_freshness_window() + 60 + ) + stale_entry = SessionEntry( + session_key="agent:main:telegram:dm:stale-chat", + session_id="sid", + created_at=stale_marker, + updated_at=stale_marker, + origin=source, + platform=Platform.TELEGRAM, + chat_type="dm", + resume_pending=True, + resume_reason="restart_timeout", + last_resume_marked_at=stale_marker, + ) + runner.session_store._entries = {stale_entry.session_key: stale_entry} + adapter.handle_message = AsyncMock() + + scheduled = runner._schedule_resume_pending_sessions() + + assert scheduled == 0 + adapter.handle_message.assert_not_called() + + +@pytest.mark.asyncio +async def test_startup_auto_resume_skips_suspended_and_originless(): + """suspended entries and entries with no origin are excluded.""" + runner, adapter = make_restart_runner() + source = make_restart_source(chat_id="ok") + suspended_entry = SessionEntry( + session_key="agent:main:telegram:dm:suspended", + session_id="sid-s", + created_at=datetime.now(), + updated_at=datetime.now(), + origin=source, + platform=Platform.TELEGRAM, + chat_type="dm", + resume_pending=True, + resume_reason="restart_timeout", + suspended=True, + last_resume_marked_at=datetime.now(), + ) + originless = SessionEntry( + session_key="agent:main:telegram:dm:originless", + session_id="sid-o", + created_at=datetime.now(), + updated_at=datetime.now(), + origin=None, + platform=Platform.TELEGRAM, + chat_type="dm", + resume_pending=True, + resume_reason="restart_timeout", + last_resume_marked_at=datetime.now(), + ) + runner.session_store._entries = { + suspended_entry.session_key: suspended_entry, + originless.session_key: originless, + } + adapter.handle_message = AsyncMock() + + scheduled = runner._schedule_resume_pending_sessions() + + assert scheduled == 0 + adapter.handle_message.assert_not_called() + + +@pytest.mark.asyncio +async def test_startup_auto_resume_skips_disallowed_reasons(): + """Reasons outside the auto-resume set (e.g. a future custom reason) are skipped. + + These sessions still auto-resume on the next real user message via the + existing _is_resume_pending branch — we just don't synthesize a turn + for them at startup. + """ + runner, adapter = make_restart_runner() + source = make_restart_source(chat_id="other") + other_entry = SessionEntry( + session_key="agent:main:telegram:dm:other", + session_id="sid", + created_at=datetime.now(), + updated_at=datetime.now(), + origin=source, + platform=Platform.TELEGRAM, + chat_type="dm", + resume_pending=True, + resume_reason="manual_resume_request", + last_resume_marked_at=datetime.now(), + ) + runner.session_store._entries = {other_entry.session_key: other_entry} + adapter.handle_message = AsyncMock() + + scheduled = runner._schedule_resume_pending_sessions() + + assert scheduled == 0 + adapter.handle_message.assert_not_called() @pytest.mark.asyncio @@ -1014,7 +1124,7 @@ async def test_startup_auto_resume_skips_when_adapter_unavailable(): resume_reason="restart_timeout", last_resume_marked_at=datetime.now(), ) - runner.session_store.list_resume_pending = MagicMock(return_value=[pending_entry]) + runner.session_store._entries = {pending_entry.session_key: pending_entry} runner.adapters = {} adapter.handle_message = AsyncMock()