fix(gateway): serialize startup auto-resume before inbound (#46074)

Gateway startup now queues real inbound messages until restart-interrupted auto-resume turns have completed, preventing duplicate agents for the same session after a restart.
This commit is contained in:
Teknium 2026-06-14 03:21:06 -07:00 committed by GitHub
parent 2b4873f7fb
commit 10bad2faf1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 204 additions and 27 deletions

View file

@ -2001,6 +2001,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
_stop_task: Optional[asyncio.Task] = None
_session_model_overrides: Dict[str, Dict[str, str]] = {}
_session_reasoning_overrides: Dict[str, Dict[str, Any]] = {}
_startup_restore_in_progress: bool = False
def __init__(self, config: Optional[GatewayConfig] = None):
global _gateway_runner_ref
@ -2081,6 +2082,13 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
self._pending_native_image_paths_by_session: Dict[str, List[str]] = {}
self._busy_ack_ts: Dict[str, float] = {} # last busy-ack timestamp per session (debounce)
self._session_run_generation: Dict[str, int] = {}
# Startup restore gate: while restart-interrupted sessions are being
# auto-resumed, real inbound messages are queued instead of competing
# with the synthetic resume turns for the same session. The queued
# events drain only after all startup resume tasks have finished.
self._startup_restore_in_progress = False
self._startup_restore_queue: List[MessageEvent] = []
self._startup_restore_tasks: List[asyncio.Task] = []
# LRU cache of live SessionSources keyed by session_key. Used by
# fallback routing paths (shutdown notifications, synthetic
# background-process events) when the persisted origin is missing
@ -4496,6 +4504,94 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
{"restart_timeout", "shutdown_timeout", "restart_interrupted"}
)
async def _run_startup_resume_event(
self,
adapter: BasePlatformAdapter,
event: MessageEvent,
session_key: str,
) -> None:
"""Dispatch one synthetic startup resume and wait for its agent turn.
``BasePlatformAdapter.handle_message()`` returns after it installs the
adapter-level guard and spawns the background processing task. Startup
restore needs a stronger boundary: inbound messages must stay queued
until the resumed agent turn itself has finished, otherwise a user
message can race the restore turn immediately after ``handle_message``
returns.
"""
try:
await adapter.handle_message(event)
session_tasks = getattr(adapter, "_session_tasks", {})
task = session_tasks.get(session_key) if isinstance(session_tasks, dict) else None
if task is not None:
await asyncio.shield(task)
finally:
# _schedule_resume_pending_sessions pre-claims the runner slot
# before spawning this task. If adapter.handle_message raises
# before _handle_message takes ownership, release that pre-claim;
# otherwise the real run's normal cleanup owns the slot.
if self._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL:
self._release_running_agent_state(session_key)
def _queue_startup_restore_event(self, event: MessageEvent) -> None:
queue = getattr(self, "_startup_restore_queue", None)
if queue is None:
queue = []
self._startup_restore_queue = queue
queue.append(event)
try:
source = event.source
logger.info(
"Queued inbound message during gateway startup restore: platform=%s chat=%s",
source.platform.value if source and source.platform else "unknown",
source.chat_id if source else "unknown",
)
except Exception:
pass
async def _drain_startup_restore_queue(self) -> int:
"""Replay inbound messages queued while startup auto-resume ran."""
drained = 0
queue = getattr(self, "_startup_restore_queue", None)
if queue is None:
return 0
while queue:
event = queue.pop(0)
source = getattr(event, "source", None)
adapter = self.adapters.get(source.platform) if source is not None else None
if adapter is None:
logger.debug(
"Dropping startup-restore queued message: adapter unavailable for %s",
getattr(getattr(source, "platform", None), "value", None),
)
continue
# Mark this replay so _handle_message does not queue it again while
# the restore gate remains closed for any fresh inbound arrivals.
try:
setattr(event, "_hermes_startup_restore_replay", True)
except Exception:
pass
await adapter.handle_message(event)
drained += 1
return drained
async def _finish_startup_restore(self) -> None:
"""Wait for startup auto-resume, then release and drain inbound queue."""
tasks = list(getattr(self, "_startup_restore_tasks", []) or [])
if tasks:
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
logger.debug(
"startup auto-resume task failed",
exc_info=(type(result), result, result.__traceback__),
)
self._startup_restore_tasks = []
drained = await self._drain_startup_restore_queue()
self._startup_restore_in_progress = False
if drained:
logger.info("Drained %d inbound message(s) queued during startup restore", drained)
def _schedule_resume_pending_sessions(self, platform=None) -> int:
"""Auto-continue fresh restart-interrupted sessions after startup.
@ -4574,35 +4670,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
source=source,
internal=True,
)
async def _guarded_handle_message(
_adapter: Any, _event: MessageEvent, _key: str = entry.session_key,
) -> None:
"""Ensure the pre-claimed sentinel is always released.
In the normal flow the resume turn reaches
``_handle_message``, which replaces our pre-claim with
its own ``_AGENT_PENDING_SENTINEL`` (and releases it in
its ``finally`` block) once the run begins. If
``handle_message`` raises *before* the runner takes over
the slot (e.g. during topic recovery or session-key
resolution), nobody clears our pre-claim so we do it
here unconditionally. The ``is _AGENT_PENDING_SENTINEL``
guard below only releases the slot we ourselves placed,
never one a live run currently owns.
"""
try:
await _adapter.handle_message(_event)
finally:
# Only release if the sentinel we set is still there
# (i.e. _process_message_background hasn't replaced
# and cleaned it already).
if self._running_agents.get(_key) is _AGENT_PENDING_SENTINEL:
self._release_running_agent_state(_key)
task = asyncio.create_task(_guarded_handle_message(adapter, event))
task = asyncio.create_task(
self._run_startup_resume_event(adapter, event, entry.session_key)
)
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
if getattr(self, "_startup_restore_in_progress", False):
tasks = getattr(self, "_startup_restore_tasks", None)
if tasks is None:
tasks = []
self._startup_restore_tasks = tasks
tasks.append(task)
scheduled += 1
if scheduled:
@ -4865,6 +4943,15 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception as e:
logger.debug("Stuck-loop detection failed: %s", e)
# Serialize startup restore against inbound dispatch. Platform
# adapters can begin receiving messages as soon as they connect, but
# restart-interrupted sessions are not auto-resumed until all startup
# wiring below completes. Queue inbound messages until the resume
# pass runs and every synthetic resume turn has finished.
self._startup_restore_in_progress = True
self._startup_restore_queue = []
self._startup_restore_tasks = []
connected_count = 0
enabled_platform_count = 0
startup_nonretryable_errors: list[str] = []
@ -4999,6 +5086,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception:
pass
self._request_clean_exit(reason)
self._startup_restore_in_progress = False
return True
if enabled_platform_count > 0:
if startup_retryable_errors:
@ -5109,6 +5197,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# by the normal successful-turn path, so a failed auto-resume remains
# visible for manual recovery on the next user message.
self._schedule_resume_pending_sessions()
await self._finish_startup_restore()
# Drain any recovered process watchers (from crash recovery checkpoint)
try:
@ -6411,6 +6500,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"""
source = event.source
if (
getattr(self, "_startup_restore_in_progress", False)
and not getattr(event, "internal", False)
and not getattr(event, "_hermes_startup_restore_replay", False)
):
self._queue_startup_restore_event(event)
return None
# Internal events (e.g. background-process completion notifications)
# are system-generated and must skip user authorization.
is_internal = bool(getattr(event, "internal", False))

