Merge pull request #51553 from NousResearch/salvage/48300-stale-session-lock

fix(gateway): preserve _session_tasks on guard mismatch to heal stale session lock (#48300)
This commit is contained in:
kshitij 2026-06-24 03:21:38 +05:30 committed by GitHub
commit 89538d47b8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 93 additions and 2 deletions

View file

@ -4976,8 +4976,27 @@ class BasePlatformAdapter(ABC):
# same session.
current_task = asyncio.current_task()
if current_task is not None and self._session_tasks.get(session_key) is current_task:
del self._session_tasks[session_key]
self._release_session_guard(session_key, guard=interrupt_event)
self._cleanup_finished_session_task(session_key, interrupt_event)
def _cleanup_finished_session_task(
self, session_key: str, interrupt_event: Optional[asyncio.Event]
) -> None:
"""Release the session guard for a finished owner task, then drop its
``_session_tasks`` entry ONLY if the guard was actually released.
Release-then-conditional-delete is the #48300 fix: when a concurrent
path (reset/new command, drain handoff) swapped ``_active_sessions[key]``
to a different guard, ``_release_session_guard`` skips on the guard
mismatch and the lock stays installed. If we deleted ``_session_tasks``
unconditionally (the old order), ``_session_task_is_stale`` would later
see no owner task and report "not stale", so the orphaned guard would
never be healed a permanent session deadlock. Keeping the done-task
entry when the guard survives lets the on-entry self-heal detect the
stale lock and clear it on the next inbound message.
"""
self._release_session_guard(session_key, guard=interrupt_event)
if session_key not in self._active_sessions:
self._session_tasks.pop(session_key, None)
async def cancel_background_tasks(self) -> None:
"""Cancel any in-flight background message-processing tasks.

View file

@ -299,6 +299,78 @@ class TestStaleSessionLockSelfHeal:
assert sk in adapter._active_sessions
assert sk in adapter._session_tasks
@pytest.mark.asyncio
async def test_guard_mismatch_preserves_session_task_for_stale_detection(self):
"""When guard mismatch skips _release_session_guard, _session_tasks is preserved.
This is the core of the production split-brain fix: the finally block
only deletes _session_tasks[key] if _active_sessions[key] was actually
released. If the guard was swapped (e.g., by a reset command), the
_session_tasks entry remains so _session_task_is_stale can detect the
done task and heal the lock on the next inbound message.
"""
adapter = _make_adapter()
sk = _session_key()
# Simulate: task recorded with guard=event_a
event_a = asyncio.Event()
async def _done():
return None
done_task = asyncio.create_task(_done())
await done_task
adapter._active_sessions[sk] = event_a
adapter._session_tasks[sk] = done_task
# Simulate guard swap (as reset/new command would do)
event_b = asyncio.Event()
adapter._active_sessions[sk] = event_b
# Drive the REAL finally-block cleanup helper (not a copy of its logic):
# _release_session_guard sees event_b != event_a → skips releasing, so
# _session_tasks must be preserved for stale detection.
adapter._cleanup_finished_session_task(sk, event_a)
# _session_tasks preserved because guard mismatch kept _active_sessions
assert sk in adapter._session_tasks, (
"_session_tasks entry must survive guard mismatch so stale detection works"
)
assert adapter._session_tasks[sk] is done_task
# Stale detection now works: task is done, guard is stale
assert adapter._session_task_is_stale(sk) is True
# Heal clears both
assert adapter._heal_stale_session_lock(sk) is True
assert sk not in adapter._active_sessions
assert sk not in adapter._session_tasks
@pytest.mark.asyncio
async def test_cleanup_releases_and_deletes_when_guard_matches(self):
"""Positive path for #48300: when the guard still matches (normal
completion), the helper releases the guard AND drops the task entry
the release-then-conditional-delete must not strand a healthy session."""
adapter = _make_adapter()
sk = _session_key()
event_a = asyncio.Event()
async def _done():
return None
done_task = asyncio.create_task(_done())
await done_task
adapter._active_sessions[sk] = event_a
adapter._session_tasks[sk] = done_task
# No guard swap → _release_session_guard matches event_a and releases.
adapter._cleanup_finished_session_task(sk, event_a)
assert sk not in adapter._active_sessions, "guard must be released on match"
assert sk not in adapter._session_tasks, "task entry must be dropped after release"
# ===========================================================================
# Layer 3: Runner-side generation guard on slot promotion + release