mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway/weixin): split poll/send sessions, reuse live adapter for cron & send_message
- gateway/platforms/weixin.py: - Split aiohttp.ClientSession into _poll_session and _send_session - Add _LIVE_ADAPTERS registry so send_weixin_direct() reuses the connected gateway adapter instead of creating a competing session - Fixes silent message loss when gateway is running (iLink token contention) - cron/scheduler.py: - Support comma-separated deliver values (e.g. 'feishu,weixin') for multi-target delivery - Delay pconfig/enabled check until standalone fallback so live adapters work even when platform is not in gateway config - tools/send_message_tool.py: - Synthesize PlatformConfig from WEIXIN_* env vars when gateway config lacks a weixin entry - Fall back to WEIXIN_HOME_CHANNEL env var for home channel resolution - tests/gateway/test_weixin.py: - Update mocks to include _send_session
This commit is contained in:
parent
c60b6dc317
commit
5ca52bae5b
4 changed files with 235 additions and 134 deletions
|
|
@ -27,7 +27,7 @@ except ImportError:
|
|||
except ImportError:
|
||||
msvcrt = None
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from typing import List, Optional
|
||||
|
||||
# Add parent directory to path for imports BEFORE repo-level imports.
|
||||
# Without this, standalone invocations (e.g. after `hermes update` reloads
|
||||
|
|
@ -76,15 +76,14 @@ def _resolve_origin(job: dict) -> Optional[dict]:
|
|||
return None
|
||||
|
||||
|
||||
def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
||||
"""Resolve the concrete auto-delivery target for a cron job, if any."""
|
||||
deliver = job.get("deliver", "local")
|
||||
def _resolve_single_delivery_target(job: dict, deliver_value: str) -> Optional[dict]:
|
||||
"""Resolve one concrete auto-delivery target for a cron job."""
|
||||
origin = _resolve_origin(job)
|
||||
|
||||
if deliver == "local":
|
||||
if deliver_value == "local":
|
||||
return None
|
||||
|
||||
if deliver == "origin":
|
||||
if deliver_value == "origin":
|
||||
if origin:
|
||||
return {
|
||||
"platform": origin["platform"],
|
||||
|
|
@ -108,8 +107,8 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
|||
}
|
||||
return None
|
||||
|
||||
if ":" in deliver:
|
||||
platform_name, rest = deliver.split(":", 1)
|
||||
if ":" in deliver_value:
|
||||
platform_name, rest = deliver_value.split(":", 1)
|
||||
platform_key = platform_name.lower()
|
||||
|
||||
from tools.send_message_tool import _parse_target_ref
|
||||
|
|
@ -139,7 +138,7 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
|||
"thread_id": thread_id,
|
||||
}
|
||||
|
||||
platform_name = deliver
|
||||
platform_name = deliver_value
|
||||
if origin and origin.get("platform") == platform_name:
|
||||
return {
|
||||
"platform": platform_name,
|
||||
|
|
@ -160,6 +159,30 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
|||
}
|
||||
|
||||
|
||||
def _resolve_delivery_targets(job: dict) -> List[dict]:
|
||||
"""Resolve all concrete auto-delivery targets for a cron job (supports comma-separated deliver)."""
|
||||
deliver = job.get("deliver", "local")
|
||||
if deliver == "local":
|
||||
return []
|
||||
parts = [p.strip() for p in str(deliver).split(",") if p.strip()]
|
||||
seen = set()
|
||||
targets = []
|
||||
for part in parts:
|
||||
target = _resolve_single_delivery_target(job, part)
|
||||
if target:
|
||||
key = (target["platform"].lower(), str(target["chat_id"]), target.get("thread_id"))
|
||||
if key not in seen:
|
||||
seen.add(key)
|
||||
targets.append(target)
|
||||
return targets
|
||||
|
||||
|
||||
def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
||||
"""Resolve the concrete auto-delivery target for a cron job, if any."""
|
||||
targets = _resolve_delivery_targets(job)
|
||||
return targets[0] if targets else None
|
||||
|
||||
|
||||
# Media extension sets — keep in sync with gateway/platforms/base.py:_process_message_background
|
||||
_AUDIO_EXTS = frozenset({'.ogg', '.opus', '.mp3', '.wav', '.m4a'})
|
||||
_VIDEO_EXTS = frozenset({'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'})
|
||||
|
|
@ -200,7 +223,7 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata:
|
|||
|
||||
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]:
|
||||
"""
|
||||
Deliver job output to the configured target (origin chat, specific platform, etc.).
|
||||
Deliver job output to the configured target(s) (origin chat, specific platform, etc.).
|
||||
|
||||
When ``adapters`` and ``loop`` are provided (gateway is running), tries to
|
||||
use the live adapter first — this supports E2EE rooms (e.g. Matrix) where
|
||||
|
|
@ -209,33 +232,14 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
|
||||
Returns None on success, or an error string on failure.
|
||||
"""
|
||||
target = _resolve_delivery_target(job)
|
||||
if not target:
|
||||
targets = _resolve_delivery_targets(job)
|
||||
if not targets:
|
||||
if job.get("deliver", "local") != "local":
|
||||
msg = f"no delivery target resolved for deliver={job.get('deliver', 'local')}"
|
||||
logger.warning("Job '%s': %s", job["id"], msg)
|
||||
return msg
|
||||
return None # local-only jobs don't deliver — not a failure
|
||||
|
||||
platform_name = target["platform"]
|
||||
chat_id = target["chat_id"]
|
||||
thread_id = target.get("thread_id")
|
||||
|
||||
# Diagnostic: log thread_id for topic-aware delivery debugging
|
||||
origin = job.get("origin") or {}
|
||||
origin_thread = origin.get("thread_id")
|
||||
if origin_thread and not thread_id:
|
||||
logger.warning(
|
||||
"Job '%s': origin has thread_id=%s but delivery target lost it "
|
||||
"(deliver=%s, target=%s)",
|
||||
job["id"], origin_thread, job.get("deliver", "local"), target,
|
||||
)
|
||||
elif thread_id:
|
||||
logger.debug(
|
||||
"Job '%s': delivering to %s:%s thread_id=%s",
|
||||
job["id"], platform_name, chat_id, thread_id,
|
||||
)
|
||||
|
||||
from tools.send_message_tool import _send_to_platform
|
||||
from gateway.config import load_gateway_config, Platform
|
||||
|
||||
|
|
@ -258,24 +262,6 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
"bluebubbles": Platform.BLUEBUBBLES,
|
||||
"qqbot": Platform.QQBOT,
|
||||
}
|
||||
platform = platform_map.get(platform_name.lower())
|
||||
if not platform:
|
||||
msg = f"unknown platform '{platform_name}'"
|
||||
logger.warning("Job '%s': %s", job["id"], msg)
|
||||
return msg
|
||||
|
||||
try:
|
||||
config = load_gateway_config()
|
||||
except Exception as e:
|
||||
msg = f"failed to load gateway config: {e}"
|
||||
logger.error("Job '%s': %s", job["id"], msg)
|
||||
return msg
|
||||
|
||||
pconfig = config.platforms.get(platform)
|
||||
if not pconfig or not pconfig.enabled:
|
||||
msg = f"platform '{platform_name}' not configured/enabled"
|
||||
logger.warning("Job '%s': %s", job["id"], msg)
|
||||
return msg
|
||||
|
||||
# Optionally wrap the content with a header/footer so the user knows this
|
||||
# is a cron delivery. Wrapping is on by default; set cron.wrap_response: false
|
||||
|
|
@ -304,67 +290,117 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
from gateway.platforms.base import BasePlatformAdapter
|
||||
media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content)
|
||||
|
||||
# Prefer the live adapter when the gateway is running — this supports E2EE
|
||||
# rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt.
|
||||
runtime_adapter = (adapters or {}).get(platform)
|
||||
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
|
||||
send_metadata = {"thread_id": thread_id} if thread_id else None
|
||||
try:
|
||||
# Send cleaned text (MEDIA tags stripped) — not the raw content
|
||||
text_to_send = cleaned_delivery_content.strip()
|
||||
adapter_ok = True
|
||||
if text_to_send:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
|
||||
loop,
|
||||
)
|
||||
send_result = future.result(timeout=60)
|
||||
if send_result and not getattr(send_result, "success", True):
|
||||
err = getattr(send_result, "error", "unknown")
|
||||
logger.warning(
|
||||
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
|
||||
job["id"], platform_name, chat_id, err,
|
||||
)
|
||||
adapter_ok = False # fall through to standalone path
|
||||
try:
|
||||
config = load_gateway_config()
|
||||
except Exception as e:
|
||||
msg = f"failed to load gateway config: {e}"
|
||||
logger.error("Job '%s': %s", job["id"], msg)
|
||||
return msg
|
||||
|
||||
# Send extracted media files as native attachments via the live adapter
|
||||
if adapter_ok and media_files:
|
||||
_send_media_via_adapter(runtime_adapter, chat_id, media_files, send_metadata, loop, job)
|
||||
delivery_errors = []
|
||||
|
||||
if adapter_ok:
|
||||
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
|
||||
return None
|
||||
except Exception as e:
|
||||
for target in targets:
|
||||
platform_name = target["platform"]
|
||||
chat_id = target["chat_id"]
|
||||
thread_id = target.get("thread_id")
|
||||
|
||||
# Diagnostic: log thread_id for topic-aware delivery debugging
|
||||
origin = job.get("origin") or {}
|
||||
origin_thread = origin.get("thread_id")
|
||||
if origin_thread and not thread_id:
|
||||
logger.warning(
|
||||
"Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone",
|
||||
job["id"], platform_name, chat_id, e,
|
||||
"Job '%s': origin has thread_id=%s but delivery target lost it "
|
||||
"(deliver=%s, target=%s)",
|
||||
job["id"], origin_thread, job.get("deliver", "local"), target,
|
||||
)
|
||||
elif thread_id:
|
||||
logger.debug(
|
||||
"Job '%s': delivering to %s:%s thread_id=%s",
|
||||
job["id"], platform_name, chat_id, thread_id,
|
||||
)
|
||||
|
||||
# Standalone path: run the async send in a fresh event loop (safe from any thread)
|
||||
coro = _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files)
|
||||
try:
|
||||
result = asyncio.run(coro)
|
||||
except RuntimeError:
|
||||
# asyncio.run() checks for a running loop before awaiting the coroutine;
|
||||
# when it raises, the original coro was never started — close it to
|
||||
# prevent "coroutine was never awaited" RuntimeWarning, then retry in a
|
||||
# fresh thread that has no running loop.
|
||||
coro.close()
|
||||
import concurrent.futures
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
||||
future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files))
|
||||
result = future.result(timeout=30)
|
||||
except Exception as e:
|
||||
msg = f"delivery to {platform_name}:{chat_id} failed: {e}"
|
||||
logger.error("Job '%s': %s", job["id"], msg)
|
||||
return msg
|
||||
platform = platform_map.get(platform_name.lower())
|
||||
if not platform:
|
||||
msg = f"unknown platform '{platform_name}'"
|
||||
logger.warning("Job '%s': %s", job["id"], msg)
|
||||
delivery_errors.append(msg)
|
||||
continue
|
||||
|
||||
if result and result.get("error"):
|
||||
msg = f"delivery error: {result['error']}"
|
||||
logger.error("Job '%s': %s", job["id"], msg)
|
||||
return msg
|
||||
# Prefer the live adapter when the gateway is running — this supports E2EE
|
||||
# rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt.
|
||||
runtime_adapter = (adapters or {}).get(platform)
|
||||
delivered = False
|
||||
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
|
||||
send_metadata = {"thread_id": thread_id} if thread_id else None
|
||||
try:
|
||||
# Send cleaned text (MEDIA tags stripped) — not the raw content
|
||||
text_to_send = cleaned_delivery_content.strip()
|
||||
adapter_ok = True
|
||||
if text_to_send:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
|
||||
loop,
|
||||
)
|
||||
send_result = future.result(timeout=60)
|
||||
if send_result and not getattr(send_result, "success", True):
|
||||
err = getattr(send_result, "error", "unknown")
|
||||
logger.warning(
|
||||
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
|
||||
job["id"], platform_name, chat_id, err,
|
||||
)
|
||||
adapter_ok = False # fall through to standalone path
|
||||
|
||||
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
|
||||
# Send extracted media files as native attachments via the live adapter
|
||||
if adapter_ok and media_files:
|
||||
_send_media_via_adapter(runtime_adapter, chat_id, media_files, send_metadata, loop, job)
|
||||
|
||||
if adapter_ok:
|
||||
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
|
||||
delivered = True
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone",
|
||||
job["id"], platform_name, chat_id, e,
|
||||
)
|
||||
|
||||
if not delivered:
|
||||
pconfig = config.platforms.get(platform)
|
||||
if not pconfig or not pconfig.enabled:
|
||||
msg = f"platform '{platform_name}' not configured/enabled"
|
||||
logger.warning("Job '%s': %s", job["id"], msg)
|
||||
delivery_errors.append(msg)
|
||||
continue
|
||||
|
||||
# Standalone path: run the async send in a fresh event loop (safe from any thread)
|
||||
coro = _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files)
|
||||
try:
|
||||
result = asyncio.run(coro)
|
||||
except RuntimeError:
|
||||
# asyncio.run() checks for a running loop before awaiting the coroutine;
|
||||
# when it raises, the original coro was never started — close it to
|
||||
# prevent "coroutine was never awaited" RuntimeWarning, then retry in a
|
||||
# fresh thread that has no running loop.
|
||||
coro.close()
|
||||
import concurrent.futures
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
||||
future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files))
|
||||
result = future.result(timeout=30)
|
||||
except Exception as e:
|
||||
msg = f"delivery to {platform_name}:{chat_id} failed: {e}"
|
||||
logger.error("Job '%s': %s", job["id"], msg)
|
||||
delivery_errors.append(msg)
|
||||
continue
|
||||
|
||||
if result and result.get("error"):
|
||||
msg = f"delivery error: {result['error']}"
|
||||
logger.error("Job '%s': %s", job["id"], msg)
|
||||
delivery_errors.append(msg)
|
||||
continue
|
||||
|
||||
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
|
||||
|
||||
if delivery_errors:
|
||||
return "; ".join(delivery_errors)
|
||||
return None
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -96,6 +96,8 @@ MEDIA_VIDEO = 2
|
|||
MEDIA_FILE = 3
|
||||
MEDIA_VOICE = 4
|
||||
|
||||
_LIVE_ADAPTERS: Dict[str, Any] = {}
|
||||
|
||||
ITEM_TEXT = 1
|
||||
ITEM_IMAGE = 2
|
||||
ITEM_VOICE = 3
|
||||
|
|
@ -1052,7 +1054,8 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
self._hermes_home = hermes_home
|
||||
self._token_store = ContextTokenStore(hermes_home)
|
||||
self._typing_cache = TypingTicketCache()
|
||||
self._session: Optional[aiohttp.ClientSession] = None
|
||||
self._poll_session: Optional[aiohttp.ClientSession] = None
|
||||
self._send_session: Optional[aiohttp.ClientSession] = None
|
||||
self._poll_task: Optional[asyncio.Task] = None
|
||||
self._dedup = MessageDeduplicator(ttl_seconds=MESSAGE_DEDUP_TTL_SECONDS)
|
||||
|
||||
|
|
@ -1127,14 +1130,17 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
except Exception as exc:
|
||||
logger.debug("[%s] Token lock unavailable (non-fatal): %s", self.name, exc)
|
||||
|
||||
self._session = aiohttp.ClientSession(trust_env=True)
|
||||
self._poll_session = aiohttp.ClientSession(trust_env=True)
|
||||
self._send_session = aiohttp.ClientSession(trust_env=True)
|
||||
self._token_store.restore(self._account_id)
|
||||
self._poll_task = asyncio.create_task(self._poll_loop(), name="weixin-poll")
|
||||
self._mark_connected()
|
||||
_LIVE_ADAPTERS[self._token] = self
|
||||
logger.info("[%s] Connected account=%s base=%s", self.name, _safe_id(self._account_id), self._base_url)
|
||||
return True
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
_LIVE_ADAPTERS.pop(self._token, None)
|
||||
self._running = False
|
||||
if self._poll_task and not self._poll_task.done():
|
||||
self._poll_task.cancel()
|
||||
|
|
@ -1143,15 +1149,18 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._poll_task = None
|
||||
if self._session and not self._session.closed:
|
||||
await self._session.close()
|
||||
self._session = None
|
||||
if self._poll_session and not self._poll_session.closed:
|
||||
await self._poll_session.close()
|
||||
self._poll_session = None
|
||||
if self._send_session and not self._send_session.closed:
|
||||
await self._send_session.close()
|
||||
self._send_session = None
|
||||
self._release_platform_lock()
|
||||
self._mark_disconnected()
|
||||
logger.info("[%s] Disconnected", self.name)
|
||||
|
||||
async def _poll_loop(self) -> None:
|
||||
assert self._session is not None
|
||||
assert self._poll_session is not None
|
||||
sync_buf = _load_sync_buf(self._hermes_home, self._account_id)
|
||||
timeout_ms = LONG_POLL_TIMEOUT_MS
|
||||
consecutive_failures = 0
|
||||
|
|
@ -1159,7 +1168,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
while self._running:
|
||||
try:
|
||||
response = await _get_updates(
|
||||
self._session,
|
||||
self._poll_session,
|
||||
base_url=self._base_url,
|
||||
token=self._token,
|
||||
sync_buf=sync_buf,
|
||||
|
|
@ -1216,7 +1225,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
logger.error("[%s] unhandled inbound error from=%s: %s", self.name, _safe_id(message.get("from_user_id")), exc, exc_info=True)
|
||||
|
||||
async def _process_message(self, message: Dict[str, Any]) -> None:
|
||||
assert self._session is not None
|
||||
assert self._poll_session is not None
|
||||
sender_id = str(message.get("from_user_id") or "").strip()
|
||||
if not sender_id:
|
||||
return
|
||||
|
|
@ -1309,7 +1318,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
media = _media_reference(item, "image_item")
|
||||
try:
|
||||
data = await _download_and_decrypt_media(
|
||||
self._session,
|
||||
self._poll_session,
|
||||
cdn_base_url=self._cdn_base_url,
|
||||
encrypted_query_param=media.get("encrypt_query_param"),
|
||||
aes_key_b64=(item.get("image_item") or {}).get("aeskey")
|
||||
|
|
@ -1327,7 +1336,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
media = _media_reference(item, "video_item")
|
||||
try:
|
||||
data = await _download_and_decrypt_media(
|
||||
self._session,
|
||||
self._poll_session,
|
||||
cdn_base_url=self._cdn_base_url,
|
||||
encrypted_query_param=media.get("encrypt_query_param"),
|
||||
aes_key_b64=media.get("aes_key"),
|
||||
|
|
@ -1346,7 +1355,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
mime = _mime_from_filename(filename)
|
||||
try:
|
||||
data = await _download_and_decrypt_media(
|
||||
self._session,
|
||||
self._poll_session,
|
||||
cdn_base_url=self._cdn_base_url,
|
||||
encrypted_query_param=media.get("encrypt_query_param"),
|
||||
aes_key_b64=media.get("aes_key"),
|
||||
|
|
@ -1365,7 +1374,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
return None
|
||||
try:
|
||||
data = await _download_and_decrypt_media(
|
||||
self._session,
|
||||
self._poll_session,
|
||||
cdn_base_url=self._cdn_base_url,
|
||||
encrypted_query_param=media.get("encrypt_query_param"),
|
||||
aes_key_b64=media.get("aes_key"),
|
||||
|
|
@ -1378,13 +1387,13 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
return None
|
||||
|
||||
async def _maybe_fetch_typing_ticket(self, user_id: str, context_token: Optional[str]) -> None:
|
||||
if not self._session or not self._token:
|
||||
if not self._poll_session or not self._token:
|
||||
return
|
||||
if self._typing_cache.get(user_id):
|
||||
return
|
||||
try:
|
||||
response = await _get_config(
|
||||
self._session,
|
||||
self._poll_session,
|
||||
base_url=self._base_url,
|
||||
token=self._token,
|
||||
user_id=user_id,
|
||||
|
|
@ -1414,7 +1423,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
for attempt in range(self._send_chunk_retries + 1):
|
||||
try:
|
||||
await _send_message(
|
||||
self._session,
|
||||
self._send_session,
|
||||
base_url=self._base_url,
|
||||
token=self._token,
|
||||
to=chat_id,
|
||||
|
|
@ -1449,7 +1458,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
reply_to: Optional[str] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
) -> SendResult:
|
||||
if not self._session or not self._token:
|
||||
if not self._send_session or not self._token:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
context_token = self._token_store.get(self._account_id, chat_id)
|
||||
last_message_id: Optional[str] = None
|
||||
|
|
@ -1508,14 +1517,14 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
return SendResult(success=False, error=str(exc))
|
||||
|
||||
async def send_typing(self, chat_id: str, metadata: Optional[Dict[str, Any]] = None) -> None:
|
||||
if not self._session or not self._token:
|
||||
if not self._send_session or not self._token:
|
||||
return
|
||||
typing_ticket = self._typing_cache.get(chat_id)
|
||||
if not typing_ticket:
|
||||
return
|
||||
try:
|
||||
await _send_typing(
|
||||
self._session,
|
||||
self._send_session,
|
||||
base_url=self._base_url,
|
||||
token=self._token,
|
||||
to_user_id=chat_id,
|
||||
|
|
@ -1526,14 +1535,14 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
logger.debug("[%s] typing start failed for %s: %s", self.name, _safe_id(chat_id), exc)
|
||||
|
||||
async def stop_typing(self, chat_id: str) -> None:
|
||||
if not self._session or not self._token:
|
||||
if not self._send_session or not self._token:
|
||||
return
|
||||
typing_ticket = self._typing_cache.get(chat_id)
|
||||
if not typing_ticket:
|
||||
return
|
||||
try:
|
||||
await _send_typing(
|
||||
self._session,
|
||||
self._send_session,
|
||||
base_url=self._base_url,
|
||||
token=self._token,
|
||||
to_user_id=chat_id,
|
||||
|
|
@ -1585,7 +1594,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
caption: str = "",
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
) -> SendResult:
|
||||
if not self._session or not self._token:
|
||||
if not self._send_session or not self._token:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
try:
|
||||
message_id = await self._send_file(chat_id, file_path, caption)
|
||||
|
|
@ -1602,7 +1611,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
reply_to: Optional[str] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
) -> SendResult:
|
||||
if not self._session or not self._token:
|
||||
if not self._send_session or not self._token:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
try:
|
||||
message_id = await self._send_file(chat_id, video_path, caption or "")
|
||||
|
|
@ -1619,7 +1628,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
reply_to: Optional[str] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
) -> SendResult:
|
||||
if not self._session or not self._token:
|
||||
if not self._send_session or not self._token:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
|
||||
# Native outbound Weixin voice bubbles are not proven-working in the
|
||||
|
|
@ -1644,8 +1653,8 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
if not is_safe_url(url):
|
||||
raise ValueError(f"Blocked unsafe URL (SSRF protection): {url}")
|
||||
|
||||
assert self._session is not None
|
||||
async with self._session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
|
||||
assert self._send_session is not None
|
||||
async with self._send_session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
|
||||
response.raise_for_status()
|
||||
data = await response.read()
|
||||
suffix = Path(url.split("?", 1)[0]).suffix or ".bin"
|
||||
|
|
@ -1660,7 +1669,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
caption: str,
|
||||
force_file_attachment: bool = False,
|
||||
) -> str:
|
||||
assert self._session is not None and self._token is not None
|
||||
assert self._send_session is not None and self._token is not None
|
||||
plaintext = Path(path).read_bytes()
|
||||
media_type, item_builder = self._outbound_media_builder(path, force_file_attachment=force_file_attachment)
|
||||
filekey = secrets.token_hex(16)
|
||||
|
|
@ -1668,7 +1677,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
rawsize = len(plaintext)
|
||||
rawfilemd5 = hashlib.md5(plaintext).hexdigest()
|
||||
upload_response = await _get_upload_url(
|
||||
self._session,
|
||||
self._send_session,
|
||||
base_url=self._base_url,
|
||||
token=self._token,
|
||||
to_user_id=chat_id,
|
||||
|
|
@ -1694,7 +1703,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
raise RuntimeError(f"getUploadUrl returned neither upload_param nor upload_full_url: {upload_response}")
|
||||
|
||||
encrypted_query_param = await _upload_ciphertext(
|
||||
self._session,
|
||||
self._send_session,
|
||||
ciphertext=ciphertext,
|
||||
upload_url=upload_url,
|
||||
)
|
||||
|
|
@ -1722,7 +1731,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
if caption:
|
||||
last_message_id = f"hermes-weixin-{uuid.uuid4().hex}"
|
||||
await _send_message(
|
||||
self._session,
|
||||
self._send_session,
|
||||
base_url=self._base_url,
|
||||
token=self._token,
|
||||
to=chat_id,
|
||||
|
|
@ -1733,7 +1742,7 @@ class WeixinAdapter(BasePlatformAdapter):
|
|||
|
||||
last_message_id = f"hermes-weixin-{uuid.uuid4().hex}"
|
||||
await _api_post(
|
||||
self._session,
|
||||
self._send_session,
|
||||
base_url=self._base_url,
|
||||
endpoint=EP_SEND_MESSAGE,
|
||||
payload={
|
||||
|
|
@ -1857,6 +1866,33 @@ async def send_weixin_direct(
|
|||
token_store.restore(account_id)
|
||||
context_token = token_store.get(account_id, chat_id)
|
||||
|
||||
live_adapter = _LIVE_ADAPTERS.get(resolved_token)
|
||||
send_session = getattr(live_adapter, '_send_session', None)
|
||||
if live_adapter is not None and send_session is not None and not send_session.closed:
|
||||
last_result: Optional[SendResult] = None
|
||||
cleaned = live_adapter.format_message(message)
|
||||
if cleaned:
|
||||
last_result = await live_adapter.send(chat_id, cleaned)
|
||||
if not last_result.success:
|
||||
return {"error": f"Weixin send failed: {last_result.error}"}
|
||||
|
||||
for media_path, _is_voice in media_files or []:
|
||||
ext = Path(media_path).suffix.lower()
|
||||
if ext in {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}:
|
||||
last_result = await live_adapter.send_image_file(chat_id, media_path)
|
||||
else:
|
||||
last_result = await live_adapter.send_document(chat_id, media_path)
|
||||
if not last_result.success:
|
||||
return {"error": f"Weixin media send failed: {last_result.error}"}
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"platform": "weixin",
|
||||
"chat_id": chat_id,
|
||||
"message_id": last_result.message_id if last_result else None,
|
||||
"context_token_used": bool(context_token),
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession(trust_env=True) as session:
|
||||
adapter = WeixinAdapter(
|
||||
PlatformConfig(
|
||||
|
|
@ -1870,6 +1906,7 @@ async def send_weixin_direct(
|
|||
},
|
||||
)
|
||||
)
|
||||
adapter._send_session = session
|
||||
adapter._session = session
|
||||
adapter._token = resolved_token
|
||||
adapter._account_id = account_id
|
||||
|
|
|
|||
|
|
@ -311,6 +311,7 @@ class TestWeixinChunkDelivery:
|
|||
def _connected_adapter(self) -> WeixinAdapter:
|
||||
adapter = _make_adapter()
|
||||
adapter._session = object()
|
||||
adapter._send_session = adapter._session
|
||||
adapter._token = "test-token"
|
||||
adapter._base_url = "https://weixin.example.com"
|
||||
adapter._token_store.get = lambda account_id, chat_id: "ctx-token"
|
||||
|
|
@ -420,6 +421,7 @@ class TestWeixinBlankMessagePrevention:
|
|||
def test_send_empty_content_does_not_call_send_message(self, send_message_mock):
|
||||
adapter = _make_adapter()
|
||||
adapter._session = object()
|
||||
adapter._send_session = adapter._session
|
||||
adapter._token = "test-token"
|
||||
adapter._base_url = "https://weixin.example.com"
|
||||
adapter._token_store.get = lambda account_id, chat_id: "ctx-token"
|
||||
|
|
|
|||
|
|
@ -215,7 +215,27 @@ def _handle_send(args):
|
|||
|
||||
pconfig = config.platforms.get(platform)
|
||||
if not pconfig or not pconfig.enabled:
|
||||
return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.")
|
||||
# Weixin can be configured purely via .env; synthesize a pconfig so
|
||||
# send_message and cron delivery work without a gateway.yaml entry.
|
||||
if platform_name == "weixin":
|
||||
import os
|
||||
wx_token = os.getenv("WEIXIN_TOKEN", "").strip()
|
||||
wx_account = os.getenv("WEIXIN_ACCOUNT_ID", "").strip()
|
||||
if wx_token and wx_account:
|
||||
from gateway.config import PlatformConfig
|
||||
pconfig = PlatformConfig(
|
||||
enabled=True,
|
||||
token=wx_token,
|
||||
extra={
|
||||
"account_id": wx_account,
|
||||
"base_url": os.getenv("WEIXIN_BASE_URL", "").strip(),
|
||||
"cdn_base_url": os.getenv("WEIXIN_CDN_BASE_URL", "").strip(),
|
||||
},
|
||||
)
|
||||
else:
|
||||
return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.")
|
||||
else:
|
||||
return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.")
|
||||
|
||||
from gateway.platforms.base import BasePlatformAdapter
|
||||
|
||||
|
|
@ -225,6 +245,12 @@ def _handle_send(args):
|
|||
used_home_channel = False
|
||||
if not chat_id:
|
||||
home = config.get_home_channel(platform)
|
||||
if not home and platform_name == "weixin":
|
||||
import os
|
||||
wx_home = os.getenv("WEIXIN_HOME_CHANNEL", "").strip()
|
||||
if wx_home:
|
||||
from gateway.config import HomeChannel
|
||||
home = HomeChannel(platform=platform, chat_id=wx_home, name="Weixin Home")
|
||||
if home:
|
||||
chat_id = home.chat_id
|
||||
used_home_channel = True
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue