fix(feishu): route blocking SDK calls through an adapter-owned executor

Feishu SDK calls ran on asyncio's shared default executor, so a torn-down
default executor wedged every send with 'Executor shutdown has been called'
and left the gateway a zombie (#10849). The adapter now owns a
ThreadPoolExecutor recreated on demand if shut down, mirroring the
gateway-owned executor change. Routes all 17 self._client SDK calls through
_run_blocking; shuts the pool down on disconnect.
This commit is contained in:
teknium1 2026-06-27 03:36:03 -07:00 committed by Teknium
parent 1011c07966
commit b296915c82
3 changed files with 163 additions and 17 deletions

View file

@ -49,6 +49,7 @@ from __future__ import annotations
import asyncio
import collections
import concurrent.futures
import hashlib
import hmac
import itertools
@ -1430,6 +1431,13 @@ class FeishuAdapter(BasePlatformAdapter):
self._settings = self._load_settings(config.extra or {})
self._apply_settings(self._settings)
self._client: Optional[Any] = None
# Adapter-owned thread pool for blocking Feishu SDK calls. Routing SDK
# work through this pool (instead of asyncio's shared default executor)
# means a torn-down default executor can no longer wedge sends with
# "Executor shutdown has been called" — the pool is recreated on demand
# if it has been shut down. See issue #10849.
self._sdk_executor_lock = threading.Lock()
self._sdk_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
self._ws_client: Optional[Any] = None
self._ws_future: Optional[asyncio.Future] = None
self._ws_thread_loop: Optional[asyncio.AbstractEventLoop] = None
@ -1641,6 +1649,46 @@ class FeishuAdapter(BasePlatformAdapter):
.build()
)
def _get_sdk_executor(self) -> concurrent.futures.ThreadPoolExecutor:
"""Return the adapter-owned executor for blocking Feishu SDK calls.
Recreates the pool if it was never built or has been shut down, so a
torn-down executor can no longer permanently wedge sends (#10849).
"""
lock = getattr(self, "_sdk_executor_lock", None)
if lock is None:
lock = threading.Lock()
self._sdk_executor_lock = lock
with lock:
executor = getattr(self, "_sdk_executor", None)
if executor is None or getattr(executor, "_shutdown", False):
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=10,
thread_name_prefix="hermes-feishu-sdk",
)
self._sdk_executor = executor
return executor
async def _run_blocking(self, func, *args):
"""Run a blocking Feishu SDK call on the adapter-owned thread pool."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(self._get_sdk_executor(), func, *args)
def _shutdown_sdk_executor(self) -> None:
"""Stop the adapter-owned SDK executor without touching the loop default."""
lock = getattr(self, "_sdk_executor_lock", None)
if lock is None:
return
with lock:
executor = getattr(self, "_sdk_executor", None)
self._sdk_executor = None
if executor is None:
return
try:
executor.shutdown(wait=False, cancel_futures=True)
except TypeError:
executor.shutdown(wait=False)
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to Feishu/Lark."""
if not FEISHU_AVAILABLE:
@ -1730,6 +1778,7 @@ class FeishuAdapter(BasePlatformAdapter):
self._ws_thread_loop = None
self._loop = None
self._event_handler = None
self._shutdown_sdk_executor()
self._persist_seen_message_ids()
await self._release_app_lock()
@ -1846,7 +1895,7 @@ class FeishuAdapter(BasePlatformAdapter):
msg_type, payload = self._build_outbound_payload(content)
body = self._build_update_message_body(msg_type=msg_type, content=payload)
request = self._build_update_message_request(message_id=message_id, request_body=body)
response = await asyncio.to_thread(self._client.im.v1.message.update, request)
response = await self._run_blocking(self._client.im.v1.message.update, request)
result = self._finalize_send_result(response, "update failed")
if not result.success and msg_type == "post" and _POST_CONTENT_INVALID_RE.search(result.error or ""):
logger.warning("[Feishu] Invalid post update payload rejected by API; falling back to plain text")
@ -1855,7 +1904,7 @@ class FeishuAdapter(BasePlatformAdapter):
content=json.dumps({"text": _strip_markdown_to_plain_text(content)}, ensure_ascii=False),
)
fallback_request = self._build_update_message_request(message_id=message_id, request_body=fallback_body)
fallback_response = await asyncio.to_thread(self._client.im.v1.message.update, fallback_request)
fallback_response = await self._run_blocking(self._client.im.v1.message.update, fallback_request)
result = self._finalize_send_result(fallback_response, "update failed")
if result.success:
result.message_id = message_id
@ -2128,7 +2177,7 @@ class FeishuAdapter(BasePlatformAdapter):
image=image_file,
)
request = self._build_image_upload_request(body)
upload_response = await asyncio.to_thread(self._client.im.v1.image.create, request)
upload_response = await self._run_blocking(self._client.im.v1.image.create, request)
image_key = self._extract_response_field(upload_response, "image_key")
if not image_key:
return self._response_error_result(
@ -2244,7 +2293,7 @@ class FeishuAdapter(BasePlatformAdapter):
try:
request = self._build_get_chat_request(chat_id)
response = await asyncio.to_thread(self._client.im.v1.chat.get, request)
response = await self._run_blocking(self._client.im.v1.chat.get, request)
if not response or getattr(response, "success", lambda: False)() is False:
code = getattr(response, "code", "unknown")
msg = getattr(response, "msg", "chat lookup failed")
@ -2789,7 +2838,7 @@ class FeishuAdapter(BasePlatformAdapter):
# Fetch the target message to verify it was sent by us and to obtain chat context.
try:
request = self._build_get_message_request(message_id)
response = await asyncio.to_thread(self._client.im.v1.message.get, request)
response = await self._run_blocking(self._client.im.v1.message.get, request)
if not response or not getattr(response, "success", lambda: False)():
return
items = getattr(getattr(response, "data", None), "items", None) or []
@ -2967,7 +3016,7 @@ class FeishuAdapter(BasePlatformAdapter):
.request_body(body)
.build()
)
response = await asyncio.to_thread(self._client.im.v1.message_reaction.create, request)
response = await self._run_blocking(self._client.im.v1.message_reaction.create, request)
if response and getattr(response, "success", lambda: False)():
data = getattr(response, "data", None)
return getattr(data, "reaction_id", None)
@ -2998,7 +3047,7 @@ class FeishuAdapter(BasePlatformAdapter):
.reaction_id(reaction_id)
.build()
)
response = await asyncio.to_thread(self._client.im.v1.message_reaction.delete, request)
response = await self._run_blocking(self._client.im.v1.message_reaction.delete, request)
if response and getattr(response, "success", lambda: False)():
return True
logger.debug(
@ -3713,7 +3762,7 @@ class FeishuAdapter(BasePlatformAdapter):
file_key=image_key,
resource_type="image",
)
response = await asyncio.to_thread(self._client.im.v1.message_resource.get, request)
response = await self._run_blocking(self._client.im.v1.message_resource.get, request)
if not response or not response.success():
logger.warning(
"[Feishu] Failed to download image %s: %s %s",
@ -3757,7 +3806,7 @@ class FeishuAdapter(BasePlatformAdapter):
file_key=file_key,
resource_type=request_type,
)
response = await asyncio.to_thread(self._client.im.v1.message_resource.get, request)
response = await self._run_blocking(self._client.im.v1.message_resource.get, request)
if not response or not response.success():
logger.debug(
"[Feishu] Resource download failed for %s/%s via type=%s: %s %s",
@ -3975,7 +4024,7 @@ class FeishuAdapter(BasePlatformAdapter):
else:
id_type = "user_id"
request = GetUserRequest.builder().user_id(trimmed).user_id_type(id_type).build()
response = await asyncio.to_thread(self._client.contact.v3.user.get, request)
response = await self._run_blocking(self._client.contact.v3.user.get, request)
if not response or not response.success():
return None
user = getattr(getattr(response, "data", None), "user", None)
@ -4006,7 +4055,7 @@ class FeishuAdapter(BasePlatformAdapter):
.token_types({AccessTokenType.TENANT})
.build()
)
resp = await asyncio.to_thread(self._client.request, req)
resp = await self._run_blocking(self._client.request, req)
content = getattr(getattr(resp, "raw", None), "content", None)
if not content:
return None
@ -4031,7 +4080,7 @@ class FeishuAdapter(BasePlatformAdapter):
return self._message_text_cache[message_id]
try:
request = self._build_get_message_request(message_id)
response = await asyncio.to_thread(self._client.im.v1.message.get, request)
response = await self._run_blocking(self._client.im.v1.message.get, request)
if not response or getattr(response, "success", lambda: False)() is False:
code = getattr(response, "code", "unknown")
msg = getattr(response, "msg", "message lookup failed")
@ -4255,7 +4304,7 @@ class FeishuAdapter(BasePlatformAdapter):
.token_types({AccessTokenType.TENANT})
.build()
)
resp = await asyncio.to_thread(self._client.request, req)
resp = await self._run_blocking(self._client.request, req)
content = getattr(getattr(resp, "raw", None), "content", None)
if content:
payload = json.loads(content)
@ -4287,7 +4336,7 @@ class FeishuAdapter(BasePlatformAdapter):
return
try:
request = self._build_get_application_request(app_id=self._app_id, lang="en_us")
response = await asyncio.to_thread(self._client.application.v6.application.get, request)
response = await self._run_blocking(self._client.application.v6.application.get, request)
if not response or not response.success():
code = getattr(response, "code", None)
if code == 99991672:
@ -4415,7 +4464,7 @@ class FeishuAdapter(BasePlatformAdapter):
file=file_obj,
)
request = self._build_file_upload_request(body)
upload_response = await asyncio.to_thread(self._client.im.v1.file.create, request)
upload_response = await self._run_blocking(self._client.im.v1.file.create, request)
file_key = self._extract_response_field(upload_response, "file_key")
if not file_key:
return self._response_error_result(
@ -4471,7 +4520,7 @@ class FeishuAdapter(BasePlatformAdapter):
uuid_value=str(uuid.uuid4()),
)
request = self._build_reply_message_request(effective_reply_to, body)
return await asyncio.to_thread(self._client.im.v1.message.reply, request)
return await self._run_blocking(self._client.im.v1.message.reply, request)
# For topic/thread messages that fell back from reply→create, use
# thread_id as receive_id so the message lands in the topic instead of
@ -4501,7 +4550,7 @@ class FeishuAdapter(BasePlatformAdapter):
uuid_value=str(uuid.uuid4()),
)
request = self._build_create_message_request(receive_id_type, body)
return await asyncio.to_thread(self._client.im.v1.message.create, request)
return await self._run_blocking(self._client.im.v1.message.create, request)
@staticmethod
def _response_succeeded(response: Any) -> bool:

