From 0225480369f576ddac88152b857098309d60fb69 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Thu, 25 Jun 2026 00:15:41 +0530 Subject: [PATCH] fix(gateway): offload agent cleanup off the event loop in /new reset (#35994) The /new (and /reset) confirmation-button callback runs the slash-confirm handler on the asyncio event loop (see _request_slash_confirm). That handler calls _handle_reset_command, which invoked the SYNCHRONOUS, potentially long-blocking _cleanup_agent_resources inline: agent.close() tears down terminal sandboxes, browser daemons and background processes (subprocess waits), and shutdown_memory_provider() can make a network call. A slow teardown wedged the entire event loop, so the bot went silent and stopped processing all messages until a manual restart. Offload _cleanup_agent_resources via the existing contextvar-preserving _run_in_executor_with_context helper, bounded by asyncio.wait_for with a named _RESET_CLEANUP_TIMEOUT_S (30s). The loop is never blocked; on timeout the reset proceeds and the worker thread is left to finish on its own (it cannot be cancelled). The text /new path is unaffected (already off-loop). Tests (tests/gateway/test_35994_reset_button_deadlock.py): the loop keeps ticking while close() blocks in its worker thread; a cleanup that raises is swallowed (warning logged) and the reset still rotates the session; a cleanup that times out degrades gracefully. All three are mutation-verified to fail without their respective production branch. --- gateway/slash_commands.py | 39 +++- .../test_35994_reset_button_deadlock.py | 200 ++++++++++++++++++ 2 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 tests/gateway/test_35994_reset_button_deadlock.py diff --git a/gateway/slash_commands.py b/gateway/slash_commands.py index 9aa5c47060d..71d5214e321 100644 --- a/gateway/slash_commands.py +++ b/gateway/slash_commands.py @@ -44,6 +44,12 @@ from utils import ( logger = logging.getLogger("gateway.run") +# Upper bound on the off-loop agent-resource cleanup during a /new or /reset +# (see _handle_reset_command). A stuck teardown must not block the event loop; +# past this the reset proceeds and the cleanup is left to finish (or leak) in +# its worker thread. (#35994) +_RESET_CLEANUP_TIMEOUT_S = 30.0 + def _model_switch_skew_guard() -> Optional[str]: """Refuse a model switch when the gateway is running stale code. @@ -111,13 +117,44 @@ class GatewaySlashCommandsMixin: # Close tool resources on the old agent (terminal sandboxes, browser # daemons, background processes) before evicting from cache. # Guard with getattr because test fixtures may skip __init__. + # + # _cleanup_agent_resources is synchronous and can block for a long time + # (agent.close() does subprocess teardown; shutdown_memory_provider() + # may do network IO). This handler runs ON the event loop when a + # Telegram/Discord/Slack confirm-button click resolves the slash-confirm + # (see _request_slash_confirm), so an inline call wedges the whole loop + # and the bot goes silent until restart (#35994). Offload it to a worker + # thread (via the contextvar-preserving executor helper) with a bounded + # timeout so the loop is never blocked. _cache_lock = getattr(self, "_agent_cache_lock", None) if _cache_lock is not None: with _cache_lock: _cached = self._agent_cache.get(session_key) _old_agent = _cached[0] if isinstance(_cached, tuple) else _cached if _cached else None if _old_agent is not None: - self._cleanup_agent_resources(_old_agent) + try: + await asyncio.wait_for( + self._run_in_executor_with_context( + self._cleanup_agent_resources, _old_agent + ), + timeout=_RESET_CLEANUP_TIMEOUT_S, + ) + except asyncio.TimeoutError: + # wait_for cancels the await, but the worker thread cannot be + # cancelled — a wedged teardown keeps running (or leaks) for + # the gateway's lifetime. The reset proceeds regardless. + logger.warning( + "Agent resource cleanup for session %s exceeded %ss during " + "/new reset; proceeding with reset (the worker thread is left " + "to finish on its own). (#35994)", + session_key, _RESET_CLEANUP_TIMEOUT_S, + ) + except Exception as cleanup_exc: + logger.warning( + "Agent resource cleanup for session %s failed during /new " + "reset: %s (#35994)", + session_key, cleanup_exc, + ) self._evict_cached_agent(session_key) # Discard any /queue overflow for this session — /new is a diff --git a/tests/gateway/test_35994_reset_button_deadlock.py b/tests/gateway/test_35994_reset_button_deadlock.py new file mode 100644 index 00000000000..2c22b4c3c0d --- /dev/null +++ b/tests/gateway/test_35994_reset_button_deadlock.py @@ -0,0 +1,200 @@ +"""Regression test for #35994: Telegram /new confirm-button deadlock. + +The /new confirmation button callback runs the slash-confirm handler on the +asyncio event loop (see GatewayRunner._request_slash_confirm). That handler +calls _handle_reset_command, which used to invoke the SYNCHRONOUS, potentially +long-blocking _cleanup_agent_resources (agent.close() tears down terminal +sandboxes / browser daemons / background processes; shutdown_memory_provider() +may make a network call) inline on the loop. A slow teardown wedged the entire +event loop, so the bot went silent until a manual restart. + +The fix offloads _cleanup_agent_resources to a worker thread with a bounded +timeout, so the loop is never blocked and a stuck teardown degrades gracefully. +""" +import asyncio +import logging +import threading +import time +from datetime import datetime +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.platforms.base import MessageEvent +from gateway.session import SessionEntry, SessionSource, build_session_key + + +def _make_source() -> SessionSource: + return SessionSource( + platform=Platform.TELEGRAM, + user_id="u1", + chat_id="c1", + user_name="tester", + chat_type="dm", + ) + + +def _make_event(text: str) -> MessageEvent: + return MessageEvent(text=text, source=_make_source(), message_id="m1") + + +def _make_runner_with_cached_agent(close_fn): + """Build a bare GatewayRunner with a cached agent whose close() runs + ``close_fn`` (used to simulate slow / blocking teardown).""" + from gateway.run import GatewayRunner + + runner = object.__new__(GatewayRunner) + runner.config = GatewayConfig( + platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")} + ) + adapter = MagicMock() + adapter.send = AsyncMock() + runner.adapters = {Platform.TELEGRAM: adapter} + runner._voice_mode = {} + runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False) + runner._session_model_overrides = {} + runner._pending_model_notes = {} + runner._background_tasks = set() + + session_key = build_session_key(_make_source()) + session_entry = SessionEntry( + session_key=session_key, session_id="sess-old", + created_at=datetime.now(), updated_at=datetime.now(), + platform=Platform.TELEGRAM, chat_type="dm", + ) + new_entry = SessionEntry( + session_key=session_key, session_id="sess-new", + created_at=datetime.now(), updated_at=datetime.now(), + platform=Platform.TELEGRAM, chat_type="dm", + ) + runner.session_store = MagicMock() + runner.session_store.reset_session.return_value = new_entry + runner.session_store._entries = {session_key: session_entry} + runner.session_store._generate_session_key.return_value = session_key + runner._running_agents = {} + runner._pending_messages = {} + runner._pending_approvals = {} + runner._session_db = None + runner._is_user_authorized = lambda _source: True + runner._format_session_info = lambda: "" + + # Enable the cache-lock path (this is what the button callback exercises) + runner._agent_cache_lock = threading.RLock() + agent = MagicMock() + agent.close = close_fn + agent.shutdown_memory_provider = MagicMock() + runner._agent_cache = {session_key: agent} + return runner + + +@pytest.mark.asyncio +async def test_reset_does_not_block_event_loop_during_cleanup(): + """#35994: a slow agent.close() must NOT block the event loop. A + concurrent loop task must keep ticking WHILE close() is still blocking + (proving cleanup was offloaded to a worker thread, not run inline on + the loop). With the pre-fix inline call, the loop is frozen for the + whole duration of close() and no ticks accumulate until it returns.""" + close_started = threading.Event() + release = threading.Event() + + def slow_close(): + close_started.set() + # Block the WORKER thread (not the loop) until released. + release.wait(timeout=5) + + runner = _make_runner_with_cached_agent(slow_close) + + ticks = {"n": 0} + stop = threading.Event() + + async def _heartbeat(): + while not stop.is_set(): + ticks["n"] += 1 + await asyncio.sleep(0.005) + + hb = asyncio.create_task(_heartbeat()) + reset_task = asyncio.create_task( + runner._handle_reset_command(_make_event("/new")) + ) + + # Wait until close() has actually started blocking in its worker thread. + for _ in range(200): + if close_started.is_set(): + break + await asyncio.sleep(0.005) + assert close_started.is_set(), "close() never ran" + + # Now sample ticks while close() is STILL blocking. If the loop were + # frozen (pre-fix inline call), this stays ~0. + ticks_at_block = ticks["n"] + await asyncio.sleep(0.1) + ticks_during_block = ticks["n"] - ticks_at_block + + release.set() + await reset_task + stop.set() + await hb + + assert ticks_during_block >= 5, ( + f"event loop was blocked during agent cleanup (#35994): only " + f"{ticks_during_block} ticks while close() was running" + ) + runner.session_store.reset_session.assert_called_once() + + +@pytest.mark.asyncio +async def test_reset_completes_when_cleanup_raises(caplog): + """#35994: if the offloaded cleanup itself raises, the handler swallows it + (logs a warning) and still rotates the session — it must not abort /new. + + Note: _cleanup_agent_resources swallows its own internal errors, so to + exercise the handler's `except Exception` branch we make the cleanup call + itself raise (patched on the instance), then assert the warning fired — + proving the branch executed rather than the success path. + """ + runner = _make_runner_with_cached_agent(lambda: None) + + def boom_cleanup(_agent): + raise RuntimeError("cleanup blew up") + + runner._cleanup_agent_resources = boom_cleanup + + with caplog.at_level(logging.WARNING, logger="gateway.run"): + result = await asyncio.wait_for( + runner._handle_reset_command(_make_event("/new")), timeout=3 + ) + + assert any( + "failed during /new reset" in r.message and "#35994" in r.message + for r in caplog.records + ), "expected the cleanup-failure warning to be logged (except branch not hit)" + runner.session_store.reset_session.assert_called_once() + assert result is not None + + +@pytest.mark.asyncio +async def test_reset_completes_when_cleanup_times_out(caplog): + """#35994: if cleanup exceeds the bounded timeout, the reset still completes + (graceful degradation) and the timeout warning fires.""" + import gateway.slash_commands as _sc + + # Force the wait_for to time out immediately, closing the offloaded awaitable + # so no worker thread dangles past the test. + async def _instant_timeout(aw, timeout=None): + if asyncio.iscoroutine(aw): + aw.close() + raise asyncio.TimeoutError + + runner = _make_runner_with_cached_agent(lambda: None) + + with caplog.at_level(logging.WARNING, logger="gateway.run"): + with patch.object(_sc.asyncio, "wait_for", _instant_timeout): + result = await runner._handle_reset_command(_make_event("/new")) + + assert any( + "exceeded" in r.message and "#35994" in r.message for r in caplog.records + ), "expected the timeout warning to be logged" + runner.session_store.reset_session.assert_called_once() + assert result is not None