View file

@ -1189,6 +1189,86 @@ async def test_auto_resume_skips_sessions_with_running_agent():
adapter.handle_message.assert_not_called()
@pytest.mark.asyncio
async def test_startup_restore_gate_queues_real_inbound_messages():
"""Real inbound messages wait while startup restore is in progress."""
runner, _adapter = make_restart_runner()
runner._startup_restore_in_progress = True
runner._startup_restore_queue = []
inbound = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=make_restart_source(chat_id="restore-chat"),
)
result = await runner._handle_message(inbound)
assert result is None
assert runner._startup_restore_queue == [inbound]
@pytest.mark.asyncio
async def test_startup_restore_waits_for_resume_before_draining_inbound():
"""Queued inbound turns replay only after startup resume tasks finish."""
runner, adapter = make_restart_runner()
runner._startup_restore_in_progress = True
runner._startup_restore_queue = []
runner._startup_restore_tasks = []
source = make_restart_source(chat_id="restore-chat")
pending_entry = SessionEntry(
session_key="agent:main:telegram:dm:restore-chat",
session_id="sid",
created_at=datetime.now(),
updated_at=datetime.now(),
origin=source,
platform=Platform.TELEGRAM,
chat_type="dm",
resume_pending=True,
resume_reason="restart_interrupted",
last_resume_marked_at=datetime.now(),
)
runner.session_store._entries = {pending_entry.session_key: pending_entry}
resume_done = asyncio.Event()
seen: list[str] = []
async def fake_handle_message(event: MessageEvent) -> None:
if event.internal:
seen.append("resume-start")
task = asyncio.create_task(resume_done.wait())
adapter._session_tasks[pending_entry.session_key] = task
return
seen.append(f"inbound:{event.text}")
adapter.handle_message = fake_handle_message
scheduled = runner._schedule_resume_pending_sessions()
await asyncio.sleep(0)
inbound = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
)
assert await runner._handle_message(inbound) is None
assert scheduled == 1
assert seen == ["resume-start"]
assert runner._startup_restore_queue == [inbound]
finish_task = asyncio.create_task(runner._finish_startup_restore())
await asyncio.sleep(0)
assert seen == ["resume-start"]
resume_done.set()
await finish_task
assert seen == ["resume-start", "inbound:hello"]
assert runner._startup_restore_queue == []
assert runner._startup_restore_in_progress is False
# ---------------------------------------------------------------------------
# Shutdown banner wording
# ---------------------------------------------------------------------------