View file

@ -0,0 +1,88 @@
"""Regression tests for the Feishu adapter's owned SDK executor.
Blocking Feishu SDK calls used to run on asyncio's shared default executor.
When that executor was torn down (agent thread exit / loop cleanup), every
subsequent send failed permanently with "Executor shutdown has been called"
and the gateway became a zombie. The adapter now owns its own
ThreadPoolExecutor and recreates it on demand if it has been shut down.
Covers: #10849
"""
import concurrent.futures
import pytest
from plugins.platforms.feishu.adapter import FeishuAdapter
def _bare_adapter() -> FeishuAdapter:
"""A FeishuAdapter with only the executor fields wired (no __init__)."""
adapter = object.__new__(FeishuAdapter)
import threading
adapter._sdk_executor_lock = threading.Lock()
adapter._sdk_executor = None
return adapter
def test_get_executor_creates_pool():
adapter = _bare_adapter()
executor = adapter._get_sdk_executor()
assert isinstance(executor, concurrent.futures.ThreadPoolExecutor)
# Same instance returned while alive.
assert adapter._get_sdk_executor() is executor
adapter._shutdown_sdk_executor()
def test_get_executor_recreates_after_shutdown():
"""A shut-down pool must be transparently replaced — the #10849 recovery."""
adapter = _bare_adapter()
first = adapter._get_sdk_executor()
first.shutdown(wait=True)
assert getattr(first, "_shutdown", False) is True
second = adapter._get_sdk_executor()
assert second is not first
assert getattr(second, "_shutdown", False) is False
adapter._shutdown_sdk_executor()
def test_shutdown_clears_reference():
adapter = _bare_adapter()
adapter._get_sdk_executor()
adapter._shutdown_sdk_executor()
assert adapter._sdk_executor is None
# Idempotent.
adapter._shutdown_sdk_executor()
@pytest.mark.asyncio
async def test_run_blocking_executes_on_owned_pool():
adapter = _bare_adapter()
captured = {}
def _work(value):
import threading
captured["thread"] = threading.current_thread().name
return value * 2
result = await adapter._run_blocking(_work, 21)
assert result == 42
# Ran on the adapter-owned pool, not the default executor.
assert captured["thread"].startswith("hermes-feishu-sdk")
adapter._shutdown_sdk_executor()
@pytest.mark.asyncio
async def test_run_blocking_survives_pool_shutdown():
"""After the pool is shut down, _run_blocking transparently recovers."""
adapter = _bare_adapter()
assert await adapter._run_blocking(lambda: "first") == "first"
adapter._shutdown_sdk_executor()
# The old pool is gone; the next call rebuilds one instead of raising
# "cannot schedule new futures after shutdown".
assert await adapter._run_blocking(lambda: "second") == "second"
adapter._shutdown_sdk_executor()

View file

@ -197,6 +197,12 @@ class TestFeishuFallbackThreadRouting:
adapter._client = mock_client
adapter._build_create_message_body = FeishuAdapter._build_create_message_body
adapter._build_create_message_request = FeishuAdapter._build_create_message_request
# _send_raw_message routes blocking SDK calls through _run_blocking
# (adapter-owned executor). On a MagicMock(spec=...) that method is
# auto-mocked and would swallow the real call, so wire a passthrough.
async def _run_blocking_passthrough(func, *args):
return func(*args)
adapter._run_blocking = _run_blocking_passthrough
# Call _send_raw_message with reply_to=None and thread_id in metadata
import json
@ -250,6 +256,9 @@ class TestFeishuFallbackThreadRouting:
adapter._client = mock_client
adapter._build_create_message_body = FeishuAdapter._build_create_message_body
adapter._build_create_message_request = FeishuAdapter._build_create_message_request
async def _run_blocking_passthrough(func, *args):
return func(*args)
adapter._run_blocking = _run_blocking_passthrough
import json
result = await FeishuAdapter._send_raw_message(