From 0c79992db565de298ca694cf2278a094ed601f1a Mon Sep 17 00:00:00 2001 From: islam666 Date: Wed, 24 Jun 2026 03:06:21 +0530 Subject: [PATCH] fix(gateway): preserve _session_tasks on guard mismatch to enable stale lock healing (#48300) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _session_task_is_stale() failed to detect a stale session lock when the owner task completed and cleaned _session_tasks (del in _process_message_background's finally) but _active_sessions was NOT released because _release_session_guard skipped on a guard mismatch (a concurrent reset/new command or drain handoff swapped _active_sessions[key] to a different guard). With no owner task left to inspect, _session_task_is_stale reported 'not stale', the orphaned guard was never healed, and the session deadlocked permanently — later messages were received but never dispatched. Reorder the finally cleanup to release-then-conditional-delete: release the guard first, then drop the _session_tasks entry ONLY if the guard was actually released (session_key no longer in _active_sessions). On a guard mismatch the done-task entry survives, so the on-entry self-heal (_session_task_is_stale -> _heal_stale_session_lock) detects the stale lock and clears it on the next inbound message. Extracted the cleanup into a callable _cleanup_finished_session_task() helper so the regression test drives the REAL production code path rather than a copy of its logic (the original test inlined the fixed logic and passed regardless of the production order; mutation testing confirms the rewritten tests now fail on the buggy del-first order). Added a positive-path test (guard matches -> release + delete) so both branches are pinned. 
Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> --- gateway/platforms/base.py | 23 +++++- .../gateway/test_session_split_brain_11016.py | 72 +++++++++++++++++++ 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 55f74f88f0c..ac1eeef0b89 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -4976,8 +4976,27 @@ class BasePlatformAdapter(ABC): # same session. current_task = asyncio.current_task() if current_task is not None and self._session_tasks.get(session_key) is current_task: - del self._session_tasks[session_key] - self._release_session_guard(session_key, guard=interrupt_event) + self._cleanup_finished_session_task(session_key, interrupt_event) + + def _cleanup_finished_session_task( + self, session_key: str, interrupt_event: Optional[asyncio.Event] + ) -> None: + """Release the session guard for a finished owner task, then drop its + ``_session_tasks`` entry ONLY if the guard was actually released. + + Release-then-conditional-delete is the #48300 fix: when a concurrent + path (reset/new command, drain handoff) swapped ``_active_sessions[key]`` + to a different guard, ``_release_session_guard`` skips on the guard + mismatch and the lock stays installed. If we deleted ``_session_tasks`` + unconditionally (the old order), ``_session_task_is_stale`` would later + see no owner task and report "not stale", so the orphaned guard would + never be healed — a permanent session deadlock. Keeping the done-task + entry when the guard survives lets the on-entry self-heal detect the + stale lock and clear it on the next inbound message. + """ + self._release_session_guard(session_key, guard=interrupt_event) + if session_key not in self._active_sessions: + self._session_tasks.pop(session_key, None) async def cancel_background_tasks(self) -> None: """Cancel any in-flight background message-processing tasks. 
diff --git a/tests/gateway/test_session_split_brain_11016.py b/tests/gateway/test_session_split_brain_11016.py index 85fe274ab2e..4a00f31b138 100644 --- a/tests/gateway/test_session_split_brain_11016.py +++ b/tests/gateway/test_session_split_brain_11016.py @@ -299,6 +299,78 @@ class TestStaleSessionLockSelfHeal: assert sk in adapter._active_sessions assert sk in adapter._session_tasks + @pytest.mark.asyncio + async def test_guard_mismatch_preserves_session_task_for_stale_detection(self): + """On guard mismatch _release_session_guard skips, so _session_tasks is preserved. + + This is the core of the production split-brain fix: the finally block + only deletes _session_tasks[key] if _active_sessions[key] was actually + released. If the guard was swapped (e.g., by a reset command), the + _session_tasks entry remains so _session_task_is_stale can detect the + done task and heal the lock on the next inbound message. + """ + adapter = _make_adapter() + sk = _session_key() + + # Simulate: task recorded with guard=event_a + event_a = asyncio.Event() + async def _done(): + return None + + done_task = asyncio.create_task(_done()) + await done_task + + adapter._active_sessions[sk] = event_a + adapter._session_tasks[sk] = done_task + + # Simulate guard swap (as reset/new command would do) + event_b = asyncio.Event() + adapter._active_sessions[sk] = event_b + + # Drive the REAL finally-block cleanup helper (not a copy of its logic): + # _release_session_guard sees event_b != event_a → skips releasing, so + # _session_tasks must be preserved for stale detection. 
+ adapter._cleanup_finished_session_task(sk, event_a) + + # _session_tasks preserved because guard mismatch kept _active_sessions + assert sk in adapter._session_tasks, ( + "_session_tasks entry must survive guard mismatch so stale detection works" + ) + assert adapter._session_tasks[sk] is done_task + + # Stale detection now works: task is done, guard is stale + assert adapter._session_task_is_stale(sk) is True + + # Heal clears both + assert adapter._heal_stale_session_lock(sk) is True + assert sk not in adapter._active_sessions + assert sk not in adapter._session_tasks + + @pytest.mark.asyncio + async def test_cleanup_releases_and_deletes_when_guard_matches(self): + """Positive path for #48300: when the guard still matches (normal + completion), the helper releases the guard AND drops the task entry — + the release-then-conditional-delete must not strand a healthy session.""" + adapter = _make_adapter() + sk = _session_key() + + event_a = asyncio.Event() + + async def _done(): + return None + + done_task = asyncio.create_task(_done()) + await done_task + + adapter._active_sessions[sk] = event_a + adapter._session_tasks[sk] = done_task + + # No guard swap → _release_session_guard matches event_a and releases. + adapter._cleanup_finished_session_task(sk, event_a) + + assert sk not in adapter._active_sessions, "guard must be released on match" + assert sk not in adapter._session_tasks, "task entry must be dropped after release" + # =========================================================================== # Layer 3: Runner-side generation guard on slot promotion + release