From 6e2fd955ca64e921361c201fb540b4561befede0 Mon Sep 17 00:00:00 2001 From: liuhao1024 Date: Sat, 13 Jun 2026 17:07:51 +0800 Subject: [PATCH 1/3] fix(gateway): claim session slot before auto-resume task to prevent duplicate agents MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the gateway restarts and auto-resumes an interrupted session, an inbound message arriving in the window between `asyncio.create_task()` and the task's first await could spin up a second AIAgent for the same session. Both agents would then process messages concurrently, producing interleaved duplicate responses (#45456). Fix: set `_AGENT_PENDING_SENTINEL` in `_running_agents` immediately after the "already running" check, before creating the task. This closes the race window — any inbound message sees the slot as occupied and queues behind the auto-resume. A `_guarded_handle_message` wrapper ensures the pre-claimed sentinel is always released, even if `handle_message` raises before reaching `_process_message_background` (whose `finally` block handles normal cleanup). (cherry picked from commit 85150c976bcd067d96900dbf85a4616bb4851e1c) --- gateway/run.py | 32 ++++++- tests/gateway/test_restart_resume_pending.py | 91 ++++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/gateway/run.py b/gateway/run.py index 5266b7b033a..268e69ea82f 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4556,6 +4556,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew ) continue + # Claim the session slot *before* spawning the task so that an + # inbound message arriving between task creation and the task's + # first await (where _process_message_background sets the real + # sentinel) sees the slot as occupied and queues behind it + # instead of spinning up a duplicate AIAgent (#45456). + self._running_agents[entry.session_key] = _AGENT_PENDING_SENTINEL + self._running_agents_ts[entry.session_key] = time.time() + # Empty-text internal event — the _is_resume_pending branch in # _handle_message_with_agent prepends the proper reason-aware # system note before the turn runs. @@ -4565,7 +4573,29 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew source=source, internal=True, ) - task = asyncio.create_task(adapter.handle_message(event)) + + async def _guarded_handle_message( + _adapter: Any, _event: MessageEvent, _key: str = entry.session_key, + ) -> None: + """Ensure the pre-claimed sentinel is always released. + + In the normal flow, ``_process_message_background`` sets + its own sentinel and releases it in its ``finally`` block. + If ``handle_message`` raises *before* reaching that + method (e.g. during topic recovery or session key + resolution), nobody clears our pre-claim — so we do it + here unconditionally. + """ + try: + await _adapter.handle_message(_event) + finally: + # Only release if the sentinel we set is still there + # (i.e. _process_message_background hasn't replaced + # and cleaned it already). + if self._running_agents.get(_key) is _AGENT_PENDING_SENTINEL: + self._release_running_agent_state(_key) + + task = asyncio.create_task(_guarded_handle_message(adapter, event)) self._background_tasks.add(task) task.add_done_callback(self._background_tasks.discard) scheduled += 1 diff --git a/tests/gateway/test_restart_resume_pending.py b/tests/gateway/test_restart_resume_pending.py index 420328639c7..1f90787cec1 100644 --- a/tests/gateway/test_restart_resume_pending.py +++ b/tests/gateway/test_restart_resume_pending.py @@ -35,6 +35,7 @@ import pytest from gateway.config import GatewayConfig, HomeChannel, Platform from gateway.platforms.base import MessageEvent, MessageType, SendResult from gateway.run import ( + _AGENT_PENDING_SENTINEL, _auto_continue_freshness_window, _coerce_gateway_timestamp, _is_fresh_gateway_interruption, @@ -1420,3 +1421,93 @@ class TestStuckLoopEscalation: {"indent": None}, ) ] + + +@pytest.mark.asyncio +async def test_auto_resume_sets_sentinel_before_task_execution(): + """Auto-resume must claim the session slot before the task starts. + + Regression for #45456: between ``asyncio.create_task()`` and the task's + first await (where ``_process_message_background`` sets the real + sentinel), an inbound message could arrive and spin up a duplicate + AIAgent. The fix pre-claims the slot so the inbound path sees it as + occupied. + """ + runner, adapter = make_restart_runner() + source = make_restart_source(chat_id="race-chat") + pending_entry = SessionEntry( + session_key="agent:main:telegram:dm:race-chat", + session_id="sid", + created_at=datetime.now(), + updated_at=datetime.now(), + origin=source, + platform=Platform.TELEGRAM, + chat_type="dm", + resume_pending=True, + resume_reason="restart_interrupted", + last_resume_marked_at=datetime.now(), + ) + runner.session_store._entries = {pending_entry.session_key: pending_entry} + + # Slow mock: hold the task open so we can inspect _running_agents + # while it's in-flight. + gate = asyncio.Event() + + async def _slow_handle(event): + await gate.wait() + + adapter.handle_message = _slow_handle + + scheduled = runner._schedule_resume_pending_sessions() + + assert scheduled == 1 + # The sentinel must be set immediately — before the task starts executing. + assert pending_entry.session_key in runner._running_agents + assert runner._running_agents[pending_entry.session_key] is _AGENT_PENDING_SENTINEL + assert pending_entry.session_key in runner._running_agents_ts + + # Release the task and let it complete. + gate.set() + await asyncio.sleep(0.05) + + # After the task completes, the sentinel should be cleaned up. + assert pending_entry.session_key not in runner._running_agents + + +@pytest.mark.asyncio +async def test_auto_resume_sentinel_cleaned_on_task_failure(): + """If handle_message raises before _process_message_background, the + sentinel must still be released so the session is not locked forever. + """ + runner, adapter = make_restart_runner() + source = make_restart_source(chat_id="fail-chat") + pending_entry = SessionEntry( + session_key="agent:main:telegram:dm:fail-chat", + session_id="sid", + created_at=datetime.now(), + updated_at=datetime.now(), + origin=source, + platform=Platform.TELEGRAM, + chat_type="dm", + resume_pending=True, + resume_reason="restart_interrupted", + last_resume_marked_at=datetime.now(), + ) + runner.session_store._entries = {pending_entry.session_key: pending_entry} + + async def _failing_handle(event): + raise RuntimeError("adapter exploded") + + adapter.handle_message = _failing_handle + + scheduled = runner._schedule_resume_pending_sessions() + assert scheduled == 1 + + # Sentinel is set immediately. + assert pending_entry.session_key in runner._running_agents + + # Let the task run and fail. + await asyncio.sleep(0.05) + + # The sentinel must be cleaned up despite the failure. + assert pending_entry.session_key not in runner._running_agents From 63097ee0d7ec3f05d69ae5a0b6b9d33004bce032 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Sat, 13 Jun 2026 23:39:35 +0530 Subject: [PATCH 2/3] test(gateway): cover auto-resume full-path no-regression; clarify guard docstring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The salvaged fix's two regression tests mock adapter.handle_message, so they only assert the pre-claimed sentinel is set/cleaned around a stub — they never drive the real dispatch chain. Add a full-path test that exercises _schedule_resume_pending_sessions -> _guarded_handle_message -> adapter.handle_message -> _process_message_background -> _handle_message and asserts the resumed session's agent runs EXACTLY ONCE: not zero (the pre-claim must not self-bounce the resume into a queued no-op) and not twice (the duplicate-agent bug #45456 the fix targets). Also assert no leaked sentinel and no orphaned pending event after the drain settles. Tighten the _guarded_handle_message docstring: on current main the real sentinel is taken over inside _handle_message (not _process_message_background), and note the `is _AGENT_PENDING_SENTINEL` guard only releases the slot we ourselves placed, never one a live run owns. --- gateway/run.py | 14 ++- tests/gateway/test_restart_resume_pending.py | 101 +++++++++++++++++++ 2 files changed, 110 insertions(+), 5 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 268e69ea82f..f95a535bede 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4579,12 +4579,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew ) -> None: """Ensure the pre-claimed sentinel is always released. - In the normal flow, ``_process_message_background`` sets - its own sentinel and releases it in its ``finally`` block. - If ``handle_message`` raises *before* reaching that - method (e.g. during topic recovery or session key + In the normal flow the resume turn reaches + ``_handle_message``, which replaces our pre-claim with + its own ``_AGENT_PENDING_SENTINEL`` (and releases it in + its ``finally`` block) once the run begins. If + ``handle_message`` raises *before* the runner takes over + the slot (e.g. during topic recovery or session-key resolution), nobody clears our pre-claim — so we do it - here unconditionally. + here unconditionally. The ``is _AGENT_PENDING_SENTINEL`` + guard below only releases the slot we ourselves placed, + never one a live run currently owns. """ try: await _adapter.handle_message(_event) diff --git a/tests/gateway/test_restart_resume_pending.py b/tests/gateway/test_restart_resume_pending.py index 1f90787cec1..60f733265da 100644 --- a/tests/gateway/test_restart_resume_pending.py +++ b/tests/gateway/test_restart_resume_pending.py @@ -1511,3 +1511,104 @@ async def test_auto_resume_sentinel_cleaned_on_task_failure(): # The sentinel must be cleaned up despite the failure. assert pending_entry.session_key not in runner._running_agents + + +@pytest.mark.asyncio +async def test_auto_resume_runs_agent_exactly_once_through_full_path(): + """Full-path regression: the pre-claim must NOT make auto-resume a no-op. + + The two tests above mock ``adapter.handle_message`` outright, so they + only prove the sentinel is set/cleaned around a stub — they never + exercise the real dispatch chain. This drives the production path + end to end: + + _schedule_resume_pending_sessions + -> _guarded_handle_message + -> adapter.handle_message (real) + -> _process_message_background (real) + -> _handle_message (real) + + The risk the pre-claim introduces is a *self-bounce*: the resume + turn's own ``_handle_message`` sees the sentinel it pre-claimed at + the early running-agent guard, queues the event into + ``_pending_messages`` and returns ``None`` without running the + agent. The adapter's late-arrival drain (in + ``_process_message_background``'s ``finally``) re-dispatches the + queued event, and because the guard wrapper's ``finally`` releases + the pre-claim before the spawned drain task starts, the agent runs + exactly once. This test locks that invariant in: the resume agent + must run once — never zero (regression) and never twice (the bug + the fix targets). + """ + runner, adapter = make_restart_runner() + source = make_restart_source(chat_id="full-path-chat") + session_key = runner._session_key_for_source(source) + pending_entry = SessionEntry( + session_key=session_key, + session_id="sid", + created_at=datetime.now(), + updated_at=datetime.now(), + origin=source, + platform=Platform.TELEGRAM, + chat_type="dm", + resume_pending=True, + resume_reason="restart_interrupted", + last_resume_marked_at=datetime.now(), + ) + runner.session_store._entries = {session_key: pending_entry} + + # Wire the REAL runner pipeline that _handle_message depends on. + from gateway.run import GatewayRunner + + runner._handle_message = GatewayRunner._handle_message.__get__( + runner, GatewayRunner + ) + runner._release_running_agent_state = ( + GatewayRunner._release_running_agent_state.__get__(runner, GatewayRunner) + ) + runner._check_slash_access = lambda *a, **k: None + runner._begin_session_run_generation = lambda session_key: 1 + runner._is_session_run_current = lambda session_key, generation: True + runner._invalidate_session_run_generation = lambda *a, **k: 0 + runner._claim_active_session_slot = lambda session_key, source: (object(), None) + runner._active_session_leases = {} + runner._busy_ack_ts = {} + runner._post_turn_goal_continuation = AsyncMock() + runner.session_store.get_or_create_session.return_value = None + + # Count how many times an actual agent run is started for this session. + agent_runs: list[str] = [] + + async def _fake_run(event, source, _quick_key, run_generation): + agent_runs.append(_quick_key) + return "RESUMED OK" + + runner._handle_message_with_agent = _fake_run + + # Route the adapter's real background pipeline at the real handler, + # and stub the leaf send/typing calls so delivery is a no-op. + adapter.set_message_handler(runner._handle_message) + adapter.send = AsyncMock() + adapter._keep_typing = AsyncMock() + adapter._stop_typing_refresh = AsyncMock() + adapter._send_with_retry = AsyncMock( + return_value=SendResult(success=True, message_id="1") + ) + adapter._run_processing_hook = AsyncMock() + + scheduled = runner._schedule_resume_pending_sessions() + assert scheduled == 1 + # Pre-claim must be visible immediately. + assert runner._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL + + # Let the guarded task, the background task, and the late-arrival + # drain task all settle. + for _ in range(20): + await asyncio.sleep(0.02) + + # Exactly one agent run for the resumed session — not zero (the + # pre-claim did not swallow the resume) and not two (no duplicate). + assert agent_runs == [session_key] + # No leaked sentinel and no orphaned queued event. + assert session_key not in runner._running_agents + assert session_key not in getattr(adapter, "_pending_messages", {}) From 28902dc8906b50077026e063080599f386a900b6 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Sat, 13 Jun 2026 23:39:49 +0530 Subject: [PATCH 3/3] chore: map liuhao1024 contributor email for attribution --- scripts/release.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/release.py b/scripts/release.py index 7ef4f46cbab..1de1747a36d 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -1529,6 +1529,7 @@ AUTHOR_MAP = { "chanhokyim@gmail.com": "joel611", # PR #33958 salvage (DISCORD_ALLOWED_ROLES role_authorized gateway flag) "desg38@gmail.com": "dschnurbusch", # PR #42373 salvage (archive compressed conversation lineages) "bsmith@bramarstrategicservices.com": "bcsmith528", # PR #20589 salvage (register_slack_action_handler plugin API) + "sunsky.lau@gmail.com": "liuhao1024", # PR #45494 salvage (claim session slot before auto-resume task; #45456) }