From 10bad2faf1c9eec161da3c844359f4f914145f6c Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 14 Jun 2026 03:21:06 -0700 Subject: [PATCH] fix(gateway): serialize startup auto-resume before inbound (#46074) Gateway startup now queues real inbound messages until restart-interrupted auto-resume turns have completed, preventing duplicate agents for the same session after a restart. --- gateway/run.py | 151 +++++++++++++++---- tests/gateway/test_restart_resume_pending.py | 80 ++++++++++ 2 files changed, 204 insertions(+), 27 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index c91e04ac64a..2c3d7b11f29 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2001,6 +2001,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _stop_task: Optional[asyncio.Task] = None _session_model_overrides: Dict[str, Dict[str, str]] = {} _session_reasoning_overrides: Dict[str, Dict[str, Any]] = {} + _startup_restore_in_progress: bool = False def __init__(self, config: Optional[GatewayConfig] = None): global _gateway_runner_ref @@ -2081,6 +2082,13 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew self._pending_native_image_paths_by_session: Dict[str, List[str]] = {} self._busy_ack_ts: Dict[str, float] = {} # last busy-ack timestamp per session (debounce) self._session_run_generation: Dict[str, int] = {} + # Startup restore gate: while restart-interrupted sessions are being + # auto-resumed, real inbound messages are queued instead of competing + # with the synthetic resume turns for the same session. The queued + # events drain only after all startup resume tasks have finished. + self._startup_restore_in_progress = False + self._startup_restore_queue: List[MessageEvent] = [] + self._startup_restore_tasks: List[asyncio.Task] = [] # LRU cache of live SessionSources keyed by session_key. Used by # fallback routing paths (shutdown notifications, synthetic # background-process events) when the persisted origin is missing @@ -4496,6 +4504,94 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew {"restart_timeout", "shutdown_timeout", "restart_interrupted"} ) + async def _run_startup_resume_event( + self, + adapter: BasePlatformAdapter, + event: MessageEvent, + session_key: str, + ) -> None: + """Dispatch one synthetic startup resume and wait for its agent turn. + + ``BasePlatformAdapter.handle_message()`` returns after it installs the + adapter-level guard and spawns the background processing task. Startup + restore needs a stronger boundary: inbound messages must stay queued + until the resumed agent turn itself has finished, otherwise a user + message can race the restore turn immediately after ``handle_message`` + returns. + """ + try: + await adapter.handle_message(event) + session_tasks = getattr(adapter, "_session_tasks", {}) + task = session_tasks.get(session_key) if isinstance(session_tasks, dict) else None + if task is not None: + await asyncio.shield(task) + finally: + # _schedule_resume_pending_sessions pre-claims the runner slot + # before spawning this task. If adapter.handle_message raises + # before _handle_message takes ownership, release that pre-claim; + # otherwise the real run's normal cleanup owns the slot. + if self._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL: + self._release_running_agent_state(session_key) + + def _queue_startup_restore_event(self, event: MessageEvent) -> None: + queue = getattr(self, "_startup_restore_queue", None) + if queue is None: + queue = [] + self._startup_restore_queue = queue + queue.append(event) + try: + source = event.source + logger.info( + "Queued inbound message during gateway startup restore: platform=%s chat=%s", + source.platform.value if source and source.platform else "unknown", + source.chat_id if source else "unknown", + ) + except Exception: + pass + + async def _drain_startup_restore_queue(self) -> int: + """Replay inbound messages queued while startup auto-resume ran.""" + drained = 0 + queue = getattr(self, "_startup_restore_queue", None) + if queue is None: + return 0 + while queue: + event = queue.pop(0) + source = getattr(event, "source", None) + adapter = self.adapters.get(source.platform) if source is not None else None + if adapter is None: + logger.debug( + "Dropping startup-restore queued message: adapter unavailable for %s", + getattr(getattr(source, "platform", None), "value", None), + ) + continue + # Mark this replay so _handle_message does not queue it again while + # the restore gate remains closed for any fresh inbound arrivals. + try: + setattr(event, "_hermes_startup_restore_replay", True) + except Exception: + pass + await adapter.handle_message(event) + drained += 1 + return drained + + async def _finish_startup_restore(self) -> None: + """Wait for startup auto-resume, then release and drain inbound queue.""" + tasks = list(getattr(self, "_startup_restore_tasks", []) or []) + if tasks: + results = await asyncio.gather(*tasks, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + logger.debug( + "startup auto-resume task failed", + exc_info=(type(result), result, result.__traceback__), + ) + self._startup_restore_tasks = [] + drained = await self._drain_startup_restore_queue() + self._startup_restore_in_progress = False + if drained: + logger.info("Drained %d inbound message(s) queued during startup restore", drained) + def _schedule_resume_pending_sessions(self, platform=None) -> int: """Auto-continue fresh restart-interrupted sessions after startup. @@ -4574,35 +4670,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew source=source, internal=True, ) - - async def _guarded_handle_message( - _adapter: Any, _event: MessageEvent, _key: str = entry.session_key, - ) -> None: - """Ensure the pre-claimed sentinel is always released. - - In the normal flow the resume turn reaches - ``_handle_message``, which replaces our pre-claim with - its own ``_AGENT_PENDING_SENTINEL`` (and releases it in - its ``finally`` block) once the run begins. If - ``handle_message`` raises *before* the runner takes over - the slot (e.g. during topic recovery or session-key - resolution), nobody clears our pre-claim — so we do it - here unconditionally. The ``is _AGENT_PENDING_SENTINEL`` - guard below only releases the slot we ourselves placed, - never one a live run currently owns. - """ - try: - await _adapter.handle_message(_event) - finally: - # Only release if the sentinel we set is still there - # (i.e. _process_message_background hasn't replaced - # and cleaned it already). - if self._running_agents.get(_key) is _AGENT_PENDING_SENTINEL: - self._release_running_agent_state(_key) - - task = asyncio.create_task(_guarded_handle_message(adapter, event)) + task = asyncio.create_task( + self._run_startup_resume_event(adapter, event, entry.session_key) + ) self._background_tasks.add(task) task.add_done_callback(self._background_tasks.discard) + if getattr(self, "_startup_restore_in_progress", False): + tasks = getattr(self, "_startup_restore_tasks", None) + if tasks is None: + tasks = [] + self._startup_restore_tasks = tasks + tasks.append(task) scheduled += 1 if scheduled: @@ -4865,6 +4943,15 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception as e: logger.debug("Stuck-loop detection failed: %s", e) + # Serialize startup restore against inbound dispatch. Platform + # adapters can begin receiving messages as soon as they connect, but + # restart-interrupted sessions are not auto-resumed until all startup + # wiring below completes. Queue inbound messages until the resume + # pass runs and every synthetic resume turn has finished. + self._startup_restore_in_progress = True + self._startup_restore_queue = [] + self._startup_restore_tasks = [] + connected_count = 0 enabled_platform_count = 0 startup_nonretryable_errors: list[str] = [] @@ -4999,6 +5086,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception: pass self._request_clean_exit(reason) + self._startup_restore_in_progress = False return True if enabled_platform_count > 0: if startup_retryable_errors: @@ -5109,6 +5197,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # by the normal successful-turn path, so a failed auto-resume remains # visible for manual recovery on the next user message. self._schedule_resume_pending_sessions() + await self._finish_startup_restore() # Drain any recovered process watchers (from crash recovery checkpoint) try: @@ -6411,6 +6500,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew """ source = event.source + if ( + getattr(self, "_startup_restore_in_progress", False) + and not getattr(event, "internal", False) + and not getattr(event, "_hermes_startup_restore_replay", False) + ): + self._queue_startup_restore_event(event) + return None + # Internal events (e.g. background-process completion notifications) # are system-generated and must skip user authorization. is_internal = bool(getattr(event, "internal", False)) diff --git a/tests/gateway/test_restart_resume_pending.py b/tests/gateway/test_restart_resume_pending.py index 60f733265da..3ccaf801d52 100644 --- a/tests/gateway/test_restart_resume_pending.py +++ b/tests/gateway/test_restart_resume_pending.py @@ -1189,6 +1189,86 @@ async def test_auto_resume_skips_sessions_with_running_agent(): adapter.handle_message.assert_not_called() +@pytest.mark.asyncio +async def test_startup_restore_gate_queues_real_inbound_messages(): + """Real inbound messages wait while startup restore is in progress.""" + runner, _adapter = make_restart_runner() + runner._startup_restore_in_progress = True + runner._startup_restore_queue = [] + + inbound = MessageEvent( + text="hello", + message_type=MessageType.TEXT, + source=make_restart_source(chat_id="restore-chat"), + ) + + result = await runner._handle_message(inbound) + + assert result is None + assert runner._startup_restore_queue == [inbound] + + +@pytest.mark.asyncio +async def test_startup_restore_waits_for_resume_before_draining_inbound(): + """Queued inbound turns replay only after startup resume tasks finish.""" + runner, adapter = make_restart_runner() + runner._startup_restore_in_progress = True + runner._startup_restore_queue = [] + runner._startup_restore_tasks = [] + + source = make_restart_source(chat_id="restore-chat") + pending_entry = SessionEntry( + session_key="agent:main:telegram:dm:restore-chat", + session_id="sid", + created_at=datetime.now(), + updated_at=datetime.now(), + origin=source, + platform=Platform.TELEGRAM, + chat_type="dm", + resume_pending=True, + resume_reason="restart_interrupted", + last_resume_marked_at=datetime.now(), + ) + runner.session_store._entries = {pending_entry.session_key: pending_entry} + + resume_done = asyncio.Event() + seen: list[str] = [] + + async def fake_handle_message(event: MessageEvent) -> None: + if event.internal: + seen.append("resume-start") + task = asyncio.create_task(resume_done.wait()) + adapter._session_tasks[pending_entry.session_key] = task + return + seen.append(f"inbound:{event.text}") + + adapter.handle_message = fake_handle_message + + scheduled = runner._schedule_resume_pending_sessions() + await asyncio.sleep(0) + + inbound = MessageEvent( + text="hello", + message_type=MessageType.TEXT, + source=source, + ) + assert await runner._handle_message(inbound) is None + assert scheduled == 1 + assert seen == ["resume-start"] + assert runner._startup_restore_queue == [inbound] + + finish_task = asyncio.create_task(runner._finish_startup_restore()) + await asyncio.sleep(0) + assert seen == ["resume-start"] + + resume_done.set() + await finish_task + + assert seen == ["resume-start", "inbound:hello"] + assert runner._startup_restore_queue == [] + assert runner._startup_restore_in_progress is False + + # --------------------------------------------------------------------------- # Shutdown banner wording # ---------------------------------------------------------------------------