fix(gateway): offload remaining inline agent cleanup off the event loop (#53175)

#35994 moved /new reset cleanup off the loop, but _cleanup_agent_resources
(agent.close() subprocess teardown; shutdown_memory_provider() plugin IO) was
still called INLINE on the event loop from three other sites:

  - _session_expiry_watcher (5-min idle sweep) — live loop
  - _handle_message_with_agent cache-hygiene re-eviction — live loop
  - _finalize_shutdown_agents / stop() idle-cache loop — shutdown

A wedged memory provider on any of these froze the loop: bot goes silent,
runtime-status updated_at heartbeat stops advancing, and SIGTERM can't be
serviced (requires kill -9) — exactly the #53175 zombie pattern.

Adds _cleanup_agent_resources_off_loop: a bounded (30s) worker-thread offload
mirroring the #35994 reset fix, and routes all four call sites (the two
live-loop paths plus the two shutdown loops listed above) through it.
This commit is contained in:
teknium1 2026-06-28 02:22:13 -07:00 committed by Teknium
parent aa50c1ba5d
commit ea5aaa7a22
4 changed files with 273 additions and 13 deletions

View file

@ -5162,7 +5162,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
e,
)
def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None:
async def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None:
for agent in active_agents.values():
# Persist any in-flight transcript to the SQLite session store
# before teardown (#13121). An agent forcibly interrupted by the
@ -5208,7 +5208,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
)
except Exception:
pass
self._cleanup_agent_resources(agent)
# Off-loop + bounded: a wedged memory provider here used to hang
# the whole shutdown so SIGTERM never completed (#53175).
await self._cleanup_agent_resources_off_loop(
agent, context="shutdown finalize"
)
def _should_emit_long_running_notification(
self,
@ -5231,6 +5235,52 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
return False
return True
# Upper bound, in seconds, on each off-loop agent-resource cleanup awaited
# from coroutines running on the gateway's event loop: the session-expiry
# sweep, the in-turn cache-hygiene re-eviction, and both shutdown paths
# (active-agent finalize and the idle-cache sweep).
# _cleanup_agent_resources is synchronous and can block for a long time
# (agent.close() does subprocess teardown; shutdown_memory_provider() may do
# network/SQLite IO via a memory plugin). Calling it inline wedges the whole
# loop — the bot goes silent, the runtime-status updated_at heartbeat
# freezes, and SIGTERM cannot be serviced (#53175). Offload to a worker
# thread under this timeout so the loop is never blocked; mirrors the /new
# reset path's fix (#35994).
_CLEANUP_TIMEOUT_S = 30.0
async def _cleanup_agent_resources_off_loop(
    self, agent: Any, *, context: str = ""
) -> None:
    """Tear down *agent* on a worker thread, waiting only a bounded time.

    ``_cleanup_agent_resources`` is dispatched through
    ``_run_in_executor_with_context`` and awaited for at most
    ``_CLEANUP_TIMEOUT_S`` seconds, so a slow or wedged teardown (memory
    provider IO, subprocess close) cannot stall the coroutine that asked
    for it.

    Args:
        agent: Agent whose resources should be released. ``None`` is a
            no-op.
        context: Optional call-site label appended to the log messages.

    A timeout, or any ``Exception`` raised by the offloaded call, is logged
    as a warning and swallowed. On timeout the await is abandoned — the
    worker thread is left to finish (or leak) on its own — and the caller
    proceeds regardless, exactly as the /new reset path does (#35994).
    """
    if agent is None:
        return

    try:
        offloaded = self._run_in_executor_with_context(
            self._cleanup_agent_resources, agent
        )
        await asyncio.wait_for(offloaded, timeout=self._CLEANUP_TIMEOUT_S)
    except asyncio.TimeoutError:
        site = f" ({context})" if context else ""
        logger.warning(
            "Agent resource cleanup%s exceeded %ss; proceeding without "
            "blocking the event loop (the worker thread is left to finish "
            "on its own). (#53175)",
            site,
            self._CLEANUP_TIMEOUT_S,
        )
    except Exception as cleanup_exc:
        site = f" ({context})" if context else ""
        logger.warning(
            "Agent resource cleanup%s failed: %s (#53175)",
            site,
            cleanup_exc,
        )
def _cleanup_agent_resources(self, agent: Any) -> None:
"""Best-effort cleanup for temporary or cached agent instances."""
if agent is None:
@ -6743,7 +6793,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
if _cached_agent is None:
_cached_agent = self._running_agents.get(key)
if _cached_agent and _cached_agent is not _AGENT_PENDING_SENTINEL:
self._cleanup_agent_resources(_cached_agent)
await self._cleanup_agent_resources_off_loop(
_cached_agent, context="session expiry"
)
# Drop the cache entry so the AIAgent (and its LLM
# clients, tool schemas, memory provider refs) can
# be garbage-collected. Otherwise the cache grows
@ -7250,7 +7302,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception as e:
logger.error("Failed to launch detached gateway restart: %s", e)
self._finalize_shutdown_agents(active_agents)
await self._finalize_shutdown_agents(active_agents)
# Also shut down memory providers on idle cached agents.
# _finalize_shutdown_agents only handles agents that were
@ -7267,7 +7319,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
_agent = (
_entry[0] if isinstance(_entry, tuple) else _entry
)
self._cleanup_agent_resources(_agent)
# Bounded + off-loop so a wedged memory provider on one
# idle agent can't hang shutdown indefinitely — that path
# is why SIGTERM failed to kill the process (#53175).
await self._cleanup_agent_resources_off_loop(
_agent, context="shutdown idle-cache"
)
for platform, adapter in list(self.adapters.items()):
await self._bounded_adapter_teardown(adapter, platform)
@ -10089,7 +10146,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# rebuilds its system prompt from current
# SOUL.md, memory, and skills.
self._evict_cached_agent(session_key)
self._cleanup_agent_resources(_hyg_agent)
await self._cleanup_agent_resources_off_loop(
_hyg_agent, context="session hygiene"
)
except Exception as e:
logger.warning(

View file

@ -32,6 +32,7 @@ real ``SessionStore.load_transcript``).
from __future__ import annotations
import asyncio
import sys
import types
from unittest.mock import MagicMock
@ -51,9 +52,25 @@ def _make_runner():
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
# _finalize_shutdown_agents offloads _cleanup_agent_resources to a worker
# thread via _run_in_executor_with_context (#53175). Stub it to run inline
# so the bounded-cleanup path is exercised deterministically in tests.
async def _inline_executor(func, *args):
return func(*args)
runner._run_in_executor_with_context = _inline_executor
return runner
def _finalize(runner, agents):
    """Drive the now-async ``_finalize_shutdown_agents`` from sync test bodies.

    Ensures *runner* carries an instance-level, inline
    ``_run_in_executor_with_context`` stub, then runs
    ``runner._finalize_shutdown_agents(agents)`` to completion on a fresh
    event loop via ``asyncio.run``.

    Args:
        runner: Runner under test; may be a bare
            ``object.__new__(GatewayRunner)`` instance.
        agents: Mapping handed to ``_finalize_shutdown_agents`` unchanged.
    """
    # ``hasattr`` cannot be used here: it is also true when the attribute
    # comes from the class, and GatewayRunner calls this method on ``self``
    # in its own cleanup helper, so a bare runner would never get the stub.
    # Look for an instance-level override instead, so a stub installed
    # earlier (e.g. by ``_make_runner``) is still respected.
    if "_run_in_executor_with_context" not in vars(runner):

        async def _inline_executor(func, *args):
            return func(*args)

        runner._run_in_executor_with_context = _inline_executor

    asyncio.run(runner._finalize_shutdown_agents(agents))
# ─────────────────────────────────────────────────────────────────────────
# Unit: _finalize_shutdown_agents calls the flush hook with the in-flight
# transcript before teardown.
@ -84,7 +101,7 @@ class TestFinalizeShutdownFlushesInflightTranscript:
]
agent = _FakeAgent(session_messages=inflight)
runner._finalize_shutdown_agents({"agent:main:discord:dm:42": agent})
_finalize(runner, {"agent:main:discord:dm:42": agent})
agent._flush_messages_to_session_db.assert_called_once_with(inflight)
# Cleanup still happens after the flush.
@ -96,7 +113,7 @@ class TestFinalizeShutdownFlushesInflightTranscript:
runner = _make_runner()
agent = _FakeAgent(session_messages=[])
runner._finalize_shutdown_agents({"k": agent})
_finalize(runner, {"k": agent})
agent._flush_messages_to_session_db.assert_not_called()
agent.close.assert_called_once()
@ -108,7 +125,7 @@ class TestFinalizeShutdownFlushesInflightTranscript:
agent = _FakeAgent(session_messages=[{"role": "user", "content": "x"}],
has_flush=False)
runner._finalize_shutdown_agents({"k": agent})
_finalize(runner, {"k": agent})
agent.close.assert_called_once()
@ -119,7 +136,7 @@ class TestFinalizeShutdownFlushesInflightTranscript:
agent = _FakeAgent(session_messages=[{"role": "user", "content": "x"}])
agent._flush_messages_to_session_db.side_effect = RuntimeError("db locked")
runner._finalize_shutdown_agents({"k": agent})
_finalize(runner, {"k": agent})
agent.close.assert_called_once()
@ -186,7 +203,7 @@ class TestShutdownTranscriptSurvivesResumeE2E:
# Drive the gateway shutdown finalization with this real agent.
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
runner._finalize_shutdown_agents({"agent:main:discord:dm:7": agent})
_finalize(runner, {"agent:main:discord:dm:7": agent})
# The in-flight turn must now be durable and readable via the SAME
# path the resume logic uses (SessionStore.load_transcript → DB).
@ -237,7 +254,7 @@ class TestShutdownTranscriptSurvivesResumeE2E:
# Shutdown re-flush of the SAME list identity must add nothing.
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
runner._finalize_shutdown_agents({"k": agent})
_finalize(runner, {"k": agent})
after = db.get_messages_as_conversation(session_id)
assert len(after) == 2, after

