test(gateway): pin cleanup invariants for #17758 in-band drain hand-off

Belt-and-suspenders on top of @briandevans' #17758 fix.  The in-band
drain hand-off (await->create_task + session-guard preservation)
changed cleanup semantics in three places that the original PR
reasoned about but didn't test directly.  Pin each invariant so a
future refactor can't silently regress them:

1. Normal single-message path still releases _active_sessions[sk] and
   _session_tasks[sk] through end-of-finally.  The #17758 follow-up
   moved _release_session_guard under
     if current_task is self._session_tasks.get(session_key)
   For the 99%-common case current_task IS the stored task, so the
   guard must still fire.  Test would fail if the conditional were
   ever tightened in a way that dropped the normal path.

2. Drain-task cancellation releases the session.  If the drain task
   spawned by the in-band hand-off is cancelled mid-handler (e.g.
   /stop fired while draining a follow-up), its own finally must
   fire _release_session_guard.  Without this a cancel would leave
   the session permanently pinned busy.

3. Late-arrival drain still spawns when no in-band drain preceded
   it.  Pre-existing path, but the #17758 follow-up added a
   re-queue branch that only fires when ownership was already
   handed off.  When no handoff happened the else branch must still
   spawn a fresh drain task — otherwise a message arriving during
   stop_typing gets silently dropped.

All three tests pass against current main.  Zero production code
changes.
This commit is contained in:
Teknium 2026-04-30 04:54:42 -07:00
parent a845177ebe
commit 4c792865b4

View file

@ -186,3 +186,166 @@ async def test_in_band_drain_preserves_active_session_guard():
"the handoff window would bypass the Level-1 guard and spawn a "
"second handler for the same session."
)
# ---------------------------------------------------------------------------
# Follow-up guardrails (belt-and-suspenders on top of the #17758 fix).
#
# The in-band drain hand-off changed cleanup semantics in three subtle ways
# that the original fix reasoned about but didn't test directly. These
# tests pin each invariant so future refactors can't silently regress them.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_normal_path_releases_session_guard():
"""The common path — one message, nothing queued — must still
fully release ``_active_sessions[sk]`` and ``_session_tasks[sk]``
through the end-of-finally block.
The #17758 fix moved ``_release_session_guard(...)`` under an
``if current_task is self._session_tasks.get(session_key)``
conditional. For the 99%-common case (no pending message, no
handoff) ``current_task`` IS the stored task, so the guard must
still fire. This test would fail if the conditional were ever
tightened in a way that dropped the normal path."""
adapter = _make_adapter()
sk = _sk()
async def handler(event):
return "ok"
adapter._message_handler = handler
await adapter.handle_message(_make_event(text="solo"))
# Wait for the single-shot handler to fully unwind.
for _ in range(200):
if sk not in adapter._active_sessions and sk not in adapter._session_tasks:
break
await asyncio.sleep(0.01)
await adapter.cancel_background_tasks()
assert sk not in adapter._active_sessions, (
"normal-path unwind left _active_sessions[sk] populated — future "
"messages would take the busy-handler path forever"
)
assert sk not in adapter._session_tasks, (
"normal-path unwind left _session_tasks[sk] populated — "
"stale-lock detection will treat a dead task as alive"
)
@pytest.mark.asyncio
async def test_drain_task_cancellation_releases_session():
"""If the in-band drain task is cancelled (e.g. user sent ``/stop``
mid-drain), the session guard and task registry must still get
cleaned up the cancelled drain task's own ``finally`` runs and
fires ``_release_session_guard``.
The #17758 fix transfers ownership of ``_session_tasks[sk]`` to
the drain task; the drain task's ``except asyncio.CancelledError``
branch must then own the cleanup. Without this test a future
refactor could move cancellation handling in a way that leaves
the session permanently pinned as busy after a cancel."""
adapter = _make_adapter()
sk = _sk()
turn_started = asyncio.Event()
drain_hit_handler = asyncio.Event()
async def handler(event):
if event.text == "M0":
# Queue a pending follow-up so an in-band drain task gets spawned.
adapter._pending_messages[sk] = _make_event(text="M1")
turn_started.set()
return "ok"
# M1 is the drained follow-up — hang so we can cancel the drain task.
drain_hit_handler.set()
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
raise
adapter._message_handler = handler
await adapter.handle_message(_make_event(text="M0"))
# Wait for the drain task to actually start running M1.
await asyncio.wait_for(drain_hit_handler.wait(), timeout=2)
# Cancel the drain task mid-handler.
drain_task = adapter._session_tasks.get(sk)
assert drain_task is not None, "in-band drain did not install a drain task"
assert not drain_task.done(), "drain task finished before we could cancel"
drain_task.cancel()
# Drain task's finally must release both registries.
for _ in range(200):
if sk not in adapter._active_sessions and sk not in adapter._session_tasks:
break
await asyncio.sleep(0.01)
await adapter.cancel_background_tasks()
assert sk not in adapter._active_sessions, (
"cancelled drain task did not release _active_sessions[sk] — "
"the session stays permanently pinned as busy after a /stop mid-drain"
)
assert sk not in adapter._session_tasks, (
"cancelled drain task did not release _session_tasks[sk] — "
"stale-lock detection will treat the dead task as alive"
)
@pytest.mark.asyncio
async def test_late_arrival_drain_still_fires_when_no_in_band_drain():
"""The late-arrival drain in ``finally`` must still spawn a fresh
task when no in-band drain preceded it.
Pre-#17758 this path already existed; the #17758 follow-up guard
only re-queues when ``_session_tasks[sk] is not current_task``.
For a late-arrival with no in-band drain, ``_session_tasks[sk]``
IS the current task, so the ``else`` branch must fire and spawn
a drain task for the queued message.
Queue a pending message *after* M0's handler returns (so the
in-band drain block sees nothing) but *before* ``finally`` runs
the late-arrival check we do this by hooking ``_stop_typing``,
which runs in finally before the late-arrival check."""
adapter = _make_adapter()
sk = _sk()
results: list[str] = []
original_stop_typing = getattr(adapter, "stop_typing", None)
async def injecting_stop_typing(chat_id):
# Simulate a message landing during the cleanup awaits.
adapter._pending_messages[sk] = _make_event(text="late")
if original_stop_typing:
await original_stop_typing(chat_id)
adapter.stop_typing = injecting_stop_typing
async def handler(event):
results.append(event.text)
return "ok"
adapter._message_handler = handler
await adapter.handle_message(_make_event(text="first"))
# Wait for the late-arrival drain task to finish the second event.
for _ in range(400):
if "late" in results and sk not in adapter._active_sessions:
break
await asyncio.sleep(0.01)
await adapter.cancel_background_tasks()
assert "first" in results, "original message handler did not run"
assert "late" in results, (
"late-arrival drain did not spawn a drain task — a message that "
"landed during cleanup awaits was silently dropped"
)