fix(gateway): preserve _session_tasks on guard mismatch to enable stale lock healing (#48300)

_session_task_is_stale() failed to detect a stale session lock when the owner
task completed and cleaned _session_tasks (del in _process_message_background's
finally) but _active_sessions was NOT released because _release_session_guard
skipped on a guard mismatch (a concurrent reset/new command or drain handoff
swapped _active_sessions[key] to a different guard). With no owner task left to
inspect, _session_task_is_stale reported 'not stale', the orphaned guard was
never healed, and the session deadlocked permanently — later messages received
but never dispatched.

Reorder the finally cleanup to release-then-conditional-delete: release the
guard first, then drop the _session_tasks entry ONLY if the guard was actually
released (session_key no longer in _active_sessions). On a guard mismatch the
done-task entry survives, so the on-entry self-heal (_session_task_is_stale ->
_heal_stale_session_lock) detects the stale lock and clears it on the next
inbound message.

Extracted the cleanup into a callable _cleanup_finished_session_task() helper so
the regression test drives the REAL production code path rather than a copy of
its logic. The original test inlined the fixed logic and therefore passed
regardless of the production order; mutation testing confirms the rewritten
tests now fail on the buggy del-first order. Added a positive-path test (guard
matches -> release + delete) so both branches are pinned.

Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com>
This commit is contained in:
islam666 2026-06-24 03:06:21 +05:30
parent 292a456c06
commit 0c79992db5
2 changed files with 93 additions and 2 deletions

View file

@ -4976,8 +4976,27 @@ class BasePlatformAdapter(ABC):
# same session.
current_task = asyncio.current_task()
if current_task is not None and self._session_tasks.get(session_key) is current_task:
del self._session_tasks[session_key]
self._release_session_guard(session_key, guard=interrupt_event)
self._cleanup_finished_session_task(session_key, interrupt_event)
def _cleanup_finished_session_task(
    self, session_key: str, interrupt_event: Optional[asyncio.Event]
) -> None:
    """Release a finished owner task's guard, then conditionally forget the task.

    The order is release first, delete second (the #48300 fix). The
    ``_session_tasks`` entry is dropped only when ``session_key`` is no
    longer present in ``_active_sessions`` after the release attempt.

    When a concurrent path (reset/new command, drain handoff) has swapped
    ``_active_sessions[key]`` to a different guard,
    ``_release_session_guard`` skips on the guard mismatch and the lock
    stays installed. Deleting the ``_session_tasks`` entry unconditionally
    (the old order) left ``_session_task_is_stale`` with no owner task to
    inspect, so it reported "not stale" and the orphaned guard was never
    healed -- a permanent session deadlock. Keeping the done-task entry
    while the guard survives lets the on-entry self-heal detect the stale
    lock and clear it on the next inbound message.

    Args:
        session_key: Key of the session whose owner task has finished.
        interrupt_event: Guard the finished task was registered with; it is
            forwarded to ``_release_session_guard`` as ``guard``.
    """
    self._release_session_guard(session_key, guard=interrupt_event)

    if session_key in self._active_sessions:
        # A guard is still installed for this key, so keep the task entry
        # around as evidence for stale-lock detection.
        return

    self._session_tasks.pop(session_key, None)
async def cancel_background_tasks(self) -> None:
"""Cancel any in-flight background message-processing tasks.

View file

@ -299,6 +299,78 @@ class TestStaleSessionLockSelfHeal:
assert sk in adapter._active_sessions
assert sk in adapter._session_tasks
@pytest.mark.asyncio
async def test_guard_mismatch_preserves_session_task_for_stale_detection(self):
    """A guard mismatch must leave the ``_session_tasks`` entry in place.

    Core of the production split-brain fix: the cleanup only removes
    ``_session_tasks[key]`` when ``_active_sessions[key]`` was actually
    released. If the guard was swapped (e.g. by a reset command), the entry
    stays so ``_session_task_is_stale`` can see the done task and the lock
    can be healed on the next inbound message.
    """
    adapter = _make_adapter()
    key = _session_key()

    async def _noop():
        return None

    # An already-completed task stands in for the finished owner task.
    finished = asyncio.create_task(_noop())
    await finished

    # Task is recorded against the original guard...
    original_guard = asyncio.Event()
    adapter._active_sessions[key] = original_guard
    adapter._session_tasks[key] = finished

    # ...then the guard is swapped, as a reset/new command would do.
    replacement_guard = asyncio.Event()
    adapter._active_sessions[key] = replacement_guard

    # Exercise the REAL cleanup helper rather than a copy of its logic. The
    # installed guard differs from the one passed in, so the release is
    # skipped and the task entry must be kept for stale detection.
    adapter._cleanup_finished_session_task(key, original_guard)

    assert key in adapter._session_tasks, (
        "_session_tasks entry must survive guard mismatch so stale detection works"
    )
    assert adapter._session_tasks[key] is finished

    # With the done task still recorded, the lock is reported as stale.
    assert adapter._session_task_is_stale(key) is True

    # Healing removes both the guard and the task entry.
    assert adapter._heal_stale_session_lock(key) is True
    assert key not in adapter._active_sessions
    assert key not in adapter._session_tasks
@pytest.mark.asyncio
async def test_cleanup_releases_and_deletes_when_guard_matches(self):
    """Positive path for #48300: a matching guard is released and the task dropped.

    On normal completion the guard passed to the helper is still the one
    installed, so the helper must release it AND remove the task entry.
    Release-then-conditional-delete must not strand a healthy session.
    """
    adapter = _make_adapter()
    key = _session_key()

    async def _noop():
        return None

    # An already-completed task stands in for the finished owner task.
    finished = asyncio.create_task(_noop())
    await finished

    guard = asyncio.Event()
    adapter._active_sessions[key] = guard
    adapter._session_tasks[key] = finished

    # No swap happened, so the release matches the installed guard.
    adapter._cleanup_finished_session_task(key, guard)

    assert key not in adapter._active_sessions, "guard must be released on match"
    assert key not in adapter._session_tasks, "task entry must be dropped after release"
# ===========================================================================
# Layer 3: Runner-side generation guard on slot promotion + release