fix(gateway): guard-match the finally-block _active_sessions delete

Before this, _process_message_background's finally did an unconditional
'del self._active_sessions[session_key]' — even if a /stop/ /new
command had already swapped in its own command_guard via
_dispatch_active_session_command and cancelled us.  The old task's
unwind would clobber the newer guard, opening a race for follow-ups.

Replace with _release_session_guard(session_key, guard=interrupt_event)
so the delete only fires when the guard we captured is still the one
installed.  The sibling _session_tasks pop already had equivalent
ownership matching via asyncio.current_task() identity; this closes the
asymmetry.

Adds two direct regressions in test_session_split_brain_11016:
- stale guard reference must not clobber a newer guard by identity
- guard=None default still releases unconditionally (for callers that
  don't have a captured guard to match against)

Refs #11016
This commit is contained in:
Teknium 2026-04-23 05:12:41 -07:00 committed by Teknium
parent 81d925f2a5
commit 5651a73331
2 changed files with 49 additions and 6 deletions

View file

@ -2379,15 +2379,14 @@ class BasePlatformAdapter(ABC):
# Leave _active_sessions[session_key] populated — the drain
# task's own lifecycle will clean it up.
else:
# Clean up session tracking. Only release the guard if this
# task still owns it (protects against the case where a
# reset-like command already swapped in its own guard while
# we were unwinding).
# Clean up session tracking. Guard-match both deletes so a
# reset-like command that already swapped in its own
# command_guard (and cancelled us) can't be accidentally
# cleared by our unwind. The command owns the session now.
current_task = asyncio.current_task()
if current_task is not None and self._session_tasks.get(session_key) is current_task:
del self._session_tasks[session_key]
if session_key in self._active_sessions:
del self._active_sessions[session_key]
self._release_session_guard(session_key, guard=interrupt_event)
async def cancel_background_tasks(self) -> None:
"""Cancel any in-flight background message-processing tasks.

View file

@ -353,3 +353,47 @@ class TestRunnerSessionGenerationGuard:
gen2 = runner._begin_session_run_generation(sk)
assert gen2 > gen1
assert runner._is_session_run_current(sk, gen2) is True
# ===========================================================================
# Layer 1 (regression): old task's finally must NOT delete a newer guard
# ===========================================================================
class TestOldTaskCannotClobberNewerGuard:
"""Direct regression for the unconditional-delete bug.
Before the guard-match fix, a task in its finally would delete
``_active_sessions[session_key]`` unconditionally even if a
/stop/ /new command had already swapped in its own command_guard
(which then gets clobbered, opening a race for follow-up messages).
"""
def test_release_session_guard_matches_on_event_identity(self):
adapter = _make_adapter()
sk = _session_key()
old_guard = asyncio.Event()
new_guard = asyncio.Event()
# Command swapped in a newer guard.
adapter._active_sessions[sk] = new_guard
# Old task tries to release using its captured (stale) guard.
adapter._release_session_guard(sk, guard=old_guard)
# The newer guard survives.
assert adapter._active_sessions.get(sk) is new_guard
# Now the command itself releases using the matching guard.
adapter._release_session_guard(sk, guard=new_guard)
assert sk not in adapter._active_sessions
def test_release_session_guard_without_guard_releases_unconditionally(self):
adapter = _make_adapter()
sk = _session_key()
adapter._active_sessions[sk] = asyncio.Event()
# Callers that don't know the guard (e.g. cancel_session_processing's
# default path) still work.
adapter._release_session_guard(sk)
assert sk not in adapter._active_sessions