fix(gateway,feishu): refuse executor resurrection during real shutdown

Add an explicit _closing guard to both owned executors so that the
recreate-on-shutdown path only recovers from an *external* teardown of
the loop's default executor — it never resurrects a pool that the
gateway/adapter itself stopped. _shutdown_*executor() sets the flag;
_get_*executor() raises RuntimeError while the flag is set; feishu
connect() clears it again on reconnect. Updates the gateway recreate
test to assert the refusal contract and adds feishu coverage.
This commit is contained in:
teknium1 2026-06-27 03:59:17 -07:00 committed by Teknium
parent b296915c82
commit 7ee0b68973
4 changed files with 66 additions and 13 deletions

View file

@ -2644,7 +2644,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
self._restart_task: Optional[asyncio.Task] = None
self._executor_lock = threading.Lock()
self._executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
# Set on gateway stop so the recreate-on-shutdown path can't resurrect
# the pool during a real shutdown.
self._executor_closing = False
# Track running agents per session for interrupt support
# Key: session_key, Value: AIAgent instance
self._running_agents: Dict[str, Any] = {}
@ -13415,6 +13417,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
self._executor_lock = lock
with lock:
if getattr(self, "_executor_closing", False):
raise RuntimeError("Gateway is shutting down; executor unavailable")
executor = getattr(self, "_executor", None)
if executor is None or getattr(executor, "_shutdown", False):
executor = concurrent.futures.ThreadPoolExecutor(
@ -13431,6 +13435,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
return
with lock:
self._executor_closing = True
executor = getattr(self, "_executor", None)
self._executor = None

View file

@ -1438,6 +1438,9 @@ class FeishuAdapter(BasePlatformAdapter):
# if it has been shut down. See issue #10849.
self._sdk_executor_lock = threading.Lock()
self._sdk_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
# Set on disconnect/shutdown so a real teardown can't be resurrected
# by the recreate-on-shutdown path; cleared on connect for reconnects.
self._sdk_executor_closing = False
self._ws_client: Optional[Any] = None
self._ws_future: Optional[asyncio.Future] = None
self._ws_thread_loop: Optional[asyncio.AbstractEventLoop] = None
@ -1652,14 +1655,18 @@ class FeishuAdapter(BasePlatformAdapter):
def _get_sdk_executor(self) -> concurrent.futures.ThreadPoolExecutor:
"""Return the adapter-owned executor for blocking Feishu SDK calls.
Recreates the pool if it was never built or has been shut down, so a
torn-down executor can no longer permanently wedge sends (#10849).
Recreates the pool if it was never built or was shut down by an
*external* teardown of the loop's default executor, so that such a
teardown can no longer permanently wedge sends (#10849). Refuses to
resurrect the pool once the adapter itself is closing — a real
disconnect/shutdown stays shut.
"""
lock = getattr(self, "_sdk_executor_lock", None)
if lock is None:
lock = threading.Lock()
self._sdk_executor_lock = lock
with lock:
if getattr(self, "_sdk_executor_closing", False):
raise RuntimeError("Feishu adapter is shutting down; SDK executor unavailable")
executor = getattr(self, "_sdk_executor", None)
if executor is None or getattr(executor, "_shutdown", False):
executor = concurrent.futures.ThreadPoolExecutor(
@ -1680,6 +1687,7 @@ class FeishuAdapter(BasePlatformAdapter):
if lock is None:
return
with lock:
self._sdk_executor_closing = True
executor = getattr(self, "_sdk_executor", None)
self._sdk_executor = None
if executor is None:
@ -1691,6 +1699,9 @@ class FeishuAdapter(BasePlatformAdapter):
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to Feishu/Lark."""
# A fresh connect (or reconnect) re-arms the SDK executor after a prior
# disconnect set the closing flag.
self._sdk_executor_closing = False
if not FEISHU_AVAILABLE:
logger.error("[Feishu] lark-oapi not installed")
return False

View file

@ -22,6 +22,7 @@ def _bare_adapter() -> FeishuAdapter:
adapter._sdk_executor_lock = threading.Lock()
adapter._sdk_executor = None
adapter._sdk_executor_closing = False
return adapter
@ -82,7 +83,41 @@ async def test_run_blocking_survives_pool_shutdown():
adapter._shutdown_sdk_executor()
# The old pool is gone; the next call rebuilds one instead of raising
# "cannot schedule new futures after shutdown".
# _shutdown set the closing flag, so this would now refuse — re-arm first
# the way a reconnect does, then the next call rebuilds the pool.
adapter._sdk_executor_closing = False
assert await adapter._run_blocking(lambda: "second") == "second"
adapter._shutdown_sdk_executor()
def test_closing_flag_refuses_resurrection():
    """Once the adapter stops its own pool, the recreate path must refuse.

    ``_shutdown_sdk_executor()`` marks the adapter as closing; a subsequent
    ``_get_sdk_executor()`` is expected to raise rather than build a new pool.
    """
    adapter = _bare_adapter()
    # Build a live pool first so the shutdown below has something to stop.
    adapter._get_sdk_executor()
    # The adapter's own teardown is what sets the closing flag.
    adapter._shutdown_sdk_executor()

    closing = adapter._sdk_executor_closing
    assert closing is True

    with pytest.raises(RuntimeError, match="shutting down"):
        adapter._get_sdk_executor()
@pytest.mark.asyncio
async def test_reconnect_rearms_executor():
    """connect() must clear the closing flag so a reconnect can use the pool."""
    import threading

    adapter = object.__new__(FeishuAdapter)
    # Empty credentials: connect() is expected to bail out early and return
    # False, but it must still have re-armed the executor by then.
    adapter._app_id = ""
    adapter._app_secret = ""
    adapter._sdk_executor = None
    adapter._sdk_executor_lock = threading.Lock()
    # Simulate the state left behind by an earlier disconnect.
    adapter._sdk_executor_closing = True

    connected = await adapter.connect()

    assert connected is False  # bailed out without connecting
    assert adapter._sdk_executor_closing is False

    # With the flag cleared, blocking calls go through the pool again.
    outcome = await adapter._run_blocking(lambda: "rearmed")
    assert outcome == "rearmed"
    adapter._shutdown_sdk_executor()

View file

@ -375,20 +375,22 @@ async def test_run_in_executor_with_context_survives_default_executor_shutdown()
@pytest.mark.asyncio
async def test_run_in_executor_with_context_recreates_shutdown_gateway_executor():
"""A stopped gateway-owned executor should be replaced on the next use."""
async def test_gateway_executor_refuses_resurrection_after_shutdown():
"""A real gateway shutdown must NOT be resurrected by the recreate path.
_shutdown_executor() means "we're stopping" — the recreate-on-shutdown
logic exists to survive an *external* teardown of the loop default
(test_..._survives_default_executor_shutdown), not to undo our own stop.
"""
runner = object.__new__(GatewayRunner)
try:
first = await runner._run_in_executor_with_context(lambda: "first")
first_executor = runner._executor
assert first == "first"
runner._shutdown_executor()
second = await runner._run_in_executor_with_context(lambda: "second")
second_executor = runner._executor
with pytest.raises(RuntimeError, match="shutting down"):
await runner._run_in_executor_with_context(lambda: "second")
finally:
runner._shutdown_executor()
assert first == "first"
assert second == "second"
assert second_executor is not first_executor