View file

@ -0,0 +1,174 @@
"""Regression test for #53175: gateway event loop wedged by synchronous
agent-resource cleanup run inline from loop coroutines.
#35994 fixed the /new reset path, but the same synchronous
``_cleanup_agent_resources`` (agent.close() tears down terminal sandboxes /
browser daemons / background processes; shutdown_memory_provider() may do
SQLite / network IO via a memory plugin) was still called INLINE on the event
loop from three other places:
* ``_session_expiry_watcher`` (the 5-minute idle sweep) — live loop
* ``_handle_message_with_agent`` cache-hygiene re-eviction — live loop
* ``_finalize_shutdown_agents`` / ``stop()`` idle-cache loop — shutdown
A wedged provider on any of these froze the whole loop: the bot went silent,
the runtime-status ``updated_at`` heartbeat stopped advancing (the symptom the
reporter's watchdog keyed on), and SIGTERM could not be serviced (requiring
``kill -9``).
The fix routes all four call sites through ``_cleanup_agent_resources_off_loop``
which offloads to a worker thread under a bounded ``asyncio.wait_for``, so the
loop is never blocked and a stuck teardown degrades gracefully.
These tests drive that shared helper directly — it is the single chokepoint
every fixed call site now uses.
"""
import asyncio
import logging
import threading
from contextvars import copy_context
from types import SimpleNamespace
import pytest
def _make_runner():
    """Build a bare GatewayRunner wired to a real thread pool.

    ``__init__`` is bypassed with ``object.__new__``; only the executor
    hooks the cleanup helper needs are attached to the instance.

    Returns:
        ``(runner, executor)`` — the executor is returned so each test can
        shut the two-worker pool down itself.
    """
    from gateway.run import GatewayRunner

    runner = object.__new__(GatewayRunner)

    from concurrent.futures import ThreadPoolExecutor

    pool = ThreadPoolExecutor(max_workers=2)

    def _get_executor():
        return pool

    async def _run_in_executor_with_context(func, *args):
        # Run ``func`` on the pool inside a copy of the caller's context.
        running_loop = asyncio.get_running_loop()
        snapshot = copy_context()

        def _call_in_context():
            return snapshot.run(func, *args)

        return await running_loop.run_in_executor(pool, _call_in_context)

    runner._get_executor = _get_executor
    runner._run_in_executor_with_context = _run_in_executor_with_context
    return runner, pool
