Merge pull request #45753 from kshitijk4poor/salvage/gateway-auto-resume-duplicate-agent

fix(gateway): claim session slot before auto-resume task to prevent duplicate agents (#45456)
This commit is contained in:
kshitij 2026-06-13 23:46:17 +05:30 committed by GitHub
commit 7be22e37e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 228 additions and 1 deletions

View file

@ -4556,6 +4556,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
)
continue
# Claim the session slot *before* spawning the task so that an
# inbound message arriving between task creation and the task's
# first await (where _process_message_background sets the real
# sentinel) sees the slot as occupied and queues behind it
# instead of spinning up a duplicate AIAgent (#45456).
self._running_agents[entry.session_key] = _AGENT_PENDING_SENTINEL
self._running_agents_ts[entry.session_key] = time.time()
# Empty-text internal event — the _is_resume_pending branch in
# _handle_message_with_agent prepends the proper reason-aware
# system note before the turn runs.
@ -4565,7 +4573,33 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
source=source,
internal=True,
)
task = asyncio.create_task(adapter.handle_message(event))
async def _guarded_handle_message(
_adapter: Any, _event: MessageEvent, _key: str = entry.session_key,
) -> None:
"""Ensure the pre-claimed sentinel is always released.
In the normal flow the resume turn reaches
``_handle_message``, which replaces our pre-claim with
its own ``_AGENT_PENDING_SENTINEL`` (and releases it in
its ``finally`` block) once the run begins. If
``handle_message`` raises *before* the runner takes over
the slot (e.g. during topic recovery or session-key
resolution), nobody clears our pre-claim so we do it
here unconditionally. The ``is _AGENT_PENDING_SENTINEL``
guard below only releases the slot we ourselves placed,
never one a live run currently owns.
"""
try:
await _adapter.handle_message(_event)
finally:
# Only release if the sentinel we set is still there
# (i.e. _process_message_background hasn't replaced
# and cleaned it already).
if self._running_agents.get(_key) is _AGENT_PENDING_SENTINEL:
self._release_running_agent_state(_key)
task = asyncio.create_task(_guarded_handle_message(adapter, event))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
scheduled += 1

View file

@ -1529,6 +1529,7 @@ AUTHOR_MAP = {
"chanhokyim@gmail.com": "joel611", # PR #33958 salvage (DISCORD_ALLOWED_ROLES role_authorized gateway flag)
"desg38@gmail.com": "dschnurbusch", # PR #42373 salvage (archive compressed conversation lineages)
"bsmith@bramarstrategicservices.com": "bcsmith528", # PR #20589 salvage (register_slack_action_handler plugin API)
"sunsky.lau@gmail.com": "liuhao1024", # PR #45494 salvage (claim session slot before auto-resume task; #45456)
}

View file

