# Reviewer summary (free text ahead of the patch header, not part of any hunk):
# this hunk replaces the inline cleanup in the finished-task path with a call
# to the new ``_cleanup_finished_session_task`` helper. The helper calls
# ``_release_session_guard`` first and pops the ``_session_tasks`` entry only
# when ``session_key`` is no longer present in ``_active_sessions``.
#
# NOTE(review): this patch arrived collapsed onto a single line. The line
# breaks below agree with the hunk header counts (-8/+27), but the leading
# indentation of context and removed lines is reconstructed from Python
# syntax -- confirm against gateway/platforms/base.py before applying.
# NOTE(review): whether the removed ``_release_session_guard`` call sat inside
# or after the ``if`` cannot be recovered from the collapsed text; it is shown
# inside here. If it was outside, this patch also narrows when the guard is
# released (owner task only) -- confirm that is intended.
diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py
index 55f74f88f0c..ac1eeef0b89 100644
--- a/gateway/platforms/base.py
+++ b/gateway/platforms/base.py
@@ -4976,8 +4976,27 @@ class BasePlatformAdapter(ABC):
             # same session.
             current_task = asyncio.current_task()
             if current_task is not None and self._session_tasks.get(session_key) is current_task:
-                del self._session_tasks[session_key]
-                self._release_session_guard(session_key, guard=interrupt_event)
+                self._cleanup_finished_session_task(session_key, interrupt_event)
+
+    def _cleanup_finished_session_task(
+        self, session_key: str, interrupt_event: Optional[asyncio.Event]
+    ) -> None:
+        """Release the session guard for a finished owner task, then drop its
+        ``_session_tasks`` entry ONLY if the guard was actually released.
+
+        Release-then-conditional-delete is the #48300 fix: when a concurrent
+        path (reset/new command, drain handoff) swapped ``_active_sessions[key]``
+        to a different guard, ``_release_session_guard`` skips on the guard
+        mismatch and the lock stays installed. If we deleted ``_session_tasks``
+        unconditionally (the old order), ``_session_task_is_stale`` would later
+        see no owner task and report "not stale", so the orphaned guard would
+        never be healed — a permanent session deadlock. Keeping the done-task
+        entry when the guard survives lets the on-entry self-heal detect the
+        stale lock and clear it on the next inbound message.
+        """
+        self._release_session_guard(session_key, guard=interrupt_event)
+        if session_key not in self._active_sessions:
+            self._session_tasks.pop(session_key, None)
 
     async def cancel_background_tasks(self) -> None:
         """Cancel any in-flight background message-processing tasks.
 
# Reviewer summary (free text ahead of the patch header, not part of any hunk):
# this hunk adds two tests to ``TestStaleSessionLockSelfHeal`` that call
# ``adapter._cleanup_finished_session_task`` directly.
#   * guard-mismatch case: ``_active_sessions[sk]`` is overwritten with a second
#     event before cleanup; the test asserts the ``_session_tasks`` entry
#     survives, ``_session_task_is_stale`` returns True, and
#     ``_heal_stale_session_lock`` then clears both maps.
#   * guard-match case: no swap; the test asserts both ``_active_sessions`` and
#     ``_session_tasks`` no longer contain the key after cleanup.
#
# NOTE(review): this patch arrived collapsed onto two lines. The line breaks
# below agree with the hunk header counts (-6/+78), but leading indentation is
# reconstructed from Python syntax -- confirm against the test file before
# applying.
diff --git a/tests/gateway/test_session_split_brain_11016.py b/tests/gateway/test_session_split_brain_11016.py
index 85fe274ab2e..4a00f31b138 100644
--- a/tests/gateway/test_session_split_brain_11016.py
+++ b/tests/gateway/test_session_split_brain_11016.py
@@ -299,6 +299,78 @@ class TestStaleSessionLockSelfHeal:
         assert sk in adapter._active_sessions
         assert sk in adapter._session_tasks
 
+    @pytest.mark.asyncio
+    async def test_guard_mismatch_preserves_session_task_for_stale_detection(self):
+        """When guard mismatch skips _release_session_guard, _session_tasks is preserved.
+
+        This is the core of the production split-brain fix: the finally block
+        only deletes _session_tasks[key] if _active_sessions[key] was actually
+        released. If the guard was swapped (e.g., by a reset command), the
+        _session_tasks entry remains so _session_task_is_stale can detect the
+        done task and heal the lock on the next inbound message.
+        """
+        adapter = _make_adapter()
+        sk = _session_key()
+
+        # Simulate: task recorded with guard=event_a
+        event_a = asyncio.Event()
+        async def _done():
+            return None
+
+        done_task = asyncio.create_task(_done())
+        await done_task
+
+        adapter._active_sessions[sk] = event_a
+        adapter._session_tasks[sk] = done_task
+
+        # Simulate guard swap (as reset/new command would do)
+        event_b = asyncio.Event()
+        adapter._active_sessions[sk] = event_b
+
+        # Drive the REAL finally-block cleanup helper (not a copy of its logic):
+        # _release_session_guard sees event_b != event_a → skips releasing, so
+        # _session_tasks must be preserved for stale detection.
+        adapter._cleanup_finished_session_task(sk, event_a)
+
+        # _session_tasks preserved because guard mismatch kept _active_sessions
+        assert sk in adapter._session_tasks, (
+            "_session_tasks entry must survive guard mismatch so stale detection works"
+        )
+        assert adapter._session_tasks[sk] is done_task
+
+        # Stale detection now works: task is done, guard is stale
+        assert adapter._session_task_is_stale(sk) is True
+
+        # Heal clears both
+        assert adapter._heal_stale_session_lock(sk) is True
+        assert sk not in adapter._active_sessions
+        assert sk not in adapter._session_tasks
+
+    @pytest.mark.asyncio
+    async def test_cleanup_releases_and_deletes_when_guard_matches(self):
+        """Positive path for #48300: when the guard still matches (normal
+        completion), the helper releases the guard AND drops the task entry —
+        the release-then-conditional-delete must not strand a healthy session."""
+        adapter = _make_adapter()
+        sk = _session_key()
+
+        event_a = asyncio.Event()
+
+        async def _done():
+            return None
+
+        done_task = asyncio.create_task(_done())
+        await done_task
+
+        adapter._active_sessions[sk] = event_a
+        adapter._session_tasks[sk] = done_task
+
+        # No guard swap → _release_session_guard matches event_a and releases.
+        adapter._cleanup_finished_session_task(sk, event_a)
+
+        assert sk not in adapter._active_sessions, "guard must be released on match"
+        assert sk not in adapter._session_tasks, "task entry must be dropped after release"
+
 
 # ===========================================================================
 # Layer 3: Runner-side generation guard on slot promotion + release