def _agent_with_close(close_fn):
    """Return a minimal stand-in agent whose ``close`` is *close_fn*.

    The stand-in also carries a ``shutdown_memory_provider`` that accepts
    any arguments and returns ``None``, and ``_session_messages`` set to
    ``None``.
    """

    def _noop_shutdown(*_args, **_kwargs):
        return None

    fake_agent = SimpleNamespace()
    fake_agent.close = close_fn
    fake_agent.shutdown_memory_provider = _noop_shutdown
    fake_agent._session_messages = None
    return fake_agent
@pytest.mark.asyncio
async def test_cleanup_off_loop_does_not_block_event_loop():
    """A slow agent.close() must NOT freeze the loop.

    A concurrent heartbeat keeps ticking WHILE close() blocks in its worker
    thread — proving the cleanup was offloaded, not run inline (which would
    freeze the loop and stall the runtime-status updated_at heartbeat,
    #53175).
    """
    runner, executor = _make_runner()
    close_started = threading.Event()
    release = threading.Event()

    def slow_close():
        close_started.set()
        release.wait(timeout=5)  # block the WORKER thread, not the loop

    agent = _agent_with_close(slow_close)
    ticks = {"n": 0}
    stop = threading.Event()

    async def _heartbeat():
        while not stop.is_set():
            ticks["n"] += 1
            await asyncio.sleep(0.005)

    hb = asyncio.create_task(_heartbeat())
    cleanup_task = asyncio.create_task(
        runner._cleanup_agent_resources_off_loop(agent, context="test")
    )
    try:
        # Wait (up to ~1s) for the worker thread to enter close().
        for _ in range(200):
            if close_started.is_set():
                break
            await asyncio.sleep(0.005)
        assert close_started.is_set(), "close() never ran"

        # Count heartbeat ticks while close() is still blocked in the worker.
        ticks_at_block = ticks["n"]
        await asyncio.sleep(0.1)
        ticks_during_block = ticks["n"] - ticks_at_block

        release.set()
        await cleanup_task
    finally:
        # Always unblock the worker and stop the heartbeat, even when an
        # assertion above fails, so a failing run does not leave a pending
        # task, a thread parked in release.wait(), or an open executor.
        release.set()
        stop.set()
        await hb
        if not cleanup_task.done():
            cleanup_task.cancel()
            await asyncio.gather(cleanup_task, return_exceptions=True)
        executor.shutdown(wait=False)

    assert ticks_during_block >= 5, (
        f"event loop was blocked during agent cleanup (#53175): only "
        f"{ticks_during_block} ticks while close() was running"
    )
