fix(weixin): replace all aiohttp ClientTimeout with asyncio.wait_for()

aiohttp ClientTimeout uses BaseTimerContext which calls
loop.call_later() internally. When invoked via
asyncio.run_coroutine_threadsafe() from cron jobs, this
triggers "Timeout context manager should be used inside a task"
errors, causing message delivery failures.

Replace all direct ClientTimeout usage with asyncio.wait_for():
- _upload_ciphertext: CDN upload (120s timeout)
- _download_bytes: CDN download (configurable timeout)
- _download_remote_media: remote media fetch (30s timeout)

Also set total=None on _send_session to disable aiohttp built-in
timeout, and change trust_env=True to False to bypass proxy for
WeChat CDN connections.
This commit is contained in:
chenlinfeng 2026-05-03 10:03:20 +08:00 committed by Teknium
parent 2e00bcaaab
commit 3a0d52d579

View file

@ -548,17 +548,21 @@ async def _upload_ciphertext(
Accepts either a constructed CDN URL (from upload_param) or a direct Accepts either a constructed CDN URL (from upload_param) or a direct
upload_full_url both use POST with the raw ciphertext as the body. upload_full_url both use POST with the raw ciphertext as the body.
""" """
timeout = aiohttp.ClientTimeout(total=120) # Use asyncio.wait_for() instead of aiohttp ClientTimeout to avoid
async with session.post(upload_url, data=ciphertext, headers={"Content-Type": "application/octet-stream"}, timeout=timeout) as response: # "Timeout context manager should be used inside a task" errors when
if response.status == 200: # invoked via asyncio.run_coroutine_threadsafe() from cron jobs.
encrypted_param = response.headers.get("x-encrypted-param") async def _do_upload() -> str:
if encrypted_param: async with session.post(upload_url, data=ciphertext, headers={"Content-Type": "application/octet-stream"}) as response:
await response.read() if response.status == 200:
return encrypted_param encrypted_param = response.headers.get("x-encrypted-param")
if encrypted_param:
await response.read()
return encrypted_param
raw = await response.text()
raise RuntimeError(f"CDN upload missing x-encrypted-param header: {raw[:200]}")
raw = await response.text() raw = await response.text()
raise RuntimeError(f"CDN upload missing x-encrypted-param header: {raw[:200]}") raise RuntimeError(f"CDN upload HTTP {response.status}: {raw[:200]}")
raw = await response.text() return await asyncio.wait_for(_do_upload(), timeout=120)
raise RuntimeError(f"CDN upload HTTP {response.status}: {raw[:200]}")
async def _download_bytes( async def _download_bytes(
@ -567,10 +571,13 @@ async def _download_bytes(
url: str, url: str,
timeout_seconds: float = 60.0, timeout_seconds: float = 60.0,
) -> bytes: ) -> bytes:
timeout = aiohttp.ClientTimeout(total=timeout_seconds) # Use asyncio.wait_for() instead of aiohttp ClientTimeout to avoid
async with session.get(url, timeout=timeout) as response: # "Timeout context manager should be used inside a task" errors.
response.raise_for_status() async def _do_download() -> bytes:
return await response.read() async with session.get(url) as response:
response.raise_for_status()
return await response.read()
return await asyncio.wait_for(_do_download(), timeout=timeout_seconds)
_WEIXIN_CDN_ALLOWLIST: frozenset[str] = frozenset( _WEIXIN_CDN_ALLOWLIST: frozenset[str] = frozenset(
@ -1216,7 +1223,12 @@ class WeixinAdapter(BasePlatformAdapter):
logger.debug("[%s] Token lock unavailable (non-fatal): %s", self.name, exc) logger.debug("[%s] Token lock unavailable (non-fatal): %s", self.name, exc)
self._poll_session = aiohttp.ClientSession(trust_env=True, connector=_make_ssl_connector()) self._poll_session = aiohttp.ClientSession(trust_env=True, connector=_make_ssl_connector())
self._send_session = aiohttp.ClientSession(trust_env=True, connector=_make_ssl_connector()) # Disable aiohttp's built-in ClientTimeout (total=None) to prevent
# "Timeout context manager should be used inside a task" errors when
# send() is invoked via asyncio.run_coroutine_threadsafe() from cron.
# Timeout is managed externally via asyncio.wait_for() in _api_post/_api_get.
_no_aiohttp_timeout = aiohttp.ClientTimeout(total=None, connect=None, sock_connect=None, sock_read=None)
self._send_session = aiohttp.ClientSession(trust_env=True, connector=_make_ssl_connector(), timeout=_no_aiohttp_timeout)
self._token_store.restore(self._account_id) self._token_store.restore(self._account_id)
self._poll_task = asyncio.create_task(self._poll_loop(), name="weixin-poll") self._poll_task = asyncio.create_task(self._poll_loop(), name="weixin-poll")
self._mark_connected() self._mark_connected()
@ -1824,10 +1836,14 @@ class WeixinAdapter(BasePlatformAdapter):
raise ValueError(f"Blocked unsafe URL (SSRF protection): {url}") raise ValueError(f"Blocked unsafe URL (SSRF protection): {url}")
assert self._send_session is not None assert self._send_session is not None
async with self._send_session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response: # Use asyncio.wait_for() instead of aiohttp ClientTimeout to avoid
response.raise_for_status() # "Timeout context manager should be used inside a task" errors.
data = await response.read() async def _do_fetch():
suffix = Path(url.split("?", 1)[0]).suffix or ".bin" async with self._send_session.get(url) as response:
response.raise_for_status()
return await response.read()
data = await asyncio.wait_for(_do_fetch(), timeout=30)
suffix = Path(url.split("?", 1)[0]).suffix or ".bin"
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as handle: with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as handle:
handle.write(data) handle.write(data)
return handle.name return handle.name