From 663ba9a58fc60b3fa3e224248bc05af1d1f09635 Mon Sep 17 00:00:00 2001
From: briandevans <252620095+briandevans@users.noreply.github.com>
Date: Wed, 29 Apr 2026 22:19:00 -0700
Subject: [PATCH] fix(gateway): drain pending messages via fresh task, not
 recursion (#17758)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

`_process_message_background` finished a turn, found a queued follow-up,
and drained it via `await self._process_message_background(pending_event,
session_key)`. Each chained follow-up added a frame to the call stack
instead of starting fresh. Under sustained pending-queue activity (e.g. a
user sending follow-ups faster than the agent finishes turns) the C stack
would exhaust at ~2000 nested frames and SIGSEGV the process.

Mirror the late-arrival drain pattern that already exists in the same
function: spawn a new `asyncio.create_task(...)` for the pending event
and return so the current frame can unwind. The new task takes ownership
via `_session_tasks[session_key]`.

The late-arrival drain in `finally` could now race with the in-band drain
across the `await typing_task` / `await stop_typing` window, so add a
guard: if `_session_tasks[session_key]` is no longer the current task, an
in-band drain already spawned a follow-up task — re-queue the
late-arrival event so that task picks it up after its current event,
instead of spawning a second concurrent task for the same session_key.

Regression test (`test_pending_drain_no_recursion.py`) chains 12
follow-ups and asserts the recorded `_process_message_background` stack
depth stays bounded at handler entry. Pre-fix: depths grow linearly
`[1,2,3,…,12]`. Post-fix: all depths are `1`.

`test_duplicate_reply_suppression::test_stale_response_suppressed_when_interrupted`
called `_process_message_background` directly and implicitly relied on
the old recursive `await` semantics — updated to wait for the spawned
drain task before checking the sent list.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 gateway/platforms/base.py                     |  78 ++++++++---
 .../test_duplicate_reply_suppression.py       |   9 ++
 .../test_pending_drain_no_recursion.py        | 129 ++++++++++++++++++
 3 files changed, 194 insertions(+), 22 deletions(-)
 create mode 100644 tests/gateway/test_pending_drain_no_recursion.py

diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py
index da992792e3..2eae88ccdb 100644
--- a/gateway/platforms/base.py
+++ b/gateway/platforms/base.py
@@ -2708,9 +2708,27 @@ class BasePlatformAdapter(ABC):
             if _active is not None:
                 _active.clear()
             await _stop_typing_task()
-            # Process pending message in new background task
-            await self._process_message_background(pending_event, session_key)
-            return  # Already cleaned up
+            # Spawn a fresh task for the pending message instead of
+            # recursing.  Issue #17758: `await
+            # self._process_message_background(...)` here grew the
+            # call stack one frame per chained follow-up, and under
+            # sustained pending-queue activity the C stack would
+            # exhaust at ~2000 frames and SIGSEGV the process.
+            # Mirror the late-arrival drain pattern below: hand off
+            # to a new task and return so this frame can unwind.
+            drain_task = asyncio.create_task(
+                self._process_message_background(pending_event, session_key)
+            )
+            # Hand ownership of the session to the drain task so
+            # stale-lock detection keeps working while it runs.
+            self._session_tasks[session_key] = drain_task
+            try:
+                self._background_tasks.add(drain_task)
+                drain_task.add_done_callback(self._background_tasks.discard)
+            except TypeError:
+                # Tests stub create_task() with non-hashable sentinels; tolerate.
+                pass
+            return  # Drain task owns the session now.
 
         except asyncio.CancelledError:
             current_task = asyncio.current_task()
@@ -2772,25 +2790,41 @@
             # dropped (user never gets a reply).
             late_pending = self._pending_messages.pop(session_key, None)
             if late_pending is not None:
-                logger.debug(
-                    "[%s] Late-arrival pending message during cleanup — spawning drain task",
-                    self.name,
-                )
-                _active = self._active_sessions.get(session_key)
-                if _active is not None:
-                    _active.clear()
-                drain_task = asyncio.create_task(
-                    self._process_message_background(late_pending, session_key)
-                )
-                # Hand ownership of the session to the drain task so stale-lock
-                # detection keeps working while it runs.
-                self._session_tasks[session_key] = drain_task
-                try:
-                    self._background_tasks.add(drain_task)
-                    drain_task.add_done_callback(self._background_tasks.discard)
-                except TypeError:
-                    # Tests stub create_task() with non-hashable sentinels; tolerate.
-                    pass
+                current_task = asyncio.current_task()
+                existing_task = self._session_tasks.get(session_key)
+                if (
+                    existing_task is not None
+                    and existing_task is not current_task
+                ):
+                    # The in-band drain (or an earlier late-arrival drain)
+                    # already spawned a follow-up task that owns this
+                    # session.  Re-queue the late-arrival event so that
+                    # task picks it up — avoids spawning two concurrent
+                    # _process_message_background tasks for the same key
+                    # (#17758 follow-up: prevents the create_task path
+                    # from racing with itself across the in-band/finally
+                    # boundary).
+                    self._pending_messages[session_key] = late_pending
+                else:
+                    logger.debug(
+                        "[%s] Late-arrival pending message during cleanup — spawning drain task",
+                        self.name,
+                    )
+                    _active = self._active_sessions.get(session_key)
+                    if _active is not None:
+                        _active.clear()
+                    drain_task = asyncio.create_task(
+                        self._process_message_background(late_pending, session_key)
+                    )
+                    # Hand ownership of the session to the drain task so stale-lock
+                    # detection keeps working while it runs.
+                    self._session_tasks[session_key] = drain_task
+                    try:
+                        self._background_tasks.add(drain_task)
+                        drain_task.add_done_callback(self._background_tasks.discard)
+                    except TypeError:
+                        # Tests stub create_task() with non-hashable sentinels; tolerate.
+                        pass
                 # Leave _active_sessions[session_key] populated — the drain
                 # task's own lifecycle will clean it up.
             else:
diff --git a/tests/gateway/test_duplicate_reply_suppression.py b/tests/gateway/test_duplicate_reply_suppression.py
index c275a12c07..908e023d88 100644
--- a/tests/gateway/test_duplicate_reply_suppression.py
+++ b/tests/gateway/test_duplicate_reply_suppression.py
@@ -108,6 +108,15 @@ class TestBaseInterruptSuppression:
 
         await adapter._process_message_background(event_a, session_key)
 
+        # The in-band pending-drain now hands off to a fresh task instead
+        # of recursing (#17758).  Wait for that task to finish before
+        # checking the sent list.
+        for _ in range(200):
+            if any(s["content"] == pending_response for s in adapter.sent):
+                break
+            await asyncio.sleep(0.01)
+        await adapter.cancel_background_tasks()
+
         # The stale response should NOT have been sent.
         stale_sends = [s for s in adapter.sent if s["content"] == stale_response]
         assert len(stale_sends) == 0, (
diff --git a/tests/gateway/test_pending_drain_no_recursion.py b/tests/gateway/test_pending_drain_no_recursion.py
new file mode 100644
index 0000000000..a005061ebc
--- /dev/null
+++ b/tests/gateway/test_pending_drain_no_recursion.py
@@ -0,0 +1,129 @@
+"""Regression test for #17758 — chained pending-message drains must not
+grow the call stack.
+
+Before the fix, ``_process_message_background`` finished a turn, found a
+pending follow-up, and drained it via ``await
+self._process_message_background(pending_event, session_key)``.  Each
+queued follow-up added a frame to the call stack instead of starting
+fresh, so under sustained pending-queue activity the C stack would
+exhaust at ~2000 nested frames and the process would crash with
+SIGSEGV.
+
+After the fix, the in-band drain spawns a fresh task (mirroring the
+late-arrival drain pattern), so the stack stays bounded regardless of
+chain length.
+
+We assert the invariant directly: count nested
+``_process_message_background`` frames at handler entry across a chain
+of N follow-ups.  Recursion makes depth grow linearly (1, 2, 3, …, N);
+task spawning keeps it constant (1 every time).
+"""
+
+import asyncio
+import sys
+
+import pytest
+from unittest.mock import AsyncMock
+
+from gateway.config import Platform, PlatformConfig
+from gateway.platforms.base import (
+    BasePlatformAdapter,
+    MessageEvent,
+    MessageType,
+)
+from gateway.session import SessionSource, build_session_key
+
+
+class _StubAdapter(BasePlatformAdapter):
+    async def connect(self):
+        pass
+
+    async def disconnect(self):
+        pass
+
+    async def send(self, chat_id, text, **kwargs):
+        return None
+
+    async def get_chat_info(self, chat_id):
+        return {}
+
+
+def _make_adapter():
+    adapter = _StubAdapter(PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM)
+    adapter._send_with_retry = AsyncMock(return_value=None)
+    return adapter
+
+
+def _make_event(text="hi", chat_id="42"):
+    return MessageEvent(
+        text=text,
+        message_type=MessageType.TEXT,
+        source=SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"),
+    )
+
+
+def _sk(chat_id="42"):
+    return build_session_key(
+        SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm")
+    )
+
+
+def _count_pmb_frames() -> int:
+    """Walk the current call stack and count nested
+    ``_process_message_background`` frames.  Used to detect recursive
+    in-band drains."""
+    f = sys._getframe()
+    n = 0
+    while f is not None:
+        if f.f_code.co_name == "_process_message_background":
+            n += 1
+        f = f.f_back
+    return n
+
+
+@pytest.mark.asyncio
+async def test_in_band_drain_does_not_grow_stack():
+    """Issue #17758: chained pending-message drains must not recurse.
+
+    Queue a fresh pending message inside each handler invocation so the
+    in-band drain block fires for every turn in the chain.  After N
+    turns, the recorded stack depth at handler entry must stay bounded.
+    Pre-fix, depths would be 1, 2, 3, …, N; post-fix, depths are 1
+    every time because each drain runs in its own task.
+    """
+    N = 12
+    adapter = _make_adapter()
+    sk = _sk()
+
+    depths: list[int] = []
+    next_index = [1]
+
+    async def handler(event):
+        depths.append(_count_pmb_frames())
+        if next_index[0] < N:
+            adapter._pending_messages[sk] = _make_event(text=f"M{next_index[0]}")
+            next_index[0] += 1
+        return "ok"
+
+    adapter._message_handler = handler
+
+    await adapter.handle_message(_make_event(text="M0"))
+
+    # Drain the chain.  Each turn schedules the next via the in-band
+    # drain block, so we wait until N handler runs have completed and
+    # the session has been released.
+    for _ in range(400):
+        if len(depths) >= N and sk not in adapter._active_sessions:
+            break
+        await asyncio.sleep(0.01)
+
+    await adapter.cancel_background_tasks()
+
+    assert len(depths) == N, (
+        f"expected {N} handler runs in the chain, got {len(depths)}: depths={depths!r}"
+    )
+    max_depth = max(depths)
+    assert max_depth <= 2, (
+        f"in-band drain is recursing instead of spawning a fresh task — "
+        f"stack depth grew with chain length: {depths!r}"
+    )
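
Appendix (not part of the applied diff): a minimal, self-contained asyncio
sketch of the two drain patterns. `process_recursive`, `process_spawn`, and
the `TASKS` set are illustrative stand-ins, not the adapter's real API; the
sketch only reproduces the linear-vs-constant stack-depth profile that the
regression test asserts.

    import asyncio
    import sys

    # Strong references to spawned tasks; stand-in for _background_tasks.
    TASKS: set[asyncio.Task] = set()

    def depth() -> int:
        """Count process_* frames on the current call stack."""
        f, n = sys._getframe(), 0
        while f is not None:
            if f.f_code.co_name.startswith("process_"):
                n += 1
            f = f.f_back
        return n

    async def process_recursive(pending: list[int], depths: list[int]) -> None:
        depths.append(depth())
        if pending:
            pending.pop(0)
            # Pre-fix pattern: awaiting the next turn in-line keeps this
            # frame alive, so depth grows with chain length.
            await process_recursive(pending, depths)

    async def process_spawn(pending: list[int], depths: list[int]) -> None:
        depths.append(depth())
        if pending:
            pending.pop(0)
            # Post-fix pattern: a fresh task owns the next turn and this
            # frame unwinds immediately.
            t = asyncio.create_task(process_spawn(pending, depths))
            TASKS.add(t)
            t.add_done_callback(TASKS.discard)

    async def main() -> None:
        d1: list[int] = []
        await process_recursive(list(range(11)), d1)
        print("recursive:", d1)  # [1, 2, 3, ..., 12]

        d2: list[int] = []
        await process_spawn(list(range(11)), d2)
        while TASKS:  # let the spawned chain finish
            await asyncio.sleep(0)
        print("spawned:  ", d2)  # [1, 1, 1, ..., 1]

    asyncio.run(main())

Holding each spawned task in `TASKS` mirrors the patch's `_background_tasks`
bookkeeping: without a strong reference, a chained task could be
garbage-collected before it runs.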