@pytest.mark.asyncio
async def test_cleanup_off_loop_times_out_gracefully(caplog):
    """Exceeding the bounded timeout logs a warning and returns.

    The caller (sweep / shutdown / hygiene) must proceed rather than hang.
    """
    runner, executor = _make_runner()

    async def _instant_timeout(aw, timeout=None):
        # Dispose of the un-awaited coroutine, then time out immediately.
        if asyncio.iscoroutine(aw):
            aw.close()
        raise asyncio.TimeoutError

    import gateway.run as _run

    agent = _agent_with_close(lambda: None)

    with caplog.at_level(logging.WARNING, logger="gateway.run"):
        # Swap the wait_for the helper resolves through gateway.run's asyncio
        # reference so the test does not actually wait 30s; always restore it.
        real_wait_for = _run.asyncio.wait_for
        _run.asyncio.wait_for = _instant_timeout
        try:
            await runner._cleanup_agent_resources_off_loop(agent, context="sweep")
        finally:
            _run.asyncio.wait_for = real_wait_for

    executor.shutdown(wait=False)

    timeout_warnings = [
        record
        for record in caplog.records
        if "exceeded" in record.message and "#53175" in record.message
    ]
    assert timeout_warnings, "expected the timeout warning to be logged"
@pytest.mark.asyncio
async def test_cleanup_off_loop_swallows_executor_failure(caplog):
    """A raising offloaded cleanup is logged and swallowed by the helper.

    A teardown failure must never abort the loop coroutine that triggered it.
    """
    runner, executor = _make_runner()

    def boom():
        raise RuntimeError("provider shutdown blew up")

    def _boom_cleanup(agent):
        raise RuntimeError("boom")

    # _cleanup_agent_resources swallows its own internal errors, so to reach
    # the helper's except branch the offloaded call itself has to raise.
    runner._cleanup_agent_resources = _boom_cleanup
    failing_agent = _agent_with_close(boom)

    with caplog.at_level(logging.WARNING, logger="gateway.run"):
        await runner._cleanup_agent_resources_off_loop(
            failing_agent, context="shutdown finalize"
        )

    executor.shutdown(wait=False)

    failure_warnings = [
        record
        for record in caplog.records
        if "failed" in record.message and "#53175" in record.message
    ]
    assert failure_warnings, "expected the cleanup-failure warning to be logged"
@pytest.mark.asyncio
async def test_cleanup_off_loop_none_agent_is_noop():
    """Passing ``None`` (e.g. an empty cache entry) completes without error."""
    runner, pool = _make_runner()
    outcome = runner._cleanup_agent_resources_off_loop(None)
    await outcome
    pool.shutdown(wait=False)

View file

@ -57,13 +57,23 @@ class _FakeGateway:
def _update_runtime_status(self, *_a, **_kw):
    """No-op stand-in: accept any arguments and do nothing."""
    return None
async def _run_in_executor_with_context(self, func, *args):
    """Call ``func(*args)`` inline and return its result.

    stop() offloads agent-resource cleanup off the loop (#53175); the fake
    runs it inline in tests so the bounded-cleanup path is exercised.
    """
    outcome = func(*args)
    return outcome
async def _cleanup_agent_resources_off_loop(self, agent, *, context=""):
    """Inline stand-in for the gateway's bounded off-loop cleanup helper.

    Mirrors the real helper's call shape (no executor/timeout) so the fake
    exercises the same path stop() now uses.

    Args:
        agent: Agent to clean up; ``None`` is a no-op, matching the real
            helper's early return.
        context: Accepted for signature compatibility; unused here.
    """
    if agent is None:
        return
    self._cleanup_agent_resources(agent)
async def _notify_active_sessions_of_shutdown(self):
    """No-op stand-in: nothing is notified."""
    return None
async def _drain_active_agents(self, timeout):
    """Return an empty mapping and ``False`` immediately, ignoring *timeout*."""
    drained = {}
    return drained, False
def _finalize_shutdown_agents(self, agents):
async def _finalize_shutdown_agents(self, agents):
for agent in agents.values():
self._cleanup_agent_resources(agent)