@ -35,6 +35,7 @@ import pytest
from gateway.config import GatewayConfig, HomeChannel, Platform
from gateway.platforms.base import MessageEvent, MessageType, SendResult
from gateway.run import (
_AGENT_PENDING_SENTINEL,
_auto_continue_freshness_window,
_coerce_gateway_timestamp,
_is_fresh_gateway_interruption,
@ -1420,3 +1421,194 @@ class TestStuckLoopEscalation:
{"indent": None},
)
]
@pytest.mark.asyncio
async def test_auto_resume_sets_sentinel_before_task_execution():
"""Auto-resume must claim the session slot before the task starts.
Regression for #45456: between ``asyncio.create_task()`` and the task's
first await (where ``_process_message_background`` sets the real
sentinel), an inbound message could arrive and spin up a duplicate
AIAgent. The fix pre-claims the slot so the inbound path sees it as
occupied.
"""
runner, adapter = make_restart_runner()
source = make_restart_source(chat_id="race-chat")
pending_entry = SessionEntry(
session_key="agent:main:telegram:dm:race-chat",
session_id="sid",
created_at=datetime.now(),
updated_at=datetime.now(),
origin=source,
platform=Platform.TELEGRAM,
chat_type="dm",
resume_pending=True,
resume_reason="restart_interrupted",
last_resume_marked_at=datetime.now(),
)
runner.session_store._entries = {pending_entry.session_key: pending_entry}
# Slow mock: hold the task open so we can inspect _running_agents
# while it's in-flight.
gate = asyncio.Event()
async def _slow_handle(event):
await gate.wait()
adapter.handle_message = _slow_handle
scheduled = runner._schedule_resume_pending_sessions()
assert scheduled == 1
# The sentinel must be set immediately — before the task starts executing.
assert pending_entry.session_key in runner._running_agents
assert runner._running_agents[pending_entry.session_key] is _AGENT_PENDING_SENTINEL
assert pending_entry.session_key in runner._running_agents_ts
# Release the task and let it complete.
gate.set()
await asyncio.sleep(0.05)
# After the task completes, the sentinel should be cleaned up.
assert pending_entry.session_key not in runner._running_agents
@pytest.mark.asyncio
async def test_auto_resume_sentinel_cleaned_on_task_failure():
"""If handle_message raises before _process_message_background, the
sentinel must still be released so the session is not locked forever.
"""
runner, adapter = make_restart_runner()
source = make_restart_source(chat_id="fail-chat")
pending_entry = SessionEntry(
session_key="agent:main:telegram:dm:fail-chat",
session_id="sid",
created_at=datetime.now(),
updated_at=datetime.now(),
origin=source,
platform=Platform.TELEGRAM,
chat_type="dm",
resume_pending=True,
resume_reason="restart_interrupted",
last_resume_marked_at=datetime.now(),
)
runner.session_store._entries = {pending_entry.session_key: pending_entry}
async def _failing_handle(event):
raise RuntimeError("adapter exploded")
adapter.handle_message = _failing_handle
scheduled = runner._schedule_resume_pending_sessions()
assert scheduled == 1
# Sentinel is set immediately.
assert pending_entry.session_key in runner._running_agents
# Let the task run and fail.
await asyncio.sleep(0.05)
# The sentinel must be cleaned up despite the failure.
assert pending_entry.session_key not in runner._running_agents
@pytest.mark.asyncio
async def test_auto_resume_runs_agent_exactly_once_through_full_path():
"""Full-path regression: the pre-claim must NOT make auto-resume a no-op.
The two tests above mock ``adapter.handle_message`` outright, so they
only prove the sentinel is set/cleaned around a stub they never
exercise the real dispatch chain. This drives the production path
end to end:
_schedule_resume_pending_sessions
-> _guarded_handle_message
-> adapter.handle_message (real)
-> _process_message_background (real)
-> _handle_message (real)
The risk the pre-claim introduces is a *self-bounce*: the resume
turn's own ``_handle_message`` sees the sentinel it pre-claimed at
the early running-agent guard, queues the event into
``_pending_messages`` and returns ``None`` without running the
agent. The adapter's late-arrival drain (in
``_process_message_background``'s ``finally``) re-dispatches the
queued event, and because the guard wrapper's ``finally`` releases
the pre-claim before the spawned drain task starts, the agent runs
exactly once. This test locks that invariant in: the resume agent
must run once never zero (regression) and never twice (the bug
the fix targets).
"""
runner, adapter = make_restart_runner()
source = make_restart_source(chat_id="full-path-chat")
session_key = runner._session_key_for_source(source)
pending_entry = SessionEntry(
session_key=session_key,
session_id="sid",
created_at=datetime.now(),
updated_at=datetime.now(),
origin=source,
platform=Platform.TELEGRAM,
chat_type="dm",
resume_pending=True,
resume_reason="restart_interrupted",
last_resume_marked_at=datetime.now(),
)
runner.session_store._entries = {session_key: pending_entry}
# Wire the REAL runner pipeline that _handle_message depends on.
from gateway.run import GatewayRunner
runner._handle_message = GatewayRunner._handle_message.__get__(
runner, GatewayRunner
)
runner._release_running_agent_state = (
GatewayRunner._release_running_agent_state.__get__(runner, GatewayRunner)
)
runner._check_slash_access = lambda *a, **k: None
runner._begin_session_run_generation = lambda session_key: 1
runner._is_session_run_current = lambda session_key, generation: True
runner._invalidate_session_run_generation = lambda *a, **k: 0
runner._claim_active_session_slot = lambda session_key, source: (object(), None)
runner._active_session_leases = {}
runner._busy_ack_ts = {}
runner._post_turn_goal_continuation = AsyncMock()
runner.session_store.get_or_create_session.return_value = None
# Count how many times an actual agent run is started for this session.
agent_runs: list[str] = []
async def _fake_run(event, source, _quick_key, run_generation):
agent_runs.append(_quick_key)
return "RESUMED OK"
runner._handle_message_with_agent = _fake_run
# Route the adapter's real background pipeline at the real handler,
# and stub the leaf send/typing calls so delivery is a no-op.
adapter.set_message_handler(runner._handle_message)
adapter.send = AsyncMock()
adapter._keep_typing = AsyncMock()
adapter._stop_typing_refresh = AsyncMock()
adapter._send_with_retry = AsyncMock(
return_value=SendResult(success=True, message_id="1")
)
adapter._run_processing_hook = AsyncMock()
scheduled = runner._schedule_resume_pending_sessions()
assert scheduled == 1
# Pre-claim must be visible immediately.
assert runner._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL
# Let the guarded task, the background task, and the late-arrival
# drain task all settle.
for _ in range(20):
await asyncio.sleep(0.02)
# Exactly one agent run for the resumed session — not zero (the
# pre-claim did not swallow the resume) and not two (no duplicate).
assert agent_runs == [session_key]
# No leaked sentinel and no orphaned queued event.
assert session_key not in runner._running_agents
assert session_key not in getattr(adapter, "_pending_messages", {})