From 4c792865b44d73aaf763aaa2b79d7179ed531ee8 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 30 Apr 2026 04:54:42 -0700 Subject: [PATCH] test(gateway): pin cleanup invariants for #17758 in-band drain hand-off MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Belt-and-suspenders on top of @briandevans' #17758 fix. The in-band drain hand-off (await->create_task + session-guard preservation) changed cleanup semantics in three places that the original PR reasoned about but didn't test directly. Pin each invariant so a future refactor can't silently regress them: 1. Normal single-message path still releases _active_sessions[sk] and _session_tasks[sk] through end-of-finally. The #17758 follow-up moved _release_session_guard under if current_task is self._session_tasks.get(session_key) For the 99%-common case current_task IS the stored task, so the guard must still fire. Test would fail if the conditional were ever tightened in a way that dropped the normal path. 2. Drain-task cancellation releases the session. If the drain task spawned by the in-band hand-off is cancelled mid-handler (e.g. /stop fired while draining a follow-up), its own finally must fire _release_session_guard. Without this a cancel would leave the session permanently pinned busy. 3. Late-arrival drain still spawns when no in-band drain preceded it. Pre-existing path, but the #17758 follow-up added a re-queue branch that only fires when ownership was already handed off. When no handoff happened the else branch must still spawn a fresh drain task — otherwise a message arriving during stop_typing gets silently dropped. All three tests pass against current main. Zero production code changes. --- .../test_pending_drain_no_recursion.py | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/tests/gateway/test_pending_drain_no_recursion.py b/tests/gateway/test_pending_drain_no_recursion.py index 620dfe9f3d..b7569b8d02 100644 --- a/tests/gateway/test_pending_drain_no_recursion.py +++ b/tests/gateway/test_pending_drain_no_recursion.py @@ -186,3 +186,166 @@ async def test_in_band_drain_preserves_active_session_guard(): "the handoff window would bypass the Level-1 guard and spawn a " "second handler for the same session." ) + + +# --------------------------------------------------------------------------- +# Follow-up guardrails (belt-and-suspenders on top of the #17758 fix). +# +# The in-band drain hand-off changed cleanup semantics in three subtle ways +# that the original fix reasoned about but didn't test directly. These +# tests pin each invariant so future refactors can't silently regress them. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_normal_path_releases_session_guard(): + """The common path — one message, nothing queued — must still + fully release ``_active_sessions[sk]`` and ``_session_tasks[sk]`` + through the end-of-finally block. + + The #17758 fix moved ``_release_session_guard(...)`` under an + ``if current_task is self._session_tasks.get(session_key)`` + conditional. For the 99%-common case (no pending message, no + handoff) ``current_task`` IS the stored task, so the guard must + still fire. This test would fail if the conditional were ever + tightened in a way that dropped the normal path.""" + adapter = _make_adapter() + sk = _sk() + + async def handler(event): + return "ok" + + adapter._message_handler = handler + + await adapter.handle_message(_make_event(text="solo")) + + # Wait for the single-shot handler to fully unwind. + for _ in range(200): + if sk not in adapter._active_sessions and sk not in adapter._session_tasks: + break + await asyncio.sleep(0.01) + + await adapter.cancel_background_tasks() + + assert sk not in adapter._active_sessions, ( + "normal-path unwind left _active_sessions[sk] populated — future " + "messages would take the busy-handler path forever" + ) + assert sk not in adapter._session_tasks, ( + "normal-path unwind left _session_tasks[sk] populated — " + "stale-lock detection will treat a dead task as alive" + ) + + +@pytest.mark.asyncio +async def test_drain_task_cancellation_releases_session(): + """If the in-band drain task is cancelled (e.g. user sent ``/stop`` + mid-drain), the session guard and task registry must still get + cleaned up — the cancelled drain task's own ``finally`` runs and + fires ``_release_session_guard``. + + The #17758 fix transfers ownership of ``_session_tasks[sk]`` to + the drain task; the drain task's ``except asyncio.CancelledError`` + branch must then own the cleanup. Without this test a future + refactor could move cancellation handling in a way that leaves + the session permanently pinned as busy after a cancel.""" + adapter = _make_adapter() + sk = _sk() + + turn_started = asyncio.Event() + drain_hit_handler = asyncio.Event() + + async def handler(event): + if event.text == "M0": + # Queue a pending follow-up so an in-band drain task gets spawned. + adapter._pending_messages[sk] = _make_event(text="M1") + turn_started.set() + return "ok" + # M1 is the drained follow-up — hang so we can cancel the drain task. + drain_hit_handler.set() + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + raise + + adapter._message_handler = handler + + await adapter.handle_message(_make_event(text="M0")) + + # Wait for the drain task to actually start running M1. + await asyncio.wait_for(drain_hit_handler.wait(), timeout=2) + + # Cancel the drain task mid-handler. + drain_task = adapter._session_tasks.get(sk) + assert drain_task is not None, "in-band drain did not install a drain task" + assert not drain_task.done(), "drain task finished before we could cancel" + drain_task.cancel() + + # Drain task's finally must release both registries. + for _ in range(200): + if sk not in adapter._active_sessions and sk not in adapter._session_tasks: + break + await asyncio.sleep(0.01) + + await adapter.cancel_background_tasks() + + assert sk not in adapter._active_sessions, ( + "cancelled drain task did not release _active_sessions[sk] — " + "the session stays permanently pinned as busy after a /stop mid-drain" + ) + assert sk not in adapter._session_tasks, ( + "cancelled drain task did not release _session_tasks[sk] — " + "stale-lock detection will treat the dead task as alive" + ) + + +@pytest.mark.asyncio +async def test_late_arrival_drain_still_fires_when_no_in_band_drain(): + """The late-arrival drain in ``finally`` must still spawn a fresh + task when no in-band drain preceded it. + + Pre-#17758 this path already existed; the #17758 follow-up guard + only re-queues when ``_session_tasks[sk] is not current_task``. + For a late-arrival with no in-band drain, ``_session_tasks[sk]`` + IS the current task, so the ``else`` branch must fire and spawn + a drain task for the queued message. + + Queue a pending message *after* M0's handler returns (so the + in-band drain block sees nothing) but *before* ``finally`` runs + the late-arrival check — we do this by hooking ``_stop_typing``, + which runs in finally before the late-arrival check.""" + adapter = _make_adapter() + sk = _sk() + + results: list[str] = [] + original_stop_typing = getattr(adapter, "stop_typing", None) + + async def injecting_stop_typing(chat_id): + # Simulate a message landing during the cleanup awaits. + adapter._pending_messages[sk] = _make_event(text="late") + if original_stop_typing: + await original_stop_typing(chat_id) + + adapter.stop_typing = injecting_stop_typing + + async def handler(event): + results.append(event.text) + return "ok" + + adapter._message_handler = handler + + await adapter.handle_message(_make_event(text="first")) + + # Wait for the late-arrival drain task to finish the second event. + for _ in range(400): + if "late" in results and sk not in adapter._active_sessions: + break + await asyncio.sleep(0.01) + + await adapter.cancel_background_tasks() + + assert "first" in results, "original message handler did not run" + assert "late" in results, ( + "late-arrival drain did not spawn a drain task — a message that " + "landed during cleanup awaits was silently dropped" + )