From 7ee0b689739e80c75d7c32f42cf2a79a2207c0a0 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 27 Jun 2026 03:59:17 -0700 Subject: [PATCH] fix(gateway,feishu): refuse executor resurrection during real shutdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an explicit _closing guard to both owned executors so the recreate-on-shutdown path only recovers from an *external* teardown of the loop default — never resurrects a pool the gateway/adapter itself stopped. _shutdown_*executor() sets the flag; _get_*executor() raises if closing; feishu connect() re-arms on reconnect. Updates the gateway recreate test to assert the refusal contract and adds feishu coverage. --- gateway/run.py | 7 +++- plugins/platforms/feishu/adapter.py | 15 +++++++-- tests/gateway/test_feishu_sdk_executor.py | 39 +++++++++++++++++++++-- tests/gateway/test_session_env.py | 18 ++++++----- 4 files changed, 66 insertions(+), 13 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index cfbabf1a717..77d0c5d088b 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2644,7 +2644,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew self._restart_task: Optional[asyncio.Task] = None self._executor_lock = threading.Lock() self._executor: Optional[concurrent.futures.ThreadPoolExecutor] = None - + # Set on gateway stop so the recreate-on-shutdown path can't resurrect + # the pool during a real shutdown. + self._executor_closing = False # Track running agents per session for interrupt support # Key: session_key, Value: AIAgent instance self._running_agents: Dict[str, Any] = {} @@ -13415,6 +13417,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew self._executor_lock = lock with lock: + if getattr(self, "_executor_closing", False): + raise RuntimeError("Gateway is shutting down; executor unavailable") executor = getattr(self, "_executor", None) if executor is None or getattr(executor, "_shutdown", False): executor = concurrent.futures.ThreadPoolExecutor( @@ -13431,6 +13435,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew return with lock: + self._executor_closing = True executor = getattr(self, "_executor", None) self._executor = None diff --git a/plugins/platforms/feishu/adapter.py b/plugins/platforms/feishu/adapter.py index 415ec9550cf..42bd404104a 100644 --- a/plugins/platforms/feishu/adapter.py +++ b/plugins/platforms/feishu/adapter.py @@ -1438,6 +1438,9 @@ class FeishuAdapter(BasePlatformAdapter): # if it has been shut down. See issue #10849. self._sdk_executor_lock = threading.Lock() self._sdk_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None + # Set on disconnect/shutdown so a real teardown can't be resurrected + # by the recreate-on-shutdown path; cleared on connect for reconnects. + self._sdk_executor_closing = False self._ws_client: Optional[Any] = None self._ws_future: Optional[asyncio.Future] = None self._ws_thread_loop: Optional[asyncio.AbstractEventLoop] = None @@ -1652,14 +1655,18 @@ class FeishuAdapter(BasePlatformAdapter): def _get_sdk_executor(self) -> concurrent.futures.ThreadPoolExecutor: """Return the adapter-owned executor for blocking Feishu SDK calls. - Recreates the pool if it was never built or has been shut down, so a - torn-down executor can no longer permanently wedge sends (#10849). + Recreates the pool if it was never built or was shut down by an + *external* teardown of the loop's default executor, so that can no + longer permanently wedge sends (#10849). Refuses to resurrect once + the adapter itself is closing — a real disconnect/shutdown stays shut. """ lock = getattr(self, "_sdk_executor_lock", None) if lock is None: lock = threading.Lock() self._sdk_executor_lock = lock with lock: + if getattr(self, "_sdk_executor_closing", False): + raise RuntimeError("Feishu adapter is shutting down; SDK executor unavailable") executor = getattr(self, "_sdk_executor", None) if executor is None or getattr(executor, "_shutdown", False): executor = concurrent.futures.ThreadPoolExecutor( @@ -1680,6 +1687,7 @@ class FeishuAdapter(BasePlatformAdapter): if lock is None: return with lock: + self._sdk_executor_closing = True executor = getattr(self, "_sdk_executor", None) self._sdk_executor = None if executor is None: @@ -1691,6 +1699,9 @@ class FeishuAdapter(BasePlatformAdapter): async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to Feishu/Lark.""" + # A fresh connect (or reconnect) re-arms the SDK executor after a prior + # disconnect set the closing flag. + self._sdk_executor_closing = False if not FEISHU_AVAILABLE: logger.error("[Feishu] lark-oapi not installed") return False diff --git a/tests/gateway/test_feishu_sdk_executor.py b/tests/gateway/test_feishu_sdk_executor.py index d4cce31a5dd..c61ef9ae6fd 100644 --- a/tests/gateway/test_feishu_sdk_executor.py +++ b/tests/gateway/test_feishu_sdk_executor.py @@ -22,6 +22,7 @@ def _bare_adapter() -> FeishuAdapter: adapter._sdk_executor_lock = threading.Lock() adapter._sdk_executor = None + adapter._sdk_executor_closing = False return adapter @@ -82,7 +83,41 @@ async def test_run_blocking_survives_pool_shutdown(): adapter._shutdown_sdk_executor() - # The old pool is gone; the next call rebuilds one instead of raising - # "cannot schedule new futures after shutdown". + # _shutdown set the closing flag, so this would now refuse — re-arm first + # the way a reconnect does, then the next call rebuilds the pool. + adapter._sdk_executor_closing = False assert await adapter._run_blocking(lambda: "second") == "second" adapter._shutdown_sdk_executor() + + +def test_closing_flag_refuses_resurrection(): + """A real disconnect/shutdown must NOT be resurrected by the recreate path.""" + adapter = _bare_adapter() + adapter._get_sdk_executor() # build a live pool + adapter._shutdown_sdk_executor() # real teardown sets _closing + + assert adapter._sdk_executor_closing is True + with pytest.raises(RuntimeError, match="shutting down"): + adapter._get_sdk_executor() + + +@pytest.mark.asyncio +async def test_reconnect_rearms_executor(): + """connect() clears the closing flag so a reconnect can use the pool again.""" + import threading + + adapter = object.__new__(FeishuAdapter) + adapter._sdk_executor_lock = threading.Lock() + adapter._sdk_executor = None + adapter._sdk_executor_closing = True # as if a prior disconnect ran + + # connect() bails early (no creds) but must still re-arm the executor. + adapter._app_id = "" + adapter._app_secret = "" + ok = await adapter.connect() + assert ok is False # bailed on missing creds + assert adapter._sdk_executor_closing is False + # And now the executor is usable again. + assert await adapter._run_blocking(lambda: "rearmed") == "rearmed" + adapter._shutdown_sdk_executor() + diff --git a/tests/gateway/test_session_env.py b/tests/gateway/test_session_env.py index a47653db988..f5392ab2c22 100644 --- a/tests/gateway/test_session_env.py +++ b/tests/gateway/test_session_env.py @@ -375,20 +375,22 @@ async def test_run_in_executor_with_context_survives_default_executor_shutdown() @pytest.mark.asyncio -async def test_run_in_executor_with_context_recreates_shutdown_gateway_executor(): - """A stopped gateway-owned executor should be replaced on the next use.""" +async def test_gateway_executor_refuses_resurrection_after_shutdown(): + """A real gateway shutdown must NOT be resurrected by the recreate path. + + _shutdown_executor() means "we're stopping" — the recreate-on-shutdown + logic exists to survive an *external* teardown of the loop default + (test_..._survives_default_executor_shutdown), not to undo our own stop. + """ runner = object.__new__(GatewayRunner) try: first = await runner._run_in_executor_with_context(lambda: "first") - first_executor = runner._executor + assert first == "first" runner._shutdown_executor() - second = await runner._run_in_executor_with_context(lambda: "second") - second_executor = runner._executor + with pytest.raises(RuntimeError, match="shutting down"): + await runner._run_in_executor_with_context(lambda: "second") finally: runner._shutdown_executor() - assert first == "first" - assert second == "second" - assert second_executor is not first_executor