From b296915c82c9da02bd6edacf52490e68f85e1f16 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 27 Jun 2026 03:36:03 -0700 Subject: [PATCH] fix(feishu): route blocking SDK calls through an adapter-owned executor Feishu SDK calls ran on asyncio's shared default executor, so a torn-down default executor wedged every send with 'Executor shutdown has been called' and left the gateway a zombie (#10849). The adapter now owns a ThreadPoolExecutor recreated on demand if shut down, mirroring the gateway-owned executor change. Routes all 17 self._client SDK calls through _run_blocking; shuts the pool down on disconnect. --- plugins/platforms/feishu/adapter.py | 83 +++++++++++++---- tests/gateway/test_feishu_sdk_executor.py | 88 +++++++++++++++++++ .../test_stream_consumer_thread_routing.py | 9 ++ 3 files changed, 163 insertions(+), 17 deletions(-) create mode 100644 tests/gateway/test_feishu_sdk_executor.py diff --git a/plugins/platforms/feishu/adapter.py b/plugins/platforms/feishu/adapter.py index c3ce2c29431..415ec9550cf 100644 --- a/plugins/platforms/feishu/adapter.py +++ b/plugins/platforms/feishu/adapter.py @@ -49,6 +49,7 @@ from __future__ import annotations import asyncio import collections +import concurrent.futures import hashlib import hmac import itertools @@ -1430,6 +1431,13 @@ class FeishuAdapter(BasePlatformAdapter): self._settings = self._load_settings(config.extra or {}) self._apply_settings(self._settings) self._client: Optional[Any] = None + # Adapter-owned thread pool for blocking Feishu SDK calls. Routing SDK + # work through this pool (instead of asyncio's shared default executor) + # means a torn-down default executor can no longer wedge sends with + # "Executor shutdown has been called" — the pool is recreated on demand + # if it has been shut down. See issue #10849. + self._sdk_executor_lock = threading.Lock() + self._sdk_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None self._ws_client: Optional[Any] = None self._ws_future: Optional[asyncio.Future] = None self._ws_thread_loop: Optional[asyncio.AbstractEventLoop] = None @@ -1641,6 +1649,46 @@ class FeishuAdapter(BasePlatformAdapter): .build() ) + def _get_sdk_executor(self) -> concurrent.futures.ThreadPoolExecutor: + """Return the adapter-owned executor for blocking Feishu SDK calls. + + Recreates the pool if it was never built or has been shut down, so a + torn-down executor can no longer permanently wedge sends (#10849). + """ + lock = getattr(self, "_sdk_executor_lock", None) + if lock is None: + lock = threading.Lock() + self._sdk_executor_lock = lock + with lock: + executor = getattr(self, "_sdk_executor", None) + if executor is None or getattr(executor, "_shutdown", False): + executor = concurrent.futures.ThreadPoolExecutor( + max_workers=10, + thread_name_prefix="hermes-feishu-sdk", + ) + self._sdk_executor = executor + return executor + + async def _run_blocking(self, func, *args): + """Run a blocking Feishu SDK call on the adapter-owned thread pool.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(self._get_sdk_executor(), func, *args) + + def _shutdown_sdk_executor(self) -> None: + """Stop the adapter-owned SDK executor without touching the loop default.""" + lock = getattr(self, "_sdk_executor_lock", None) + if lock is None: + return + with lock: + executor = getattr(self, "_sdk_executor", None) + self._sdk_executor = None + if executor is None: + return + try: + executor.shutdown(wait=False, cancel_futures=True) + except TypeError: + executor.shutdown(wait=False) + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to Feishu/Lark.""" if not FEISHU_AVAILABLE: @@ -1730,6 +1778,7 @@ class FeishuAdapter(BasePlatformAdapter): self._ws_thread_loop = None self._loop = None self._event_handler = None + self._shutdown_sdk_executor() self._persist_seen_message_ids() await self._release_app_lock() @@ -1846,7 +1895,7 @@ class FeishuAdapter(BasePlatformAdapter): msg_type, payload = self._build_outbound_payload(content) body = self._build_update_message_body(msg_type=msg_type, content=payload) request = self._build_update_message_request(message_id=message_id, request_body=body) - response = await asyncio.to_thread(self._client.im.v1.message.update, request) + response = await self._run_blocking(self._client.im.v1.message.update, request) result = self._finalize_send_result(response, "update failed") if not result.success and msg_type == "post" and _POST_CONTENT_INVALID_RE.search(result.error or ""): logger.warning("[Feishu] Invalid post update payload rejected by API; falling back to plain text") @@ -1855,7 +1904,7 @@ class FeishuAdapter(BasePlatformAdapter): content=json.dumps({"text": _strip_markdown_to_plain_text(content)}, ensure_ascii=False), ) fallback_request = self._build_update_message_request(message_id=message_id, request_body=fallback_body) - fallback_response = await asyncio.to_thread(self._client.im.v1.message.update, fallback_request) + fallback_response = await self._run_blocking(self._client.im.v1.message.update, fallback_request) result = self._finalize_send_result(fallback_response, "update failed") if result.success: result.message_id = message_id @@ -2128,7 +2177,7 @@ class FeishuAdapter(BasePlatformAdapter): image=image_file, ) request = self._build_image_upload_request(body) - upload_response = await asyncio.to_thread(self._client.im.v1.image.create, request) + upload_response = await self._run_blocking(self._client.im.v1.image.create, request) image_key = self._extract_response_field(upload_response, "image_key") if not image_key: return self._response_error_result( @@ -2244,7 +2293,7 @@ class FeishuAdapter(BasePlatformAdapter): try: request = self._build_get_chat_request(chat_id) - response = await asyncio.to_thread(self._client.im.v1.chat.get, request) + response = await self._run_blocking(self._client.im.v1.chat.get, request) if not response or getattr(response, "success", lambda: False)() is False: code = getattr(response, "code", "unknown") msg = getattr(response, "msg", "chat lookup failed") @@ -2789,7 +2838,7 @@ class FeishuAdapter(BasePlatformAdapter): # Fetch the target message to verify it was sent by us and to obtain chat context. try: request = self._build_get_message_request(message_id) - response = await asyncio.to_thread(self._client.im.v1.message.get, request) + response = await self._run_blocking(self._client.im.v1.message.get, request) if not response or not getattr(response, "success", lambda: False)(): return items = getattr(getattr(response, "data", None), "items", None) or [] @@ -2967,7 +3016,7 @@ class FeishuAdapter(BasePlatformAdapter): .request_body(body) .build() ) - response = await asyncio.to_thread(self._client.im.v1.message_reaction.create, request) + response = await self._run_blocking(self._client.im.v1.message_reaction.create, request) if response and getattr(response, "success", lambda: False)(): data = getattr(response, "data", None) return getattr(data, "reaction_id", None) @@ -2998,7 +3047,7 @@ class FeishuAdapter(BasePlatformAdapter): .reaction_id(reaction_id) .build() ) - response = await asyncio.to_thread(self._client.im.v1.message_reaction.delete, request) + response = await self._run_blocking(self._client.im.v1.message_reaction.delete, request) if response and getattr(response, "success", lambda: False)(): return True logger.debug( @@ -3713,7 +3762,7 @@ class FeishuAdapter(BasePlatformAdapter): file_key=image_key, resource_type="image", ) - response = await asyncio.to_thread(self._client.im.v1.message_resource.get, request) + response = await self._run_blocking(self._client.im.v1.message_resource.get, request) if not response or not response.success(): logger.warning( "[Feishu] Failed to download image %s: %s %s", @@ -3757,7 +3806,7 @@ class FeishuAdapter(BasePlatformAdapter): file_key=file_key, resource_type=request_type, ) - response = await asyncio.to_thread(self._client.im.v1.message_resource.get, request) + response = await self._run_blocking(self._client.im.v1.message_resource.get, request) if not response or not response.success(): logger.debug( "[Feishu] Resource download failed for %s/%s via type=%s: %s %s", @@ -3975,7 +4024,7 @@ class FeishuAdapter(BasePlatformAdapter): else: id_type = "user_id" request = GetUserRequest.builder().user_id(trimmed).user_id_type(id_type).build() - response = await asyncio.to_thread(self._client.contact.v3.user.get, request) + response = await self._run_blocking(self._client.contact.v3.user.get, request) if not response or not response.success(): return None user = getattr(getattr(response, "data", None), "user", None) @@ -4006,7 +4055,7 @@ class FeishuAdapter(BasePlatformAdapter): .token_types({AccessTokenType.TENANT}) .build() ) - resp = await asyncio.to_thread(self._client.request, req) + resp = await self._run_blocking(self._client.request, req) content = getattr(getattr(resp, "raw", None), "content", None) if not content: return None @@ -4031,7 +4080,7 @@ class FeishuAdapter(BasePlatformAdapter): return self._message_text_cache[message_id] try: request = self._build_get_message_request(message_id) - response = await asyncio.to_thread(self._client.im.v1.message.get, request) + response = await self._run_blocking(self._client.im.v1.message.get, request) if not response or getattr(response, "success", lambda: False)() is False: code = getattr(response, "code", "unknown") msg = getattr(response, "msg", "message lookup failed") @@ -4255,7 +4304,7 @@ class FeishuAdapter(BasePlatformAdapter): .token_types({AccessTokenType.TENANT}) .build() ) - resp = await asyncio.to_thread(self._client.request, req) + resp = await self._run_blocking(self._client.request, req) content = getattr(getattr(resp, "raw", None), "content", None) if content: payload = json.loads(content) @@ -4287,7 +4336,7 @@ class FeishuAdapter(BasePlatformAdapter): return try: request = self._build_get_application_request(app_id=self._app_id, lang="en_us") - response = await asyncio.to_thread(self._client.application.v6.application.get, request) + response = await self._run_blocking(self._client.application.v6.application.get, request) if not response or not response.success(): code = getattr(response, "code", None) if code == 99991672: @@ -4415,7 +4464,7 @@ class FeishuAdapter(BasePlatformAdapter): file=file_obj, ) request = self._build_file_upload_request(body) - upload_response = await asyncio.to_thread(self._client.im.v1.file.create, request) + upload_response = await self._run_blocking(self._client.im.v1.file.create, request) file_key = self._extract_response_field(upload_response, "file_key") if not file_key: return self._response_error_result( @@ -4471,7 +4520,7 @@ class FeishuAdapter(BasePlatformAdapter): uuid_value=str(uuid.uuid4()), ) request = self._build_reply_message_request(effective_reply_to, body) - return await asyncio.to_thread(self._client.im.v1.message.reply, request) + return await self._run_blocking(self._client.im.v1.message.reply, request) # For topic/thread messages that fell back from reply→create, use # thread_id as receive_id so the message lands in the topic instead of @@ -4501,7 +4550,7 @@ class FeishuAdapter(BasePlatformAdapter): uuid_value=str(uuid.uuid4()), ) request = self._build_create_message_request(receive_id_type, body) - return await asyncio.to_thread(self._client.im.v1.message.create, request) + return await self._run_blocking(self._client.im.v1.message.create, request) @staticmethod def _response_succeeded(response: Any) -> bool: diff --git a/tests/gateway/test_feishu_sdk_executor.py b/tests/gateway/test_feishu_sdk_executor.py new file mode 100644 index 00000000000..d4cce31a5dd --- /dev/null +++ b/tests/gateway/test_feishu_sdk_executor.py @@ -0,0 +1,88 @@ +"""Regression tests for the Feishu adapter's owned SDK executor. + +Blocking Feishu SDK calls used to run on asyncio's shared default executor. +When that executor was torn down (agent thread exit / loop cleanup), every +subsequent send failed permanently with "Executor shutdown has been called" +and the gateway became a zombie. The adapter now owns its own +ThreadPoolExecutor and recreates it on demand if it has been shut down. + +Covers: #10849 +""" +import concurrent.futures + +import pytest + +from plugins.platforms.feishu.adapter import FeishuAdapter + + +def _bare_adapter() -> FeishuAdapter: + """A FeishuAdapter with only the executor fields wired (no __init__).""" + adapter = object.__new__(FeishuAdapter) + import threading + + adapter._sdk_executor_lock = threading.Lock() + adapter._sdk_executor = None + return adapter + + +def test_get_executor_creates_pool(): + adapter = _bare_adapter() + executor = adapter._get_sdk_executor() + assert isinstance(executor, concurrent.futures.ThreadPoolExecutor) + # Same instance returned while alive. + assert adapter._get_sdk_executor() is executor + adapter._shutdown_sdk_executor() + + +def test_get_executor_recreates_after_shutdown(): + """A shut-down pool must be transparently replaced — the #10849 recovery.""" + adapter = _bare_adapter() + first = adapter._get_sdk_executor() + first.shutdown(wait=True) + assert getattr(first, "_shutdown", False) is True + + second = adapter._get_sdk_executor() + assert second is not first + assert getattr(second, "_shutdown", False) is False + adapter._shutdown_sdk_executor() + + +def test_shutdown_clears_reference(): + adapter = _bare_adapter() + adapter._get_sdk_executor() + adapter._shutdown_sdk_executor() + assert adapter._sdk_executor is None + # Idempotent. + adapter._shutdown_sdk_executor() + + +@pytest.mark.asyncio +async def test_run_blocking_executes_on_owned_pool(): + adapter = _bare_adapter() + captured = {} + + def _work(value): + import threading + + captured["thread"] = threading.current_thread().name + return value * 2 + + result = await adapter._run_blocking(_work, 21) + assert result == 42 + # Ran on the adapter-owned pool, not the default executor. + assert captured["thread"].startswith("hermes-feishu-sdk") + adapter._shutdown_sdk_executor() + + +@pytest.mark.asyncio +async def test_run_blocking_survives_pool_shutdown(): + """After the pool is shut down, _run_blocking transparently recovers.""" + adapter = _bare_adapter() + assert await adapter._run_blocking(lambda: "first") == "first" + + adapter._shutdown_sdk_executor() + + # The old pool is gone; the next call rebuilds one instead of raising + # "cannot schedule new futures after shutdown". + assert await adapter._run_blocking(lambda: "second") == "second" + adapter._shutdown_sdk_executor() diff --git a/tests/gateway/test_stream_consumer_thread_routing.py b/tests/gateway/test_stream_consumer_thread_routing.py index bb1675f03c0..a133a078466 100644 --- a/tests/gateway/test_stream_consumer_thread_routing.py +++ b/tests/gateway/test_stream_consumer_thread_routing.py @@ -197,6 +197,12 @@ class TestFeishuFallbackThreadRouting: adapter._client = mock_client adapter._build_create_message_body = FeishuAdapter._build_create_message_body adapter._build_create_message_request = FeishuAdapter._build_create_message_request + # _send_raw_message routes blocking SDK calls through _run_blocking + # (adapter-owned executor). On a MagicMock(spec=...) that method is + # auto-mocked and would swallow the real call, so wire a passthrough. + async def _run_blocking_passthrough(func, *args): + return func(*args) + adapter._run_blocking = _run_blocking_passthrough # Call _send_raw_message with reply_to=None and thread_id in metadata import json @@ -250,6 +256,9 @@ class TestFeishuFallbackThreadRouting: adapter._client = mock_client adapter._build_create_message_body = FeishuAdapter._build_create_message_body adapter._build_create_message_request = FeishuAdapter._build_create_message_request + async def _run_blocking_passthrough(func, *args): + return func(*args) + adapter._run_blocking = _run_blocking_passthrough import json result = await FeishuAdapter._send_raw_message(