fix: add platform lock, send retry, message splitting, REST one-shot, shared strip_markdown

Improvements from our earlier #8269 salvage work applied to #7616:

- Platform token lock: acquire_scoped_lock/release_scoped_lock prevents
  two profiles from double-connecting the same QQ bot simultaneously
- Send retry with exponential backoff (3 attempts, 1s/2s/4s) with
  permanent vs transient error classification (matches Telegram pattern)
- Proper long-message splitting via truncate_message() instead of
  hard-truncating at MAX_MESSAGE_LENGTH (preserves code blocks, adds 1/N)
- REST-based one-shot send in send_message_tool — uses QQ Bot REST API
  directly with httpx instead of creating a full WebSocket adapter per
  message (fixes the connect→send race condition)
- Use shared strip_markdown() from helpers.py instead of 15 lines of
  inline regex with import-inside-method (DRY, same as BlueBubbles/SMS)
- format_message() now wired into send() pipeline
This commit is contained in:
Teknium 2026-04-13 23:15:59 -07:00 committed by Teknium
parent 4654f75627
commit 8d545da3ff
2 changed files with 106 additions and 52 deletions

View file

@ -67,6 +67,7 @@ from gateway.platforms.base import (
cache_document_from_bytes,
cache_image_from_bytes,
)
from gateway.platforms.helpers import strip_markdown
logger = logging.getLogger(__name__)
@ -218,6 +219,12 @@ class QQAdapter(BasePlatformAdapter):
logger.warning("[%s] %s", self.name, message)
return False
# Prevent duplicate connections with the same credentials
if not self._acquire_platform_lock(
"qqbot-appid", self._app_id, "QQBot app ID"
):
return False
try:
self._http_client = httpx.AsyncClient(timeout=30.0, follow_redirects=True)
@ -242,6 +249,7 @@ class QQAdapter(BasePlatformAdapter):
self._set_fatal_error("qq_connect_error", message, retryable=True)
logger.error("[%s] %s", self.name, message, exc_info=True)
await self._cleanup()
self._release_platform_lock()
return False
async def disconnect(self) -> None:
@ -266,6 +274,7 @@ class QQAdapter(BasePlatformAdapter):
self._heartbeat_task = None
await self._cleanup()
self._release_platform_lock()
logger.info("[%s] Disconnected", self.name)
async def _cleanup(self) -> None:
@ -1523,7 +1532,11 @@ class QQAdapter(BasePlatformAdapter):
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send a text or markdown message to a QQ user or group."""
"""Send a text or markdown message to a QQ user or group.
Applies format_message(), splits long messages via truncate_message(),
and retries transient failures with exponential backoff.
"""
del metadata
if not self.is_connected:
@ -1532,24 +1545,53 @@ class QQAdapter(BasePlatformAdapter):
if not content or not content.strip():
return SendResult(success=True)
try:
# Determine routing
chat_type = self._guess_chat_type(chat_id)
is_reply = bool(reply_to)
formatted = self.format_message(content)
chunks = self.truncate_message(formatted, self.MAX_MESSAGE_LENGTH)
if chat_type == "c2c":
return await self._send_c2c_text(chat_id, content, reply_to)
elif chat_type == "group":
return await self._send_group_text(chat_id, content, reply_to)
elif chat_type == "guild":
return await self._send_guild_text(chat_id, content, reply_to)
else:
return SendResult(success=False, error=f"Unknown chat type for {chat_id}")
except asyncio.TimeoutError:
return SendResult(success=False, error="Timeout sending message to QQ")
except Exception as exc:
logger.error("[%s] Send failed: %s", self.name, exc)
return SendResult(success=False, error=str(exc))
last_result = SendResult(success=False, error="No chunks")
for chunk in chunks:
last_result = await self._send_chunk(chat_id, chunk, reply_to)
if not last_result.success:
return last_result
# Only reply_to the first chunk
reply_to = None
return last_result
async def _send_chunk(
self, chat_id: str, content: str, reply_to: Optional[str] = None,
) -> SendResult:
"""Send a single chunk with retry + exponential backoff."""
last_exc: Optional[Exception] = None
chat_type = self._guess_chat_type(chat_id)
for attempt in range(3):
try:
if chat_type == "c2c":
return await self._send_c2c_text(chat_id, content, reply_to)
elif chat_type == "group":
return await self._send_group_text(chat_id, content, reply_to)
elif chat_type == "guild":
return await self._send_guild_text(chat_id, content, reply_to)
else:
return SendResult(success=False, error=f"Unknown chat type for {chat_id}")
except Exception as exc:
last_exc = exc
err = str(exc).lower()
# Permanent errors — don't retry
if any(k in err for k in ("invalid", "forbidden", "not found", "bad request")):
break
# Transient — back off and retry
if attempt < 2:
delay = 1.0 * (2 ** attempt)
logger.warning("[%s] send retry %d/3 after %.1fs: %s",
self.name, attempt + 1, delay, exc)
await asyncio.sleep(delay)
error_msg = str(last_exc) if last_exc else "Unknown error"
logger.error("[%s] Send failed: %s", self.name, error_msg)
retryable = not any(k in error_msg.lower()
for k in ("invalid", "forbidden", "not found"))
return SendResult(success=False, error=error_msg, retryable=retryable)
async def _send_c2c_text(
self, openid: str, content: str, reply_to: Optional[str] = None
@ -1824,26 +1866,11 @@ class QQAdapter(BasePlatformAdapter):
"""Format message for QQ.
When markdown_support is enabled, content is sent as-is (QQ renders it).
When disabled, strip common Markdown patterns for plain-text display.
When disabled, strip markdown via shared helper (same as BlueBubbles/SMS).
"""
if self._markdown_support:
return content
# Strip markdown formatting for plain text
text = content
# Bold/italic/strikethrough
import re
text = re.sub(r'\*{1,2}([^*]+)\*{1,2}', r'\1', text)
text = re.sub(r'_{1,2}([^_]+)_{1,2}', r'\1', text)
text = re.sub(r'~~([^~]+)~~', r'\1', text)
# Code blocks
text = re.sub(r'```[\s\S]*?```', lambda m: m.group(0).split('\n', 1)[-1].rsplit('```', 1)[0] if '\n' in m.group(0) else m.group(0).replace('`', ''), text)
text = re.sub(r'`([^`]+)`', r'\1', text)
# Links
text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1 (\2)', text)
# Headers
text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)
return text
return strip_markdown(content)
# ------------------------------------------------------------------
# Chat info

View file

@ -1042,28 +1042,55 @@ def _check_send_message():
async def _send_qqbot(pconfig, chat_id, message):
"""Send via QQ Bot API using the adapter's REST API."""
"""Send via QQBot using the REST API directly (no WebSocket needed).
Uses the QQ Bot Open Platform REST endpoints to get an access token
and post a message. Works for guild channels without requiring
a running gateway adapter.
"""
try:
from gateway.platforms.qqbot import QQAdapter, check_qq_requirements
if not check_qq_requirements():
return {"error": "QQBot requirements not met (need aiohttp + httpx)."}
import httpx
except ImportError:
return {"error": "QQBot adapter not available."}
return _error("QQBot direct send requires httpx. Run: pip install httpx")
extra = pconfig.extra or {}
appid = extra.get("app_id") or os.getenv("QQ_APP_ID", "")
secret = (pconfig.token or extra.get("client_secret")
or os.getenv("QQ_CLIENT_SECRET", ""))
if not appid or not secret:
return _error("QQBot: QQ_APP_ID / QQ_CLIENT_SECRET not configured.")
try:
adapter = QQAdapter(pconfig)
connected = await adapter.connect()
if not connected:
return _error("QQBot: failed to connect to server")
try:
result = await adapter.send(chat_id, message)
if not result.success:
return _error(f"QQ send failed: {result.error}")
return {"success": True, "platform": "qqbot", "chat_id": chat_id, "message_id": result.message_id}
finally:
await adapter.disconnect()
async with httpx.AsyncClient(timeout=15) as client:
# Step 1: Get access token
token_resp = await client.post(
"https://bots.qq.com/app/getAppAccessToken",
json={"appId": str(appid), "clientSecret": str(secret)},
)
if token_resp.status_code != 200:
return _error(f"QQBot token request failed: {token_resp.status_code}")
token_data = token_resp.json()
access_token = token_data.get("access_token")
if not access_token:
return _error(f"QQBot: no access_token in response")
# Step 2: Send message via REST
headers = {
"Authorization": f"QQBotAccessToken {access_token}",
"Content-Type": "application/json",
}
url = f"https://api.sgroup.qq.com/channels/{chat_id}/messages"
payload = {"content": message[:4000], "msg_type": 0}
resp = await client.post(url, json=payload, headers=headers)
if resp.status_code in (200, 201):
data = resp.json()
return {"success": True, "platform": "qqbot", "chat_id": chat_id,
"message_id": data.get("id")}
else:
return _error(f"QQBot send failed: {resp.status_code} {resp.text}")
except Exception as e:
return _error(f"QQ send failed: {e}")
return _error(f"QQBot send failed: {e}")
# --- Registry ---