diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 8b9de642ff8..db76034989c 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -2379,15 +2379,14 @@ class BasePlatformAdapter(ABC): # Leave _active_sessions[session_key] populated — the drain # task's own lifecycle will clean it up. else: - # Clean up session tracking. Only release the guard if this - # task still owns it (protects against the case where a - # reset-like command already swapped in its own guard while - # we were unwinding). + # Clean up session tracking. Guard-match both deletes so that a + # command_guard swapped in by a reset-like command (which also + # cancelled us) can't be accidentally cleared by our unwind. + # The command owns the session now. current_task = asyncio.current_task() if current_task is not None and self._session_tasks.get(session_key) is current_task: del self._session_tasks[session_key] - if session_key in self._active_sessions: - del self._active_sessions[session_key] + self._release_session_guard(session_key, guard=interrupt_event) async def cancel_background_tasks(self) -> None: """Cancel any in-flight background message-processing tasks. diff --git a/tests/gateway/test_session_split_brain_11016.py b/tests/gateway/test_session_split_brain_11016.py index af5a935db15..1076a77c44c 100644 --- a/tests/gateway/test_session_split_brain_11016.py +++ b/tests/gateway/test_session_split_brain_11016.py @@ -353,3 +353,47 @@ class TestRunnerSessionGenerationGuard: gen2 = runner._begin_session_run_generation(sk) assert gen2 > gen1 assert runner._is_session_run_current(sk, gen2) is True + + +# =========================================================================== +# Layer 1 (regression): old task's finally must NOT delete a newer guard +# =========================================================================== + + +class TestOldTaskCannotClobberNewerGuard: + """Direct regression for the unconditional-delete bug. 
+ + Before the guard-match fix, a task in its ``finally`` block would delete + ``_active_sessions[session_key]`` unconditionally — even if a + /stop or /new command had already swapped in its own command_guard + (which then gets clobbered, opening a race for follow-up messages). + """ + + def test_release_session_guard_matches_on_event_identity(self): + adapter = _make_adapter() + sk = _session_key() + + old_guard = asyncio.Event() + new_guard = asyncio.Event() + # Command swapped in a newer guard. + adapter._active_sessions[sk] = new_guard + + # Old task tries to release using its captured (stale) guard. + adapter._release_session_guard(sk, guard=old_guard) + + # The newer guard survives. + assert adapter._active_sessions.get(sk) is new_guard + + # Now the command itself releases using the matching guard. + adapter._release_session_guard(sk, guard=new_guard) + assert sk not in adapter._active_sessions + + def test_release_session_guard_without_guard_releases_unconditionally(self): + adapter = _make_adapter() + sk = _session_key() + adapter._active_sessions[sk] = asyncio.Event() + # Callers that don't know the guard (e.g. cancel_session_processing's + # default path) still work. + adapter._release_session_guard(sk) + assert sk not in adapter._active_sessions +