From ea5aaa7a22e0d5d036da0130a659d2e4126f7d41 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sun, 28 Jun 2026 02:22:13 -0700 Subject: [PATCH] fix(gateway): offload remaining inline agent cleanup off the event loop (#53175) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #35994 moved /new reset cleanup off the loop, but _cleanup_agent_resources (agent.close() subprocess teardown; shutdown_memory_provider() plugin IO) was still called INLINE on the event loop from three other sites: - _session_expiry_watcher (5-min idle sweep) — live loop - _handle_message_with_agent cache-hygiene re-eviction — live loop - _finalize_shutdown_agents / stop() idle-cache loop — shutdown A wedged memory provider on any of these froze the loop: bot goes silent, runtime-status updated_at heartbeat stops advancing, and SIGTERM can't be serviced (requires kill -9) — exactly the #53175 zombie pattern. Adds _cleanup_agent_resources_off_loop: a bounded (30s) worker-thread offload mirroring the #35994 reset fix, and routes all four sites through it. --- gateway/run.py | 71 ++++++- ...3121_shutdown_inflight_transcript_flush.py | 29 ++- tests/gateway/test_53175_cleanup_off_loop.py | 174 ++++++++++++++++++ tests/gateway/test_shutdown_cache_cleanup.py | 12 +- 4 files changed, 273 insertions(+), 13 deletions(-) create mode 100644 tests/gateway/test_53175_cleanup_off_loop.py diff --git a/gateway/run.py b/gateway/run.py index 0289afb3c4a..7b33f77161e 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5162,7 +5162,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew e, ) - def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None: + async def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None: for agent in active_agents.values(): # Persist any in-flight transcript to the SQLite session store # before teardown (#13121). An agent forcibly interrupted by the @@ -5208,7 +5208,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew ) except Exception: pass - self._cleanup_agent_resources(agent) + # Off-loop + bounded: a wedged memory provider here used to hang + # the whole shutdown so SIGTERM never completed (#53175). + await self._cleanup_agent_resources_off_loop( + agent, context="shutdown finalize" + ) def _should_emit_long_running_notification( self, @@ -5231,6 +5235,52 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew return False return True + # Upper bound on off-loop agent-resource cleanup invoked from coroutines + # running on the gateway's event loop (session-expiry sweep, in-turn + # cache-hygiene re-eviction). _cleanup_agent_resources is synchronous and + # can block for a long time (agent.close() does subprocess teardown; + # shutdown_memory_provider() may do network/SQLite IO via a memory plugin). + # Calling it inline wedges the whole loop — the bot goes silent, the + # runtime-status updated_at heartbeat freezes, and SIGTERM cannot be + # serviced (#53175). Offload to a worker thread under this timeout so the + # loop is never blocked; mirrors the /new reset path's fix (#35994). + _CLEANUP_TIMEOUT_S = 30.0 + + async def _cleanup_agent_resources_off_loop( + self, agent: Any, *, context: str = "" + ) -> None: + """Run _cleanup_agent_resources in a worker thread with a bounded wait. + + Safe to await from coroutines on the gateway event loop: a slow or + wedged teardown (memory provider IO, subprocess close) can no longer + block message processing. On timeout the await is cancelled and the + worker thread is left to finish (or leak) on its own — the caller + proceeds regardless, exactly as the /new reset path does (#35994). + """ + if agent is None: + return + try: + await asyncio.wait_for( + self._run_in_executor_with_context( + self._cleanup_agent_resources, agent + ), + timeout=self._CLEANUP_TIMEOUT_S, + ) + except asyncio.TimeoutError: + logger.warning( + "Agent resource cleanup%s exceeded %ss; proceeding without " + "blocking the event loop (the worker thread is left to finish " + "on its own). (#53175)", + f" ({context})" if context else "", + self._CLEANUP_TIMEOUT_S, + ) + except Exception as cleanup_exc: + logger.warning( + "Agent resource cleanup%s failed: %s (#53175)", + f" ({context})" if context else "", + cleanup_exc, + ) + def _cleanup_agent_resources(self, agent: Any) -> None: """Best-effort cleanup for temporary or cached agent instances.""" if agent is None: @@ -6743,7 +6793,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if _cached_agent is None: _cached_agent = self._running_agents.get(key) if _cached_agent and _cached_agent is not _AGENT_PENDING_SENTINEL: - self._cleanup_agent_resources(_cached_agent) + await self._cleanup_agent_resources_off_loop( + _cached_agent, context="session expiry" + ) # Drop the cache entry so the AIAgent (and its LLM # clients, tool schemas, memory provider refs) can # be garbage-collected. Otherwise the cache grows @@ -7250,7 +7302,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception as e: logger.error("Failed to launch detached gateway restart: %s", e) - self._finalize_shutdown_agents(active_agents) + await self._finalize_shutdown_agents(active_agents) # Also shut down memory providers on idle cached agents. # _finalize_shutdown_agents only handles agents that were @@ -7267,7 +7319,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _agent = ( _entry[0] if isinstance(_entry, tuple) else _entry ) - self._cleanup_agent_resources(_agent) + # Bounded + off-loop so a wedged memory provider on one + # idle agent can't hang shutdown indefinitely — that path + # is why SIGTERM failed to kill the process (#53175). + await self._cleanup_agent_resources_off_loop( + _agent, context="shutdown idle-cache" + ) for platform, adapter in list(self.adapters.items()): await self._bounded_adapter_teardown(adapter, platform) @@ -10089,7 +10146,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # rebuilds its system prompt from current # SOUL.md, memory, and skills. self._evict_cached_agent(session_key) - self._cleanup_agent_resources(_hyg_agent) + await self._cleanup_agent_resources_off_loop( + _hyg_agent, context="session hygiene" + ) except Exception as e: logger.warning( diff --git a/tests/gateway/test_13121_shutdown_inflight_transcript_flush.py b/tests/gateway/test_13121_shutdown_inflight_transcript_flush.py index d726ea34352..ca6d2839a1b 100644 --- a/tests/gateway/test_13121_shutdown_inflight_transcript_flush.py +++ b/tests/gateway/test_13121_shutdown_inflight_transcript_flush.py @@ -32,6 +32,7 @@ real ``SessionStore.load_transcript``). from __future__ import annotations +import asyncio import sys import types from unittest.mock import MagicMock @@ -51,9 +52,25 @@ def _make_runner(): from gateway.run import GatewayRunner runner = object.__new__(GatewayRunner) + # _finalize_shutdown_agents offloads _cleanup_agent_resources to a worker + # thread via _run_in_executor_with_context (#53175). Stub it to run inline + # so the bounded-cleanup path is exercised deterministically in tests. + async def _inline_executor(func, *args): + return func(*args) + + runner._run_in_executor_with_context = _inline_executor return runner +def _finalize(runner, agents): + """Drive the now-async _finalize_shutdown_agents from sync test bodies.""" + if not hasattr(runner, "_run_in_executor_with_context"): + async def _inline_executor(func, *args): + return func(*args) + runner._run_in_executor_with_context = _inline_executor + asyncio.run(runner._finalize_shutdown_agents(agents)) + + # ───────────────────────────────────────────────────────────────────────── # Unit: _finalize_shutdown_agents calls the flush hook with the in-flight # transcript before teardown. @@ -84,7 +101,7 @@ class TestFinalizeShutdownFlushesInflightTranscript: ] agent = _FakeAgent(session_messages=inflight) - runner._finalize_shutdown_agents({"agent:main:discord:dm:42": agent}) + _finalize(runner, {"agent:main:discord:dm:42": agent}) agent._flush_messages_to_session_db.assert_called_once_with(inflight) # Cleanup still happens after the flush. @@ -96,7 +113,7 @@ class TestFinalizeShutdownFlushesInflightTranscript: runner = _make_runner() agent = _FakeAgent(session_messages=[]) - runner._finalize_shutdown_agents({"k": agent}) + _finalize(runner, {"k": agent}) agent._flush_messages_to_session_db.assert_not_called() agent.close.assert_called_once() @@ -108,7 +125,7 @@ class TestFinalizeShutdownFlushesInflightTranscript: agent = _FakeAgent(session_messages=[{"role": "user", "content": "x"}], has_flush=False) - runner._finalize_shutdown_agents({"k": agent}) + _finalize(runner, {"k": agent}) agent.close.assert_called_once() @@ -119,7 +136,7 @@ class TestFinalizeShutdownFlushesInflightTranscript: agent = _FakeAgent(session_messages=[{"role": "user", "content": "x"}]) agent._flush_messages_to_session_db.side_effect = RuntimeError("db locked") - runner._finalize_shutdown_agents({"k": agent}) + _finalize(runner, {"k": agent}) agent.close.assert_called_once() @@ -186,7 +203,7 @@ class TestShutdownTranscriptSurvivesResumeE2E: # Drive the gateway shutdown finalization with this real agent. from gateway.run import GatewayRunner runner = object.__new__(GatewayRunner) - runner._finalize_shutdown_agents({"agent:main:discord:dm:7": agent}) + _finalize(runner, {"agent:main:discord:dm:7": agent}) # The in-flight turn must now be durable and readable via the SAME # path the resume logic uses (SessionStore.load_transcript → DB). @@ -237,7 +254,7 @@ class TestShutdownTranscriptSurvivesResumeE2E: # Shutdown re-flush of the SAME list identity must add nothing. from gateway.run import GatewayRunner runner = object.__new__(GatewayRunner) - runner._finalize_shutdown_agents({"k": agent}) + _finalize(runner, {"k": agent}) after = db.get_messages_as_conversation(session_id) assert len(after) == 2, after diff --git a/tests/gateway/test_53175_cleanup_off_loop.py b/tests/gateway/test_53175_cleanup_off_loop.py new file mode 100644 index 00000000000..ca7955905d0 --- /dev/null +++ b/tests/gateway/test_53175_cleanup_off_loop.py @@ -0,0 +1,174 @@ +"""Regression test for #53175: gateway event loop wedged by synchronous +agent-resource cleanup run inline from loop coroutines. + +#35994 fixed the /new reset path, but the same synchronous +``_cleanup_agent_resources`` (agent.close() tears down terminal sandboxes / +browser daemons / background processes; shutdown_memory_provider() may do +SQLite / network IO via a memory plugin) was still called INLINE on the event +loop from three other places: + + * ``_session_expiry_watcher`` (the 5-minute idle sweep) — live loop + * ``_handle_message_with_agent`` cache-hygiene re-eviction — live loop + * ``_finalize_shutdown_agents`` / ``stop()`` idle-cache loop — shutdown + +A wedged provider on any of these froze the whole loop: the bot went silent, +the runtime-status ``updated_at`` heartbeat stopped advancing (the symptom the +reporter's watchdog keyed on), and SIGTERM could not be serviced (requiring +``kill -9``). + +The fix routes all four call sites through ``_cleanup_agent_resources_off_loop`` +which offloads to a worker thread under a bounded ``asyncio.wait_for``, so the +loop is never blocked and a stuck teardown degrades gracefully. + +These tests drive that shared helper directly — it is the single chokepoint +every fixed call site now uses. +""" +import asyncio +import logging +import threading +from contextvars import copy_context +from types import SimpleNamespace + +import pytest + + +def _make_runner(): + """Bare GatewayRunner with a real thread-pool-backed executor helper.""" + from gateway.run import GatewayRunner + + runner = object.__new__(GatewayRunner) + from concurrent.futures import ThreadPoolExecutor + + executor = ThreadPoolExecutor(max_workers=2) + runner._get_executor = lambda: executor + + async def _run_in_executor_with_context(func, *args): + loop = asyncio.get_running_loop() + ctx = copy_context() + return await loop.run_in_executor(executor, lambda: ctx.run(func, *args)) + + runner._run_in_executor_with_context = _run_in_executor_with_context + return runner, executor + + +def _agent_with_close(close_fn): + return SimpleNamespace( + close=close_fn, + shutdown_memory_provider=lambda *a, **k: None, + _session_messages=None, + ) + + +@pytest.mark.asyncio +async def test_cleanup_off_loop_does_not_block_event_loop(): + """A slow agent.close() must NOT freeze the loop. A concurrent heartbeat + keeps ticking WHILE close() blocks in its worker thread — proving the + cleanup was offloaded, not run inline (which would freeze the loop and + stall the runtime-status updated_at heartbeat, #53175).""" + runner, executor = _make_runner() + close_started = threading.Event() + release = threading.Event() + + def slow_close(): + close_started.set() + release.wait(timeout=5) # block the WORKER thread, not the loop + + agent = _agent_with_close(slow_close) + + ticks = {"n": 0} + stop = threading.Event() + + async def _heartbeat(): + while not stop.is_set(): + ticks["n"] += 1 + await asyncio.sleep(0.005) + + hb = asyncio.create_task(_heartbeat()) + cleanup_task = asyncio.create_task( + runner._cleanup_agent_resources_off_loop(agent, context="test") + ) + + for _ in range(200): + if close_started.is_set(): + break + await asyncio.sleep(0.005) + assert close_started.is_set(), "close() never ran" + + ticks_at_block = ticks["n"] + await asyncio.sleep(0.1) + ticks_during_block = ticks["n"] - ticks_at_block + + release.set() + await cleanup_task + stop.set() + await hb + executor.shutdown(wait=False) + + assert ticks_during_block >= 5, ( + f"event loop was blocked during agent cleanup (#53175): only " + f"{ticks_during_block} ticks while close() was running" + ) + + +@pytest.mark.asyncio +async def test_cleanup_off_loop_times_out_gracefully(caplog): + """A cleanup that exceeds the bounded timeout logs a warning and returns — + the caller (sweep / shutdown / hygiene) proceeds rather than hanging.""" + runner, executor = _make_runner() + + async def _instant_timeout(aw, timeout=None): + if asyncio.iscoroutine(aw): + aw.close() + raise asyncio.TimeoutError + + import gateway.run as _run + + agent = _agent_with_close(lambda: None) + with caplog.at_level(logging.WARNING, logger="gateway.run"): + # Patch the wait_for the helper uses so we don't actually wait 30s. + orig = _run.asyncio.wait_for + _run.asyncio.wait_for = _instant_timeout + try: + await runner._cleanup_agent_resources_off_loop(agent, context="sweep") + finally: + _run.asyncio.wait_for = orig + executor.shutdown(wait=False) + + assert any( + "exceeded" in r.message and "#53175" in r.message for r in caplog.records + ), "expected the timeout warning to be logged" + + +@pytest.mark.asyncio +async def test_cleanup_off_loop_swallows_executor_failure(caplog): + """If the offloaded cleanup raises, the helper logs and returns — a + teardown failure must never abort the loop coroutine that triggered it.""" + runner, executor = _make_runner() + + def boom(): + raise RuntimeError("provider shutdown blew up") + + # _cleanup_agent_resources swallows its own internal errors, so to reach + # the helper's except branch make the offloaded call itself raise. + def _boom_cleanup(agent): + raise RuntimeError("boom") + + runner._cleanup_agent_resources = _boom_cleanup + + with caplog.at_level(logging.WARNING, logger="gateway.run"): + await runner._cleanup_agent_resources_off_loop( + _agent_with_close(boom), context="shutdown finalize" + ) + executor.shutdown(wait=False) + + assert any( + "failed" in r.message and "#53175" in r.message for r in caplog.records + ), "expected the cleanup-failure warning to be logged" + + +@pytest.mark.asyncio +async def test_cleanup_off_loop_none_agent_is_noop(): + """A None agent (None cache entry) is a no-op and never touches the loop.""" + runner, executor = _make_runner() + await runner._cleanup_agent_resources_off_loop(None) + executor.shutdown(wait=False) diff --git a/tests/gateway/test_shutdown_cache_cleanup.py b/tests/gateway/test_shutdown_cache_cleanup.py index 156e47b62b5..1b122a0d105 100644 --- a/tests/gateway/test_shutdown_cache_cleanup.py +++ b/tests/gateway/test_shutdown_cache_cleanup.py @@ -57,13 +57,23 @@ class _FakeGateway: def _update_runtime_status(self, *_a, **_kw): pass + async def _run_in_executor_with_context(self, func, *args): + # stop() offloads agent-resource cleanup off the loop (#53175); run + # inline in tests so the bounded-cleanup path is exercised. + return func(*args) + + async def _cleanup_agent_resources_off_loop(self, agent, *, context=""): + # Mirror the real bounded helper, inline (no executor/timeout) so the + # fake exercises the same call shape stop() now uses. + self._cleanup_agent_resources(agent) + async def _notify_active_sessions_of_shutdown(self): pass async def _drain_active_agents(self, timeout): return {}, False - def _finalize_shutdown_agents(self, agents): + async def _finalize_shutdown_agents(self, agents): for agent in agents.values(): self._cleanup_agent_resources(agent)