mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
Plugin platforms (IRC, Teams, Google Chat) currently fail with `No live adapter for platform '<name>'` when a `deliver=<plugin>` cron job runs in a separate process from the gateway, even though the platforms are eligible cron targets via `cron_deliver_env_var` (added in #21306). Built-in platforms (Telegram, Discord, Slack, etc.) use direct REST helpers in `tools/send_message_tool.py` so cron can deliver without holding the gateway in the same process; plugin platforms historically depended on `_gateway_runner_ref()` which returns `None` out of process. This change adds an optional `standalone_sender_fn` field to `PlatformEntry` so plugins can register an ephemeral send path that opens its own connection, sends, and closes without needing the live adapter. The dispatch site in `_send_via_adapter` falls through to the hook when the gateway runner is unavailable, with a descriptive error when neither path applies. The hook is optional, so existing plugins are unaffected. Reference migrations land in the same change for IRC, Teams, and Google Chat, exercising the hook across stdlib (asyncio + IRC protocol), Bot Framework OAuth client_credentials, and Google service-account flows respectively. Security hardening on the new code paths: * IRC: control-character stripping on chat_id and message body to block CRLF command injection; bounded nick-collision retries; JOIN before PRIVMSG so channels with the default `+n` mode accept the delivery. * Teams: TEAMS_SERVICE_URL validated against an allowlist of known Bot Framework hosts (`smba.trafficmanager.net`, `smba.infra.gov.teams.microsoft.us`) to block SSRF; chat_id and tenant_id constrained to the documented Bot Framework character set; per-request timeouts so a slow STS endpoint cannot starve the activity POST. * Google Chat: chat_id and thread_id validated against strict resource-name regexes; service-account refresh wrapped in `asyncio.wait_for` so a hung token endpoint cannot stall the scheduler. 
Test coverage: 20 new tests covering happy path, missing-config errors, network failure modes, and each defensive validation. Existing tests unchanged. `bash scripts/run_tests.sh tests/tools/test_send_message_tool.py tests/gateway/test_irc_adapter.py tests/gateway/test_teams.py tests/gateway/test_google_chat.py` reports 341 passed, 0 regressions. Documentation: new "Out-of-process cron delivery" section in website/docs/developer-guide/adding-platform-adapters.md and an entry in gateway/platforms/ADDING_A_PLATFORM.md naming the hook.
1883 lines
80 KiB
Python
1883 lines
80 KiB
Python
"""Send Message Tool -- cross-channel messaging via platform APIs.
|
|
|
|
Sends a message to a user or channel on any connected messaging platform
|
|
(Telegram, Discord, Slack). Supports listing available targets and resolving
|
|
human-friendly channel names to IDs. Works in both CLI and gateway contexts.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import ssl
|
|
import time
|
|
from email.utils import formatdate
|
|
from typing import Dict, Optional
|
|
|
|
from agent.redact import redact_sensitive_text
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Telegram targets: an optionally negative numeric chat ID, optionally followed
# by ":<numeric thread id>". Group 1 is the chat ID, group 2 the thread ID.
_TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$")
# Feishu targets: an ID with one of the prefixes oc_/ou_/on_/chat_/open_,
# optionally followed by ":<thread id>". Group 1 is the ID, group 2 the thread.
_FEISHU_TARGET_RE = re.compile(r"^\s*((?:oc|ou|on|chat|open)_[-A-Za-z0-9]+)(?::([-A-Za-z0-9_]+))?\s*$")
# Slack conversation IDs: C (public channel), G (private/group channel), D (DM).
# Must be uppercase alphanumeric, 9+ chars. User IDs (U...) and workspace IDs
# (W...) are NOT valid chat.postMessage channel values — posting to them fails
# because the API requires a conversation ID. To DM a user you must first call
# conversations.open to obtain a D... ID. Without this gate, Slack IDs fall
# through to channel-name resolution, which only matches by name and fails.
_SLACK_TARGET_RE = re.compile(r"^\s*([CGD][A-Z0-9]{8,})\s*$")
# Weixin targets: a prefixed ID (wxid_/gh_/v<N>_/wm_/wb_), a "...@chatroom"
# group ID, or the literal "filehelper".
_WEIXIN_TARGET_RE = re.compile(r"^\s*((?:wxid|gh|v\d+|wm|wb)_[A-Za-z0-9_-]+|[A-Za-z0-9._-]+@chatroom|filehelper)\s*$")
# Yuanbao targets: "group:<code>" or "direct:<account>" with no further colons.
_YUANBAO_TARGET_RE = re.compile(r"^\s*((?:group|direct):[^:]+)\s*$")
# Discord snowflake IDs are numeric, same regex pattern as Telegram topic targets.
_NUMERIC_TOPIC_RE = _TELEGRAM_TOPIC_TARGET_RE
# Platforms that address recipients by phone number and accept E.164 format
# (with a leading '+'). Without this, "+15551234567" fails the generic
# numeric-ID check in _parse_target_ref and falls through to channel-name
# resolution, which has no way to resolve a raw phone number. Keeping the '+'
# preserves the E.164 form that downstream adapters (signal, etc.) expect.
_PHONE_PLATFORMS = frozenset({"signal", "sms", "whatsapp"})
# A leading '+' followed by 7 to 15 digits, surrounding whitespace tolerated.
_E164_TARGET_RE = re.compile(r"^\s*\+(\d{7,15})\s*$")
# Lower-case file extensions used to classify MEDIA attachments.
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
_VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".3gp"}
_AUDIO_EXTS = {".ogg", ".opus", ".mp3", ".wav", ".m4a", ".flac"}
_VOICE_EXTS = {".ogg", ".opus"}
# Telegram's Bot API sendAudio only accepts MP3 / M4A. Other audio
# formats either route through sendVoice (Opus/OGG) or fall back to
# document delivery.
_TELEGRAM_SEND_AUDIO_EXTS = {".mp3", ".m4a"}
# Secret-bearing URL query parameters: group 1 is the "?key=" / "&key=" prefix
# (kept), group 2 is the value (masked by _sanitize_error_text).
_URL_SECRET_QUERY_RE = re.compile(
    r"([?&](?:access_token|api[_-]?key|auth[_-]?token|token|signature|sig)=)([^&#\s]+)",
    re.IGNORECASE,
)
# Secret-looking "key = value" assignments outside URLs: group 1 is the key
# name, group 2 the value. Note that a bare "token" key is not in this list,
# unlike the URL pattern above.
_GENERIC_SECRET_ASSIGN_RE = re.compile(
    r"\b(access_token|api[_-]?key|auth[_-]?token|signature|sig)\s*=\s*([^\s,;]+)",
    re.IGNORECASE,
)
|
|
|
|
|
|
def _sanitize_error_text(text) -> str:
    """Redact secrets from error text before surfacing it to users/models.

    Args:
        text: The error text. Callers in this module pass exception objects
            as well as strings, so any non-string value is converted with
            ``str()`` first (``None`` becomes an empty string).

    Returns:
        The text after ``redact_sensitive_text`` and two extra masking passes:
        secret-bearing URL query parameters (``?token=...``) and generic
        ``key=value`` secret assignments both have their value replaced by
        ``***``.
    """
    # The regex passes below only accept ``str``; coerce up front so an
    # exception instance (or None) cannot raise TypeError while we are in the
    # middle of reporting another error.
    if not isinstance(text, str):
        text = "" if text is None else str(text)

    redacted = redact_sensitive_text(text)
    # Keep the "?key=" / "&key=" prefix, mask only the value.
    redacted = _URL_SECRET_QUERY_RE.sub(r"\g<1>***", redacted)
    # Normalise "key = value" to "key=***".
    redacted = _GENERIC_SECRET_ASSIGN_RE.sub(r"\g<1>=***", redacted)
    return redacted
|
|
|
|
|
|
def _error(message: str) -> dict:
    """Return the tool's ``{"error": ...}`` payload with secrets redacted from *message*."""
    sanitized = _sanitize_error_text(message)
    return {"error": sanitized}
|
|
|
|
|
|
def _telegram_retry_delay(exc: Exception, attempt: int) -> float | None:
|
|
retry_after = getattr(exc, "retry_after", None)
|
|
if retry_after is not None:
|
|
try:
|
|
return max(float(retry_after), 0.0)
|
|
except (TypeError, ValueError):
|
|
return 1.0
|
|
|
|
text = str(exc).lower()
|
|
if "timed out" in text or "timeout" in text:
|
|
return None
|
|
if (
|
|
"bad gateway" in text
|
|
or "502" in text
|
|
or "too many requests" in text
|
|
or "429" in text
|
|
or "service unavailable" in text
|
|
or "503" in text
|
|
or "gateway timeout" in text
|
|
or "504" in text
|
|
):
|
|
return float(2 ** attempt)
|
|
return None
|
|
|
|
|
|
async def _send_telegram_message_with_retry(bot, *, attempts: int = 3, **kwargs):
|
|
for attempt in range(attempts):
|
|
try:
|
|
return await bot.send_message(**kwargs)
|
|
except Exception as exc:
|
|
delay = _telegram_retry_delay(exc, attempt)
|
|
if delay is None or attempt >= attempts - 1:
|
|
raise
|
|
logger.warning(
|
|
"Transient Telegram send failure (attempt %d/%d), retrying in %.1fs: %s",
|
|
attempt + 1,
|
|
attempts,
|
|
delay,
|
|
_sanitize_error_text(exc),
|
|
)
|
|
await asyncio.sleep(delay)
|
|
|
|
|
|
# Tool schema advertised to the model for ``send_message``.
#
# The ``target`` description doubles as the format reference the model copies
# from, so every example must be a target the parser actually accepts. The
# Signal example is a full E.164 number (leading '+' followed by digits only):
# a masked number containing '*' cannot match the E.164 target pattern and
# would teach the model an unusable format.
SEND_MESSAGE_SCHEMA = {
    "name": "send_message",
    "description": (
        "Send a message to a connected messaging platform, or list available targets.\n\n"
        "IMPORTANT: When the user asks to send to a specific channel or person "
        "(not just a bare platform name), call send_message(action='list') FIRST to see "
        "available targets, then send to the correct one.\n"
        "If the user just says a platform name like 'send to telegram', send directly "
        "to the home channel without listing first."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["send", "list"],
                "description": "Action to perform. 'send' (default) sends a message. 'list' returns all available channels/contacts across connected platforms."
            },
            "target": {
                "type": "string",
                "description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or 'platform:chat_id:thread_id' for Telegram topics and Discord threads. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:999888777:555444333', 'discord:#bot-home', 'slack:#engineering', 'signal:+15551234567', 'matrix:!roomid:server.org', 'matrix:@user:server.org', 'yuanbao:direct:<account_id>' (DM), 'yuanbao:group:<group_code>' (group chat)"
            },
            "message": {
                "type": "string",
                "description": "The message text to send. To send an image or file, include MEDIA:<local_path> (e.g. 'MEDIA:/tmp/hermes/cache/img_xxx.jpg') in the message — the platform will deliver it as a native media attachment."
            }
        },
        # Nothing is required at the schema level: action='list' needs no
        # other arguments, and the send handler validates target/message.
        "required": []
    }
}
|
|
|
|
|
|
def send_message_tool(args, **kw):
    """Entry point for the ``send_message`` tool.

    Dispatches on ``args["action"]``: ``"list"`` returns the target
    directory, anything else (including a missing action, which defaults to
    ``"send"``) is handled as a send request. Extra keyword arguments are
    accepted and ignored.
    """
    wants_listing = args.get("action", "send") == "list"
    return _handle_list() if wants_listing else _handle_send(args)
|
|
|
|
|
|
def _handle_list():
    """Return a JSON string listing the available messaging targets.

    On success the payload is ``{"targets": ...}`` as produced by the channel
    directory; any failure while loading or serialising it is reported as a
    redacted ``{"error": ...}`` payload instead.
    """
    try:
        from gateway.channel_directory import format_directory_for_display

        listing = {"targets": format_directory_for_display()}
        return json.dumps(listing)
    except Exception as e:
        failure = _error(f"Failed to load channel directory: {e}")
        return json.dumps(failure)
|
|
|
|
|
|
def _handle_send(args):
    """Send a message to a platform target.

    Args:
        args: Tool arguments. ``target`` is ``"platform"`` or
            ``"platform:<ref>"``; ``message`` is the text to send and may
            embed ``MEDIA:<path>`` attachments and the ``[[as_document]]``
            directive.

    Returns:
        A JSON string (or the result of ``tool_error``) describing the
        outcome. Error text produced from exceptions is redacted.

    Flow:
        1. Split the target into platform name and reference; try to parse
           the reference as an explicit ID for that platform.
        2. If it is not explicit, resolve it as a human-friendly channel name.
        3. Load the gateway config and check that the platform is enabled
           (Weixin may be configured purely through environment variables).
        4. Fall back to the platform's home channel when no chat ID is known.
        5. Skip the send if cron will auto-deliver to the same target.
        6. Send, then best-effort mirror the text into the target's session.
    """
    target = args.get("target", "")
    message = args.get("message", "")
    if not target or not message:
        return tool_error("Both 'target' and 'message' are required when action='send'")

    parts = target.split(":", 1)
    platform_name = parts[0].strip().lower()
    target_ref = parts[1].strip() if len(parts) > 1 else None
    chat_id = None
    thread_id = None

    if target_ref:
        chat_id, thread_id, is_explicit = _parse_target_ref(platform_name, target_ref)
    else:
        is_explicit = False

    # Resolve human-friendly channel names to numeric IDs
    if target_ref and not is_explicit:
        try:
            from gateway.channel_directory import resolve_channel_name
            resolved = resolve_channel_name(platform_name, target_ref)
            if resolved:
                chat_id, thread_id, _ = _parse_target_ref(platform_name, resolved)
                if not chat_id:
                    # The directory returned an ID that does not match any
                    # platform-specific pattern. Use it verbatim: leaving
                    # chat_id empty here would silently redirect the message
                    # to the home channel further down.
                    chat_id = resolved
            else:
                return json.dumps({
                    "error": f"Could not resolve '{target_ref}' on {platform_name}. "
                    f"Use send_message(action='list') to see available targets."
                })
        except Exception:
            # Keep the user-facing message generic, but leave a trace of the
            # real failure for debugging.
            logger.debug(
                "Channel name resolution failed for %s target", platform_name, exc_info=True
            )
            return json.dumps({
                "error": f"Could not resolve '{target_ref}' on {platform_name}. "
                f"Try using a numeric channel ID instead."
            })

    from tools.interrupt import is_interrupted
    if is_interrupted():
        return tool_error("Interrupted")

    try:
        from gateway.config import load_gateway_config, Platform
        config = load_gateway_config()
    except Exception as e:
        return json.dumps(_error(f"Failed to load gateway config: {e}"))

    # Accept any platform name — built-in names resolve to their enum
    # member, plugin platform names create dynamic members via _missing_().
    try:
        platform = Platform(platform_name)
    except (ValueError, KeyError):
        return tool_error(f"Unknown platform: {platform_name}")

    pconfig = config.platforms.get(platform)
    if not pconfig or not pconfig.enabled:
        # Weixin can be configured purely via .env; synthesize a pconfig so
        # send_message and cron delivery work without a gateway.yaml entry.
        if platform_name == "weixin":
            wx_token = os.getenv("WEIXIN_TOKEN", "").strip()
            wx_account = os.getenv("WEIXIN_ACCOUNT_ID", "").strip()
            if wx_token and wx_account:
                from gateway.config import PlatformConfig
                pconfig = PlatformConfig(
                    enabled=True,
                    token=wx_token,
                    extra={
                        "account_id": wx_account,
                        "base_url": os.getenv("WEIXIN_BASE_URL", "").strip(),
                        "cdn_base_url": os.getenv("WEIXIN_CDN_BASE_URL", "").strip(),
                    },
                )
            else:
                return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.")
        else:
            return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.")

    from gateway.platforms.base import BasePlatformAdapter

    # Capture [[as_document]] directive before extract_media strips it.
    # Image-extension files in this batch will route through send_document
    # instead of send_photo so the original bytes survive (e.g. info-graph
    # JPGs where Telegram's sendPhoto recompresses to 1280px).
    force_document_attachments = "[[as_document]]" in message

    media_files, cleaned_message = BasePlatformAdapter.extract_media(message)
    # Text recorded in the target session: the cleaned message, or a short
    # placeholder when the message consisted only of attachments.
    mirror_text = cleaned_message.strip() or _describe_media_for_mirror(media_files)

    used_home_channel = False
    if not chat_id:
        home = config.get_home_channel(platform)
        if not home and platform_name == "weixin":
            wx_home = os.getenv("WEIXIN_HOME_CHANNEL", "").strip()
            if wx_home:
                from gateway.config import HomeChannel
                home = HomeChannel(platform=platform, chat_id=wx_home, name="Weixin Home")
        if home:
            chat_id = home.chat_id
            used_home_channel = True
        else:
            return json.dumps({
                "error": f"No home channel set for {platform_name} to determine where to send the message. "
                f"Either specify a channel directly with '{platform_name}:CHANNEL_NAME', "
                f"or set a home channel via: hermes config set {platform_name.upper()}_HOME_CHANNEL <channel_id>"
            })

    duplicate_skip = _maybe_skip_cron_duplicate_send(platform_name, chat_id, thread_id)
    if duplicate_skip:
        return json.dumps(duplicate_skip)

    try:
        from model_tools import _run_async
        result = _run_async(
            _send_to_platform(
                platform,
                pconfig,
                chat_id,
                cleaned_message,
                thread_id=thread_id,
                media_files=media_files,
                force_document=force_document_attachments,
            )
        )
        if used_home_channel and isinstance(result, dict) and result.get("success"):
            result["note"] = f"Sent to {platform_name} home channel (chat_id: {chat_id})"

        # Mirror the sent message into the target's gateway session
        if isinstance(result, dict) and result.get("success") and mirror_text:
            try:
                from gateway.mirror import mirror_to_session
                from gateway.session_context import get_session_env
                source_label = get_session_env("HERMES_SESSION_PLATFORM", "cli")
                user_id = get_session_env("HERMES_SESSION_USER_ID", "") or None
                if mirror_to_session(
                    platform_name,
                    chat_id,
                    mirror_text,
                    source_label=source_label,
                    thread_id=thread_id,
                    user_id=user_id,
                ):
                    result["mirrored"] = True
            except Exception:
                # Mirroring is best-effort: the message was already
                # delivered, so a mirror failure must not fail the send.
                logger.debug("Mirroring sent message to session failed", exc_info=True)

        if isinstance(result, dict) and "error" in result:
            result["error"] = _sanitize_error_text(result["error"])
        return json.dumps(result)
    except Exception as e:
        return json.dumps(_error(f"Send failed: {e}"))
|
|
|
|
|
|
def _parse_target_ref(platform_name: str, target_ref: str):
    """Parse a tool target into chat_id/thread_id and whether it is explicit.

    Args:
        platform_name: Lower-case platform name (e.g. ``"telegram"``).
        target_ref: The part of the target after ``"platform:"``.

    Returns:
        A ``(chat_id, thread_id, is_explicit)`` tuple. ``is_explicit`` is
        ``True`` when *target_ref* is already a usable ID for the platform;
        ``(None, None, False)`` means the caller should treat it as a
        human-friendly name and resolve it through the channel directory.
    """
    if platform_name == "telegram":
        match = _TELEGRAM_TOPIC_TARGET_RE.fullmatch(target_ref)
        if match:
            return match.group(1), match.group(2), True
    if platform_name == "feishu":
        match = _FEISHU_TARGET_RE.fullmatch(target_ref)
        if match:
            return match.group(1), match.group(2), True
    if platform_name == "discord":
        match = _NUMERIC_TOPIC_RE.fullmatch(target_ref)
        if match:
            return match.group(1), match.group(2), True
    if platform_name == "slack":
        match = _SLACK_TARGET_RE.fullmatch(target_ref)
        if match:
            return match.group(1), None, True
    if platform_name == "weixin":
        match = _WEIXIN_TARGET_RE.fullmatch(target_ref)
        if match:
            return match.group(1), None, True
    if platform_name == "yuanbao":
        match = _YUANBAO_TARGET_RE.fullmatch(target_ref)
        if match:
            return match.group(1), None, True
        # A bare number is shorthand for a group code.
        if target_ref.strip().isdigit():
            return f"group:{target_ref.strip()}", None, True
        # Yuanbao never falls through to the generic numeric check below.
        return None, None, False
    if platform_name in _PHONE_PLATFORMS:
        match = _E164_TARGET_RE.fullmatch(target_ref)
        if match:
            # Preserve the leading '+' — signal-cli and sms/whatsapp adapters
            # expect E.164 format for direct recipients.
            return target_ref.strip(), None, True
    # Generic numeric ID: one optional leading '-' followed by decimal digits.
    # A regex is used instead of ``lstrip("-").isdigit()`` because that form
    # also accepted malformed values such as "--123" and non-decimal Unicode
    # digits such as "²", neither of which is a usable chat ID.
    if re.fullmatch(r"-?\d+", target_ref):
        return target_ref, None, True
    # Matrix room IDs (start with !) and user IDs (start with @) are explicit
    if platform_name == "matrix" and target_ref.startswith(("!", "@")):
        return target_ref, None, True
    return None, None, False
|
|
|
|
|
|
def _describe_media_for_mirror(media_files):
|
|
"""Return a human-readable mirror summary when a message only contains media."""
|
|
if not media_files:
|
|
return ""
|
|
if len(media_files) == 1:
|
|
media_path, is_voice = media_files[0]
|
|
ext = os.path.splitext(media_path)[1].lower()
|
|
if is_voice and ext in _VOICE_EXTS:
|
|
return "[Sent voice message]"
|
|
if ext in _IMAGE_EXTS:
|
|
return "[Sent image attachment]"
|
|
if ext in _VIDEO_EXTS:
|
|
return "[Sent video attachment]"
|
|
if ext in _AUDIO_EXTS:
|
|
return "[Sent audio attachment]"
|
|
return "[Sent document attachment]"
|
|
return f"[Sent {len(media_files)} media attachments]"
|
|
|
|
|
|
def _get_cron_auto_delivery_target():
    """Return the cron scheduler's auto-delivery target for the current run, if any.

    Reads the ``HERMES_CRON_AUTO_DELIVER_*`` session variables. Returns
    ``None`` unless both platform and chat ID are set; otherwise a dict with
    ``platform`` (lower-cased), ``chat_id`` and ``thread_id`` (``None`` when
    unset or blank).
    """
    from gateway.session_context import get_session_env

    def _read(name):
        # Missing variables read as an empty string.
        return get_session_env(name, "").strip()

    platform = _read("HERMES_CRON_AUTO_DELIVER_PLATFORM").lower()
    chat_id = _read("HERMES_CRON_AUTO_DELIVER_CHAT_ID")
    if not (platform and chat_id):
        return None

    thread_id = _read("HERMES_CRON_AUTO_DELIVER_THREAD_ID") or None
    return {"platform": platform, "chat_id": chat_id, "thread_id": thread_id}
|
|
|
|
|
|
def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id: str | None):
    """Skip redundant cron send_message calls when the scheduler will auto-deliver there.

    Returns ``None`` when the send should proceed: either no auto-delivery
    target is active, or it differs from the requested platform / chat /
    thread. Otherwise returns a ``success`` + ``skipped`` payload explaining
    that the final response will already be delivered to that target.
    """
    auto_target = _get_cron_auto_delivery_target()
    if not auto_target:
        return None

    # Guard clauses: any mismatch means this is a distinct destination.
    if not (auto_target["platform"] == platform_name):
        return None
    if not (str(auto_target["chat_id"]) == str(chat_id)):
        return None
    if not (auto_target.get("thread_id") == thread_id):
        return None

    if thread_id is None:
        target_label = f"{platform_name}:{chat_id}"
    else:
        target_label = f"{platform_name}:{chat_id}" + f":{thread_id}"

    explanation = (
        f"Skipped send_message to {target_label}. This cron job will already auto-deliver "
        "its final response to that same target. Put the intended user-facing content in "
        "your final response instead, or use a different target if you want an additional message."
    )
    return {
        "success": True,
        "skipped": True,
        "reason": "cron_auto_delivery_duplicate_target",
        "target": target_label,
        "note": explanation,
    }
|
|
|
|
|
|
async def _send_via_adapter(
|
|
platform,
|
|
pconfig,
|
|
chat_id,
|
|
chunk,
|
|
*,
|
|
thread_id=None,
|
|
media_files=None,
|
|
force_document=False,
|
|
):
|
|
"""Send a message via a live gateway adapter, with a standalone fallback
|
|
for out-of-process callers (e.g. cron running separately from the gateway).
|
|
|
|
Order of attempts:
|
|
1. Live in-process adapter via ``_gateway_runner_ref()`` (the path that
|
|
existed before this change).
|
|
2. The plugin's ``standalone_sender_fn`` registered on its
|
|
``PlatformEntry`` (used when the gateway is not in this process, so
|
|
the runner weakref is ``None``).
|
|
3. A descriptive error explaining both options.
|
|
"""
|
|
runner = None
|
|
try:
|
|
from gateway.run import _gateway_runner_ref
|
|
runner = _gateway_runner_ref()
|
|
except Exception:
|
|
runner = None
|
|
|
|
if runner is not None:
|
|
try:
|
|
adapter = runner.adapters.get(platform)
|
|
except Exception:
|
|
adapter = None
|
|
if adapter is not None:
|
|
try:
|
|
result = await adapter.send(chat_id=chat_id, content=chunk)
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as e:
|
|
return {"error": f"Plugin platform send failed: {e}"}
|
|
if result.success:
|
|
return {"success": True, "message_id": result.message_id}
|
|
return {"error": f"Adapter send failed: {result.error}"}
|
|
|
|
platform_name = platform.value if hasattr(platform, "value") else str(platform)
|
|
entry = None
|
|
try:
|
|
from gateway.platform_registry import platform_registry
|
|
entry = platform_registry.get(platform_name)
|
|
except Exception:
|
|
entry = None
|
|
|
|
if entry is not None and entry.standalone_sender_fn is not None:
|
|
try:
|
|
result = await entry.standalone_sender_fn(
|
|
pconfig,
|
|
chat_id,
|
|
chunk,
|
|
thread_id=thread_id,
|
|
media_files=media_files,
|
|
force_document=force_document,
|
|
)
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as e:
|
|
logger.debug("Plugin standalone send for %s raised", platform_name, exc_info=True)
|
|
return {"error": f"Plugin standalone send failed: {e}"}
|
|
|
|
if isinstance(result, dict) and (result.get("success") or result.get("error")):
|
|
return result
|
|
return {
|
|
"error": (
|
|
f"Plugin standalone send for '{platform_name}' returned an "
|
|
f"invalid result: expected a dict with 'success' or 'error' "
|
|
f"keys, got {type(result).__name__}"
|
|
)
|
|
}
|
|
|
|
return {
|
|
"error": (
|
|
f"No live adapter for platform '{platform_name}'. Is the gateway "
|
|
f"running with this platform connected? For out-of-process delivery "
|
|
f"(e.g. cron in a separate process), the platform plugin must "
|
|
f"register a standalone_sender_fn on its PlatformEntry."
|
|
)
|
|
}
|
|
|
|
|
|
async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None, force_document=False):
    """Route a message to the appropriate platform sender.

    Long messages are automatically chunked to fit within platform limits
    using the same smart-splitting algorithm as the gateway adapters
    (preserves code-block boundaries, adds part indicators).

    Args:
        platform: ``Platform`` member identifying the destination.
        pconfig: The platform's config object; ``token``, ``api_key`` and
            ``extra`` are read from it depending on the platform.
        chat_id: Destination chat/channel/recipient identifier.
        message: Message text with MEDIA tags already stripped by the caller.
        thread_id: Optional thread/topic identifier, forwarded only to the
            senders that accept one.
        media_files: Optional list of ``(path, is_voice)`` pairs.
        force_document: Forwarded to the Telegram sender and to plugin
            platforms.

    Returns:
        The result of the last chunk sent, or the first result containing an
        ``"error"`` key (sending stops at the first failing chunk). For
        platforms without media support, a ``"warnings"`` entry is appended
        to a successful result when attachments were dropped.
    """
    from gateway.config import Platform
    from gateway.platforms.base import BasePlatformAdapter, utf16_len
    from gateway.platforms.discord import DiscordAdapter
    from gateway.platforms.slack import SlackAdapter

    # Telegram adapter import is optional (requires python-telegram-bot)
    try:
        from gateway.platforms.telegram import TelegramAdapter
        _telegram_available = True
    except ImportError:
        _telegram_available = False

    # Feishu adapter import is optional (requires lark-oapi)
    try:
        from gateway.platforms.feishu import FeishuAdapter
        _feishu_available = True
    except ImportError:
        _feishu_available = False

    media_files = media_files or []

    if platform == Platform.SLACK and message:
        try:
            # __new__ creates the adapter without running __init__, so only
            # its format_message method is used here.
            slack_adapter = SlackAdapter.__new__(SlackAdapter)
            message = slack_adapter.format_message(message)
        except Exception:
            # Formatting is best-effort; on failure the raw text is sent.
            logger.debug("Failed to apply Slack mrkdwn formatting in _send_to_platform", exc_info=True)

    # Platform message length limits (from adapter class attributes)
    _MAX_LENGTHS = {
        Platform.TELEGRAM: TelegramAdapter.MAX_MESSAGE_LENGTH if _telegram_available else 4096,
        Platform.DISCORD: DiscordAdapter.MAX_MESSAGE_LENGTH,
        Platform.SLACK: SlackAdapter.MAX_MESSAGE_LENGTH,
    }
    if _feishu_available:
        _MAX_LENGTHS[Platform.FEISHU] = FeishuAdapter.MAX_MESSAGE_LENGTH

    # Check plugin registry for max_message_length
    if platform not in _MAX_LENGTHS:
        try:
            from gateway.platform_registry import platform_registry
            entry = platform_registry.get(platform.value)
            if entry and entry.max_message_length > 0:
                _MAX_LENGTHS[platform] = entry.max_message_length
        except Exception:
            # No registry or no entry: the message is sent unchunked.
            pass

    # Smart-chunk the message to fit within platform limits.
    # For short messages or platforms without a known limit this is a no-op.
    # Telegram measures length in UTF-16 code units, not Unicode codepoints.
    max_len = _MAX_LENGTHS.get(platform)
    if max_len:
        _len_fn = utf16_len if platform == Platform.TELEGRAM else None
        chunks = BasePlatformAdapter.truncate_message(message, max_len, len_fn=_len_fn)
    else:
        chunks = [message]

    # In every media-capable branch below, attachments are passed only with
    # the final chunk, and the loop stops at the first chunk that errors.

    # --- Telegram: special handling for media attachments ---
    if platform == Platform.TELEGRAM:
        last_result = None
        disable_link_previews = bool(getattr(pconfig, "extra", {}) and pconfig.extra.get("disable_link_previews"))
        for i, chunk in enumerate(chunks):
            is_last = (i == len(chunks) - 1)
            result = await _send_telegram(
                pconfig.token,
                chat_id,
                chunk,
                media_files=media_files if is_last else [],
                thread_id=thread_id,
                disable_link_previews=disable_link_previews,
                force_document=force_document,
            )
            if isinstance(result, dict) and result.get("error"):
                return result
            last_result = result
        return last_result

    # --- Weixin: use the native one-shot adapter helper for text + media ---
    # Note: the whole message is passed here, not the chunk list.
    if platform == Platform.WEIXIN:
        return await _send_weixin(pconfig, chat_id, message, media_files=media_files)

    # --- Discord: special handling for media attachments ---
    if platform == Platform.DISCORD:
        last_result = None
        for i, chunk in enumerate(chunks):
            is_last = (i == len(chunks) - 1)
            result = await _send_discord(
                pconfig.token,
                chat_id,
                chunk,
                media_files=media_files if is_last else [],
                thread_id=thread_id,
            )
            if isinstance(result, dict) and result.get("error"):
                return result
            last_result = result
        return last_result

    # --- Matrix: use the native adapter helper when media is present ---
    if platform == Platform.MATRIX and media_files:
        last_result = None
        for i, chunk in enumerate(chunks):
            is_last = (i == len(chunks) - 1)
            result = await _send_matrix_via_adapter(
                pconfig,
                chat_id,
                chunk,
                media_files=media_files if is_last else [],
                thread_id=thread_id,
            )
            if isinstance(result, dict) and result.get("error"):
                return result
            last_result = result
        return last_result

    # --- Signal: native attachment support via JSON-RPC attachments param ---
    if platform == Platform.SIGNAL and media_files:
        last_result = None
        for i, chunk in enumerate(chunks):
            is_last = (i == len(chunks) - 1)
            result = await _send_signal(
                pconfig.extra,
                chat_id,
                chunk,
                media_files=media_files if is_last else [],
            )
            if isinstance(result, dict) and result.get("error"):
                return result
            last_result = result
        return last_result

    # --- Yuanbao: native media attachment support via running gateway adapter ---
    if platform == Platform.YUANBAO and media_files:
        last_result = None
        for i, chunk in enumerate(chunks):
            is_last = (i == len(chunks) - 1)
            result = await _send_yuanbao(
                chat_id,
                chunk,
                media_files=media_files if is_last else None,
            )
            if isinstance(result, dict) and result.get("error"):
                return result
            last_result = result
        return last_result

    # --- Feishu: native media attachment support via adapter ---
    if platform == Platform.FEISHU and media_files:
        last_result = None
        for i, chunk in enumerate(chunks):
            is_last = (i == len(chunks) - 1)
            result = await _send_feishu(
                pconfig,
                chat_id,
                chunk,
                media_files=media_files if is_last else None,
                thread_id=thread_id,
            )
            if isinstance(result, dict) and result.get("error"):
                return result
            last_result = result
        return last_result

    # --- Non-media platforms ---
    # A media-only message has nothing left to deliver as text, so fail early.
    if media_files and not message.strip():
        return {
            "error": (
                f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, signal, yuanbao and feishu; "
                f"target {platform.value} had only media attachments"
            )
        }
    # Text plus media: the text is sent and a warning about the dropped
    # attachments is attached to the successful result below.
    warning = None
    if media_files:
        warning = (
            f"MEDIA attachments were omitted for {platform.value}; "
            "native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, signal, yuanbao and feishu"
        )

    last_result = None
    for chunk in chunks:
        if platform == Platform.SLACK:
            result = await _send_slack(pconfig.token, chat_id, chunk)
        elif platform == Platform.WHATSAPP:
            result = await _send_whatsapp(pconfig.extra, chat_id, chunk)
        elif platform == Platform.SIGNAL:
            result = await _send_signal(pconfig.extra, chat_id, chunk)
        elif platform == Platform.EMAIL:
            result = await _send_email(pconfig.extra, chat_id, chunk)
        elif platform == Platform.SMS:
            result = await _send_sms(pconfig.api_key, chat_id, chunk)
        elif platform == Platform.MATTERMOST:
            result = await _send_mattermost(pconfig.token, pconfig.extra, chat_id, chunk)
        elif platform == Platform.MATRIX:
            result = await _send_matrix(pconfig.token, pconfig.extra, chat_id, chunk)
        elif platform == Platform.HOMEASSISTANT:
            result = await _send_homeassistant(pconfig.token, pconfig.extra, chat_id, chunk)
        elif platform == Platform.DINGTALK:
            result = await _send_dingtalk(pconfig.extra, chat_id, chunk)
        elif platform == Platform.FEISHU:
            result = await _send_feishu(pconfig, chat_id, chunk, thread_id=thread_id)
        elif platform == Platform.WECOM:
            result = await _send_wecom(pconfig.extra, chat_id, chunk)
        elif platform == Platform.BLUEBUBBLES:
            result = await _send_bluebubbles(pconfig.extra, chat_id, chunk)
        elif platform == Platform.QQBOT:
            result = await _send_qqbot(pconfig, chat_id, chunk)
        elif platform == Platform.YUANBAO:
            result = await _send_yuanbao(chat_id, chunk)
        else:
            # Plugin platform: route through the gateway's live adapter if
            # available, otherwise the plugin's standalone_sender_fn.
            # NOTE(review): unlike the media-capable branches above,
            # media_files is forwarded with EVERY chunk here, and the
            # "attachments were omitted" warning is still added afterwards.
            # If a plugin's standalone sender delivers media, a multi-chunk
            # message may deliver it more than once — confirm intent.
            result = await _send_via_adapter(
                platform,
                pconfig,
                chat_id,
                chunk,
                thread_id=thread_id,
                media_files=media_files,
                force_document=force_document,
            )

        if isinstance(result, dict) and result.get("error"):
            return result
        last_result = result

    if warning and isinstance(last_result, dict) and last_result.get("success"):
        # Copy before appending so a list owned by the sender is not mutated.
        warnings = list(last_result.get("warnings", []))
        warnings.append(warning)
        last_result["warnings"] = warnings
    return last_result
|
|
|
|
|
|
async def _send_telegram(token, chat_id, message, media_files=None, thread_id=None, disable_link_previews=False, force_document=False):
    """Send via Telegram Bot API (one-shot, no polling needed).

    Applies markdown→MarkdownV2 formatting (same as the gateway adapter)
    so that bold, links, and headers render correctly. If the message
    already contains HTML tags, it is sent with ``parse_mode='HTML'``
    instead, bypassing MarkdownV2 conversion. When the formatted send
    fails with a parse-related error, the text is re-sent as plain text.

    Args:
        token: Bot token handed to ``telegram.Bot``.
        chat_id: Target chat; must be convertible with ``int()``.
        message: Text to deliver. Whitespace-only text is not sent.
        media_files: Optional iterable of ``(path, is_voice)`` pairs; each
            existing file is sent as its own message after the text.
        thread_id: Optional forum topic id. The General topic ("1") is
            mapped to "no thread" because sendMessage rejects it.
        disable_link_previews: When true, ``disable_web_page_preview=True``
            is passed on the text send (and only there).
        force_document: When true, image files skip ``send_photo`` and
            fall through to the remaining branches (normally
            ``send_document``).

    Returns:
        On success a dict with ``success``, ``platform``, ``chat_id`` and
        ``message_id`` (of the last message sent), plus ``warnings`` when
        any media was skipped or failed. Otherwise an error dict; failures
        are reported through the return value rather than raised.
    """
    try:
        from telegram import Bot
        from telegram.constants import ParseMode

        # Auto-detect HTML tags — if present, skip MarkdownV2 and send as HTML.
        # Inspired by github.com/ashaney — PR #1568.
        _has_html = bool(re.search(r'<[a-zA-Z/][^>]*>', message))

        if _has_html:
            formatted = message
            send_parse_mode = ParseMode.HTML
        else:
            # Reuse the gateway adapter's format_message for markdown→MarkdownV2
            try:
                from gateway.platforms.telegram import TelegramAdapter
                _adapter = TelegramAdapter.__new__(TelegramAdapter)
                formatted = _adapter.format_message(message)
            except Exception:
                # Fallback: send as-is if formatting unavailable
                formatted = message
            send_parse_mode = ParseMode.MARKDOWN_V2

        bot = Bot(token=token)
        int_chat_id = int(chat_id)
        media_files = media_files or []

        # Keyword arguments shared by the text send and every media send.
        thread_kwargs = {}
        if thread_id is not None:
            # Reuse the gateway adapter's General-topic mapping: in Telegram
            # forum supergroups, the General topic is addressed as
            # message_thread_id="1" on incoming updates, but Bot API
            # sendMessage rejects message_thread_id=1 with "Message thread
            # not found". The adapter's helper maps "1" to None for that
            # reason; the send_message tool needs the same mapping or a
            # send to a forum group's General topic always errors out
            # (see issue #22267).
            try:
                from gateway.platforms.telegram import TelegramAdapter
                effective_thread_id = TelegramAdapter._message_thread_id_for_send(
                    str(thread_id)
                )
            except Exception:
                # Fallback: explicit mapping in case the adapter import
                # fails (e.g. python-telegram-bot missing in this venv).
                effective_thread_id = (
                    None if str(thread_id) == "1" else int(thread_id)
                )
            if effective_thread_id is not None:
                thread_kwargs["message_thread_id"] = effective_thread_id

        # Link-preview suppression is a text-message option only. The media
        # senders (send_photo/send_video/...) do not accept
        # ``disable_web_page_preview``; passing it made every media upload
        # fail with a TypeError whenever previews were disabled.
        text_kwargs = dict(thread_kwargs)
        if disable_link_previews:
            text_kwargs["disable_web_page_preview"] = True

        last_msg = None
        warnings = []

        if formatted.strip():
            try:
                last_msg = await _send_telegram_message_with_retry(
                    bot,
                    chat_id=int_chat_id, text=formatted,
                    parse_mode=send_parse_mode, **text_kwargs
                )
            except Exception as md_error:
                # Parse failed, fall back to plain text
                error_text = str(md_error).lower()
                if "parse" in error_text or "markdown" in error_text or "html" in error_text:
                    logger.warning(
                        "Parse mode %s failed in _send_telegram, falling back to plain text: %s",
                        send_parse_mode,
                        _sanitize_error_text(md_error),
                    )
                    if not _has_html:
                        try:
                            from gateway.platforms.telegram import _strip_mdv2
                            plain = _strip_mdv2(formatted)
                        except Exception:
                            plain = message
                    else:
                        plain = message
                    last_msg = await _send_telegram_message_with_retry(
                        bot,
                        chat_id=int_chat_id, text=plain,
                        parse_mode=None, **text_kwargs
                    )
                else:
                    raise

        for media_path, is_voice in media_files:
            if not os.path.exists(media_path):
                warning = f"Media file not found, skipping: {media_path}"
                logger.warning(warning)
                warnings.append(warning)
                continue

            ext = os.path.splitext(media_path)[1].lower()
            try:
                # The handle must stay open until the upload has completed.
                with open(media_path, "rb") as f:
                    if ext in _IMAGE_EXTS and not force_document:
                        last_msg = await bot.send_photo(
                            chat_id=int_chat_id, photo=f, **thread_kwargs
                        )
                    elif ext in _VIDEO_EXTS:
                        last_msg = await bot.send_video(
                            chat_id=int_chat_id, video=f, **thread_kwargs
                        )
                    elif ext in _VOICE_EXTS and is_voice:
                        last_msg = await bot.send_voice(
                            chat_id=int_chat_id, voice=f, **thread_kwargs
                        )
                    elif ext in _TELEGRAM_SEND_AUDIO_EXTS:
                        last_msg = await bot.send_audio(
                            chat_id=int_chat_id, audio=f, **thread_kwargs
                        )
                    else:
                        last_msg = await bot.send_document(
                            chat_id=int_chat_id, document=f, **thread_kwargs
                        )
            except Exception as e:
                # A failed attachment is downgraded to a warning so the
                # remaining files are still attempted.
                warning = _sanitize_error_text(f"Failed to send media {media_path}: {e}")
                logger.error(warning)
                warnings.append(warning)

        if last_msg is None:
            error = "No deliverable text or media remained after processing MEDIA tags"
            if warnings:
                return {"error": error, "warnings": warnings}
            return {"error": error}

        result = {
            "success": True,
            "platform": "telegram",
            "chat_id": chat_id,
            "message_id": str(last_msg.message_id),
        }
        if warnings:
            result["warnings"] = warnings
        return result
    except ImportError:
        return {"error": "python-telegram-bot not installed. Run: pip install python-telegram-bot"}
    except Exception as e:
        return _error(f"Telegram send failed: {e}")
|
|
|
|
|
|
def _derive_forum_thread_name(message: str) -> str:
    """Build a forum thread title from the first line of *message*.

    Leading ``#`` heading markers and surrounding whitespace are dropped.
    An empty result becomes ``"New Post"``; the title is cut at 100 chars.
    """
    head, _, _ = message.strip().partition("\n")
    # Drop markdown heading markers such as "## " from the front.
    title = head.strip().lstrip("#").strip()
    return (title or "New Post")[:100]
|
|
|
|
|
|
# Per-process memo of Discord channel-type probes: channel id (as a string)
# mapped to whether that channel is a forum. It saves re-probing a channel on
# every send when the channel directory has nothing for it (fresh install, or
# a channel created after the directory was last built).
_DISCORD_CHANNEL_TYPE_PROBE_CACHE: Dict[str, bool] = dict()
|
|
|
|
|
|
def _remember_channel_is_forum(chat_id: str, is_forum: bool) -> None:
    """Record in the process-local probe cache whether *chat_id* is a forum."""
    cache_key = str(chat_id)
    _DISCORD_CHANNEL_TYPE_PROBE_CACHE[cache_key] = bool(is_forum)
|
|
|
|
|
|
def _probe_is_forum_cached(chat_id: str) -> Optional[bool]:
    """Return the memoized forum flag for *chat_id*, or None if never probed."""
    cache_key = str(chat_id)
    return _DISCORD_CHANNEL_TYPE_PROBE_CACHE.get(cache_key)
|
|
|
|
|
|
async def _send_discord(token, chat_id, message, thread_id=None, media_files=None):
    """Deliver one message through the Discord REST API (no websocket client).

    Chunking is handled by _send_to_platform() before this is called.

    Routing:
      * With ``thread_id`` the message is posted straight to
        /channels/{thread_id}/messages (threads are channels).
      * Forum channels (type 15) reject POST /messages, so a new thread post
        is created via POST /channels/{id}/threads; any media rides along as
        multipart attachments on the starter message.
      * Otherwise the text is posted to the channel and each media file is
        uploaded afterwards as its own multipart request.

    Whether the target is a forum is resolved from the channel directory
    first, then the process-local probe cache, and only as a last resort
    from a live GET /channels/{id} probe whose answer is memoized.

    Returns a success dict (``message_id``, plus ``thread_id`` for forum
    posts and ``warnings`` when media was skipped or failed) or an error
    dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp

        proxy_url = resolve_proxy_url(platform_env_var="DISCORD_PROXY")
        session_kwargs, request_kwargs = proxy_kwargs_for_aiohttp(proxy_url)
        auth_headers = {"Authorization": f"Bot {token}"}
        json_headers = dict(auth_headers)
        json_headers["Content-Type"] = "application/json"
        attachments = media_files or []
        final_payload = None
        notes = []

        if thread_id:
            # Threads are addressed like channels, so post to the thread id.
            messages_url = f"https://discord.com/api/v10/channels/{thread_id}/messages"
        else:
            # Layer 1: channel directory.
            directory_type = None
            try:
                from gateway.channel_directory import lookup_channel_type
                directory_type = lookup_channel_type("discord", chat_id)
            except Exception:
                pass

            if directory_type is not None:
                forum_target = directory_type == "forum"
            else:
                # Layer 2: process-local probe cache.
                forum_target = _probe_is_forum_cached(chat_id)
                if forum_target is None:
                    # Layer 3: live probe; only a 200 answer is memoized.
                    forum_target = False
                    try:
                        info_url = f"https://discord.com/api/v10/channels/{chat_id}"
                        probe_timeout = aiohttp.ClientTimeout(total=15)
                        async with aiohttp.ClientSession(timeout=probe_timeout, **session_kwargs) as probe_session:
                            async with probe_session.get(info_url, headers=json_headers, **request_kwargs) as probe_resp:
                                if probe_resp.status == 200:
                                    channel_info = await probe_resp.json()
                                    forum_target = channel_info.get("type") == 15
                                    _remember_channel_is_forum(chat_id, forum_target)
                    except Exception:
                        logger.debug("Failed to probe channel type for %s", chat_id, exc_info=True)

            if forum_target:
                thread_name = _derive_forum_thread_name(message)
                thread_url = f"https://discord.com/api/v10/channels/{chat_id}/threads"

                # Decide between JSON and multipart before opening a session
                # by keeping only the media files that exist on disk.
                readable = []
                for path, _voice_flag in attachments:
                    if os.path.exists(path):
                        readable.append(path)
                        continue
                    warning = f"Media file not found, skipping: {path}"
                    logger.warning(warning)
                    notes.append(warning)

                forum_timeout = aiohttp.ClientTimeout(total=60)
                async with aiohttp.ClientSession(timeout=forum_timeout, **session_kwargs) as session:
                    if readable:
                        # One multipart call: payload_json describes the thread
                        # and starter message, files[N] carry the attachments.
                        attachment_entries = []
                        for position, path in enumerate(readable):
                            attachment_entries.append(
                                {"id": str(position), "filename": os.path.basename(path)}
                            )
                        starter = {"content": message, "attachments": attachment_entries}
                        payload_json = json.dumps({"name": thread_name, "message": starter})

                        form = aiohttp.FormData()
                        form.add_field("payload_json", payload_json, content_type="application/json")

                        try:
                            # File contents are read into memory up front so no
                            # handle has to outlive its ``with`` block.
                            for position, path in enumerate(readable):
                                with open(path, "rb") as handle:
                                    form.add_field(
                                        f"files[{position}]",
                                        handle.read(),
                                        filename=os.path.basename(path),
                                    )
                            async with session.post(thread_url, headers=auth_headers, data=form, **request_kwargs) as resp:
                                if resp.status not in (200, 201):
                                    body = await resp.text()
                                    return _error(f"Discord forum thread creation error ({resp.status}): {body}")
                                data = await resp.json()
                        except Exception as e:
                            return _error(_sanitize_error_text(f"Discord forum thread upload failed: {e}"))
                    else:
                        # Text-only starter: a plain JSON POST is enough.
                        thread_request = {
                            "name": thread_name,
                            "message": {"content": message},
                        }
                        async with session.post(
                            thread_url,
                            headers=json_headers,
                            json=thread_request,
                            **request_kwargs,
                        ) as resp:
                            if resp.status not in (200, 201):
                                body = await resp.text()
                                return _error(f"Discord forum thread creation error ({resp.status}): {body}")
                            data = await resp.json()

                created_thread_id = data.get("id")
                # Fall back to the thread id when no starter message id is given.
                starter_message_id = (data.get("message") or {}).get("id", created_thread_id)
                forum_result = {
                    "success": True,
                    "platform": "discord",
                    "chat_id": chat_id,
                    "thread_id": created_thread_id,
                    "message_id": starter_message_id,
                }
                if notes:
                    forum_result["warnings"] = notes
                return forum_result

            messages_url = f"https://discord.com/api/v10/channels/{chat_id}/messages"

        send_timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=send_timeout, **session_kwargs) as session:
            # The text post is skipped only when it is blank AND media exists.
            if message.strip() or not attachments:
                async with session.post(messages_url, headers=json_headers, json={"content": message}, **request_kwargs) as resp:
                    if resp.status not in (200, 201):
                        body = await resp.text()
                        return _error(f"Discord API error ({resp.status}): {body}")
                    final_payload = await resp.json()

            # Each media file goes out as its own multipart request.
            for path, _voice_flag in attachments:
                if not os.path.exists(path):
                    warning = f"Media file not found, skipping: {path}"
                    logger.warning(warning)
                    notes.append(warning)
                    continue
                try:
                    form = aiohttp.FormData()
                    upload_name = os.path.basename(path)
                    # The handle stays open for the duration of the POST.
                    with open(path, "rb") as handle:
                        form.add_field("files[0]", handle, filename=upload_name)
                        async with session.post(messages_url, headers=auth_headers, data=form, **request_kwargs) as resp:
                            if resp.status in (200, 201):
                                final_payload = await resp.json()
                            else:
                                body = await resp.text()
                                warning = _sanitize_error_text(f"Failed to send media {path}: Discord API error ({resp.status}): {body}")
                                logger.error(warning)
                                notes.append(warning)
                except Exception as e:
                    warning = _sanitize_error_text(f"Failed to send media {path}: {e}")
                    logger.error(warning)
                    notes.append(warning)

        if final_payload is None:
            error = "No deliverable text or media remained after processing"
            if notes:
                return {"error": error, "warnings": notes}
            return {"error": error}

        outcome = {"success": True, "platform": "discord", "chat_id": chat_id, "message_id": final_payload.get("id")}
        if notes:
            outcome["warnings"] = notes
        return outcome
    except Exception as e:
        return _error(f"Discord send failed: {e}")
|
|
|
|
|
|
async def _send_slack(token, chat_id, message):
    """Post *message* to a Slack channel via the Web API ``chat.postMessage``.

    Returns a success dict whose ``message_id`` is the response ``ts``, or
    an error dict when the API answers without ``ok`` or the request fails.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp

        session_kwargs, request_kwargs = proxy_kwargs_for_aiohttp(resolve_proxy_url())
        endpoint = "https://slack.com/api/chat.postMessage"
        request_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        request_body = {"channel": chat_id, "text": message, "mrkdwn": True}
        time_limit = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(timeout=time_limit, **session_kwargs) as session:
            async with session.post(endpoint, headers=request_headers, json=request_body, **request_kwargs) as resp:
                reply = await resp.json()
                # Success is signalled by the ``ok`` flag in the JSON body.
                if not reply.get("ok"):
                    return _error(f"Slack API error: {reply.get('error', 'unknown')}")
                return {"success": True, "platform": "slack", "chat_id": chat_id, "message_id": reply.get("ts")}
    except Exception as e:
        return _error(f"Slack send failed: {e}")
|
|
|
|
|
|
async def _send_whatsapp(extra, chat_id, message):
    """Hand *message* to the local WhatsApp bridge over its HTTP API.

    The bridge is reached on ``localhost`` at ``extra["bridge_port"]``
    (default 3000). Returns a success dict with the bridge's ``messageId``
    or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        port = extra.get("bridge_port", 3000)
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"http://localhost:{port}/send",
                json={"chatId": chat_id, "message": message},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status != 200:
                    body = await resp.text()
                    return _error(f"WhatsApp bridge error ({resp.status}): {body}")
                reply = await resp.json()
                return {
                    "success": True,
                    "platform": "whatsapp",
                    "chat_id": chat_id,
                    "message_id": reply.get("messageId"),
                }
    except Exception as e:
        return _error(f"WhatsApp send failed: {e}")
|
|
|
|
|
|
async def _send_signal(extra, chat_id, message, media_files=None):
    """Send text and optional attachments through the signal-cli JSON-RPC API.

    Attachments are split into batches of SIGNAL_MAX_ATTACHMENTS_PER_MSG and
    paced by the process-wide SignalAttachmentScheduler — the same bucket the
    gateway adapter uses, so sends from this tool and inbound-driven replies
    share rate-limit state. The text travels with the first batch only; with
    no attachments a single text-only batch is sent.

    Returns a success dict (with ``warnings`` for skipped files or batches
    that ran out of retries) or an error dict.
    """
    try:
        import httpx
    except ImportError:
        return {"error": "httpx not installed"}

    from gateway.platforms.signal_rate_limit import (
        SIGNAL_BATCH_PACING_NOTICE_THRESHOLD,
        SIGNAL_MAX_ATTACHMENTS_PER_MSG,
        SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
        _extract_retry_after_seconds,
        _format_wait,
        _is_signal_rate_limit_error,
        _signal_send_timeout,
        get_scheduler,
    )

    try:
        http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/")
        account = extra.get("account", "")
        if not account:
            return {"error": "Signal account not configured"}

        requested_media = media_files or []
        existing_paths = []
        for path, _voice_flag in requested_media:
            if not os.path.exists(path):
                logger.warning("Signal media file not found, skipping: %s", path)
                continue
            existing_paths.append(path)

        # Text-only sends still need exactly one (empty) batch.
        batches = (
            [
                existing_paths[start:start + SIGNAL_MAX_ATTACHMENTS_PER_MSG]
                for start in range(0, len(existing_paths), SIGNAL_MAX_ATTACHMENTS_PER_MSG)
            ]
            if existing_paths
            else [[]]
        )

        def _recipient_fields():
            """Addressing params: a group id for ``group:`` targets, else a recipient list."""
            if chat_id.startswith("group:"):
                return {"groupId": chat_id[6:]}
            return {"recipient": [chat_id]}

        async def _post(batch_attachments, batch_message):
            """Issue one ``send`` RPC and return the decoded JSON reply."""
            params = {"account": account, "message": batch_message}
            params.update(_recipient_fields())
            if batch_attachments:
                params["attachments"] = batch_attachments

            rpc_request = {
                "jsonrpc": "2.0",
                "method": "send",
                "params": params,
                "id": f"send_{int(time.time() * 1000)}",
            }
            # The timeout is derived from the number of attachments in the batch.
            timeout = _signal_send_timeout(len(batch_attachments) if batch_attachments else 0)
            async with httpx.AsyncClient(timeout=timeout) as client:
                resp = await client.post(f"{http_url}/api/v1/rpc", json=rpc_request)
                resp.raise_for_status()
                return resp.json()

        async def _send_inline_notice(text: str) -> None:
            """Best-effort one-shot RPC for a user-facing pacing notice."""
            notice_params = {"account": account, "message": text}
            notice_params.update(_recipient_fields())
            try:
                async with httpx.AsyncClient(timeout=30.0) as notice_client:
                    await notice_client.post(
                        f"{http_url}/api/v1/rpc",
                        json={
                            "jsonrpc": "2.0",
                            "method": "send",
                            "params": notice_params,
                            "id": f"notice_{int(time.time() * 1000)}",
                        },
                    )
            except Exception as notice_error:
                logger.warning("Signal: inline notice failed: %s", notice_error)

        scheduler = get_scheduler()
        logger.info(
            "send_message Signal: scheduler state=%s, %d attachment(s) in %d batch(es)",
            scheduler.state(), len(existing_paths), len(batches),
        )
        exhausted: list[int] = []
        for position, batch in enumerate(batches):
            count = len(batch)
            if count > 0:
                expected_wait = scheduler.estimate_wait(count)
                # Tell the user when the pause ahead is long enough to notice.
                if expected_wait >= SIGNAL_BATCH_PACING_NOTICE_THRESHOLD:
                    await _send_inline_notice(
                        f"(More images coming — pausing ~{_format_wait(expected_wait)} "
                        f"for Signal rate limit, batch {position + 1}/{len(batches)}.)"
                    )

            # Only the first batch carries the text, so the caption is not repeated.
            batch_text = message if position == 0 else ""

            for attempt in range(1, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS + 1):
                try:
                    await scheduler.acquire(count)
                    started = time.monotonic()
                    reply = await _post(batch, batch_text)
                    elapsed = time.monotonic() - started
                    if "error" not in reply:
                        await scheduler.report_rpc_duration(elapsed, count)
                        break

                    rpc_error = reply["error"]

                    # Anything other than a rate limit aborts the whole send.
                    if not _is_signal_rate_limit_error(rpc_error):
                        return _error(f"Signal RPC error on batch {position + 1}/{len(batches)}: {rpc_error}")

                    server_retry_after = _extract_retry_after_seconds(rpc_error)
                    scheduler.feedback(server_retry_after, count)

                    if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS:
                        exhausted.append(position + 1)
                        logger.error(
                            "Signal: rate-limit retries exhausted on batch %d/%d "
                            "(%d attachments lost, server retry_after=%s)",
                            position + 1, len(batches), count,
                            f"{server_retry_after:.0f}s" if server_retry_after else "unknown",
                        )
                        break
                    logger.warning(
                        "Signal: rate-limited on batch %d/%d "
                        "(attempt %d/%d, server retry_after=%s); "
                        "scheduler will pace the retry",
                        position + 1, len(batches),
                        attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
                        f"{server_retry_after:.0f}s" if server_retry_after else "unknown",
                    )
                except Exception as e:
                    if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS:
                        exhausted.append(position + 1)
                        logger.error(
                            "Signal: send error on batch %d/%d after %d attempts: %s",
                            position + 1, len(batches), attempt, str(e)
                        )
                        break
                    logger.warning(
                        "Signal: transient error on batch %d/%d (attempt %d/%d): %s; will retry",
                        position + 1, len(batches), attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, str(e)
                    )

        notes = []
        if len(existing_paths) < len(requested_media):
            notes.append("Some media files were skipped (not found on disk)")
        if exhausted:
            notes.append(
                f"Signal rate-limited {len(exhausted)} batch(es) "
                f"(#{', #'.join(str(b) for b in exhausted)})"
            )

        # If every batch failed, report a hard error instead of success.
        if exhausted and len(exhausted) == len(batches):
            return _error(
                f"Signal: every batch ({len(batches)}) hit rate limit; "
                f"no attachments delivered"
            )

        outcome = {"success": True, "platform": "signal", "chat_id": chat_id}
        if notes:
            outcome["warnings"] = notes
        return outcome
    except Exception as e:
        return _error(f"Signal send failed: {e}")
|
|
|
|
|
|
async def _send_email(extra, chat_id, message):
    """Send via SMTP (one-shot, no persistent connection needed).

    Configuration comes from ``extra`` (``address``, ``smtp_host``) with
    the ``EMAIL_ADDRESS`` / ``EMAIL_SMTP_HOST`` environment variables as
    fallback; the password and port are read from ``EMAIL_PASSWORD`` and
    ``EMAIL_SMTP_PORT`` (default 587, also used when the value is not an
    integer).

    Args:
        extra: Platform extra-config mapping.
        chat_id: Recipient address, used as the ``To`` header.
        message: Plain-text body, sent as UTF-8.

    Returns:
        ``{"success": True, "platform": "email", "chat_id": ...}`` on
        success, otherwise an error dict. SMTP failures are reported
        through the return value rather than raised.
    """
    import smtplib
    from email.mime.text import MIMEText
    from email.utils import formatdate

    address = extra.get("address") or os.getenv("EMAIL_ADDRESS", "")
    password = os.getenv("EMAIL_PASSWORD", "")
    smtp_host = extra.get("smtp_host") or os.getenv("EMAIL_SMTP_HOST", "")
    try:
        smtp_port = int(os.getenv("EMAIL_SMTP_PORT", "587"))
    except (ValueError, TypeError):
        smtp_port = 587

    if not all([address, password, smtp_host]):
        return {"error": "Email not configured (EMAIL_ADDRESS, EMAIL_PASSWORD, EMAIL_SMTP_HOST required)"}

    try:
        msg = MIMEText(message, "plain", "utf-8")
        msg["From"] = address
        msg["To"] = chat_id
        msg["Subject"] = "Hermes Agent"
        msg["Date"] = formatdate(localtime=True)

        # smtplib is blocking; without a timeout a silent server would hang
        # this coroutine (and the loop running it) indefinitely. 30s matches
        # the other one-shot senders in this module.
        server = smtplib.SMTP(smtp_host, smtp_port, timeout=30)
        try:
            server.starttls(context=ssl.create_default_context())
            server.login(address, password)
            server.send_message(msg)
            server.quit()
        finally:
            # Release the socket even when STARTTLS/login/send raised before
            # quit() was reached; close() is a no-op after a clean quit().
            server.close()
        return {"success": True, "platform": "email", "chat_id": chat_id}
    except Exception as e:
        return _error(f"Email send failed: {e}")
|
|
|
|
|
|
async def _send_sms(auth_token, chat_id, message):
    """Deliver one SMS through the Twilio REST API.

    Authenticates with HTTP Basic (Account SID : Auth Token) and submits a
    form-encoded POST. Markdown markers are stripped first because SMS shows
    them literally. Chunking is handled by _send_to_platform() before this
    is called.

    Returns a success dict with the Twilio message ``sid`` as
    ``message_id``, or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    import base64

    account_sid = os.getenv("TWILIO_ACCOUNT_SID", "")
    sender_number = os.getenv("TWILIO_PHONE_NUMBER", "")
    if not (account_sid and auth_token and sender_number):
        return {"error": "SMS not configured (TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_PHONE_NUMBER required)"}

    # (pattern, replacement, flags), applied strictly in this order.
    markdown_rules = (
        (r"\*\*(.+?)\*\*", r"\1", re.DOTALL),
        (r"\*(.+?)\*", r"\1", re.DOTALL),
        (r"__(.+?)__", r"\1", re.DOTALL),
        (r"_(.+?)_", r"\1", re.DOTALL),
        (r"```[a-z]*\n?", "", 0),
        (r"`(.+?)`", r"\1", 0),
        (r"^#{1,6}\s+", "", re.MULTILINE),
        (r"\[([^\]]+)\]\([^\)]+\)", r"\1", 0),
        (r"\n{3,}", "\n\n", 0),
    )
    for pattern, replacement, flags in markdown_rules:
        message = re.sub(pattern, replacement, message, flags=flags)
    message = message.strip()

    try:
        from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp

        session_kwargs, request_kwargs = proxy_kwargs_for_aiohttp(resolve_proxy_url())
        credentials = f"{account_sid}:{auth_token}"
        basic_token = base64.b64encode(credentials.encode("ascii")).decode("ascii")
        endpoint = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
        request_headers = {"Authorization": f"Basic {basic_token}"}

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **session_kwargs) as session:
            form_data = aiohttp.FormData()
            for field_name, field_value in (("From", sender_number), ("To", chat_id), ("Body", message)):
                form_data.add_field(field_name, field_value)

            async with session.post(endpoint, data=form_data, headers=request_headers, **request_kwargs) as resp:
                reply = await resp.json()
                if resp.status >= 400:
                    error_msg = reply.get("message", str(reply))
                    return _error(f"Twilio API error ({resp.status}): {error_msg}")
                return {"success": True, "platform": "sms", "chat_id": chat_id, "message_id": reply.get("sid", "")}
    except Exception as e:
        return _error(f"SMS send failed: {e}")
|
|
|
|
|
|
async def _send_mattermost(token, extra, chat_id, message):
    """Create a post in a Mattermost channel through the REST API.

    The server URL comes from ``extra["url"]`` or ``MATTERMOST_URL``; the
    token argument falls back to ``MATTERMOST_TOKEN``. Returns a success
    dict with the new post ``id`` or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        server_root = (extra.get("url") or os.getenv("MATTERMOST_URL", "")).rstrip("/")
        bearer = token or os.getenv("MATTERMOST_TOKEN", "")
        if not (server_root and bearer):
            return {"error": "Mattermost not configured (MATTERMOST_URL, MATTERMOST_TOKEN required)"}

        endpoint = f"{server_root}/api/v4/posts"
        request_headers = {"Authorization": f"Bearer {bearer}", "Content-Type": "application/json"}
        post_body = {"channel_id": chat_id, "message": message}

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
            async with session.post(endpoint, headers=request_headers, json=post_body) as resp:
                if resp.status not in (200, 201):
                    body = await resp.text()
                    return _error(f"Mattermost API error ({resp.status}): {body}")
                created = await resp.json()
                return {"success": True, "platform": "mattermost", "chat_id": chat_id, "message_id": created.get("id")}
    except Exception as e:
        return _error(f"Mattermost send failed: {e}")
|
|
|
|
|
|
async def _send_matrix(token, extra, chat_id, message):
    """Send a text event to a Matrix room through the Client-Server API.

    The message is also rendered from markdown to HTML and attached as
    ``formatted_body`` for rich display; when the ``markdown`` library is
    missing, only the plain body is sent.

    Returns a success dict with the ``event_id`` or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        homeserver = (extra.get("homeserver") or os.getenv("MATRIX_HOMESERVER", "")).rstrip("/")
        access_token = token or os.getenv("MATRIX_ACCESS_TOKEN", "")
        if not (homeserver and access_token):
            return {"error": "Matrix not configured (MATRIX_HOMESERVER, MATRIX_ACCESS_TOKEN required)"}

        # Millisecond timestamp plus random bytes gives a per-send transaction id.
        txn_id = f"hermes_{int(time.time() * 1000)}_{os.urandom(4).hex()}"
        from urllib.parse import quote
        room_segment = quote(chat_id, safe="")
        endpoint = f"{homeserver}/_matrix/client/v3/rooms/{room_segment}/send/m.room.message/{txn_id}"
        request_headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}

        event = {"msgtype": "m.text", "body": message}
        try:
            import markdown as _md
        except ImportError:
            # No markdown library: the plain-text body is sent on its own.
            pass
        else:
            rendered = _md.markdown(message, extensions=["fenced_code", "tables"])
            # Headings become bold text for Element X compatibility.
            rendered = re.sub(r"<h[1-6]>(.*?)</h[1-6]>", r"<strong>\1</strong>", rendered)
            event["format"] = "org.matrix.custom.html"
            event["formatted_body"] = rendered

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
            async with session.put(endpoint, headers=request_headers, json=event) as resp:
                if resp.status not in (200, 201):
                    body = await resp.text()
                    return _error(f"Matrix API error ({resp.status}): {body}")
                reply = await resp.json()
                return {"success": True, "platform": "matrix", "chat_id": chat_id, "message_id": reply.get("event_id")}
    except Exception as e:
        return _error(f"Matrix send failed: {e}")
|
|
|
|
|
|
async def _send_matrix_via_adapter(pconfig, chat_id, message, media_files=None, thread_id=None):
    """Send text and media through a short-lived MatrixAdapter.

    Going through the adapter keeps native Matrix media uploads. The adapter
    is connected for this call only and disconnected afterwards, whatever
    the outcome. The first failed send or missing media file stops the
    delivery with an error dict.
    """
    try:
        from gateway.platforms.matrix import MatrixAdapter
    except ImportError:
        return {"error": "Matrix dependencies not installed. Run: pip install 'mautrix[encryption]'"}

    attachments = media_files or []

    try:
        adapter = MatrixAdapter(pconfig)
        if not await adapter.connect():
            return _error("Matrix connect failed")

        metadata = {"thread_id": thread_id} if thread_id else None
        outcome = None

        if message.strip():
            outcome = await adapter.send(chat_id, message, metadata=metadata)
            if not outcome.success:
                return _error(f"Matrix send failed: {outcome.error}")

        for path, voice_flag in attachments:
            if not os.path.exists(path):
                return _error(f"Media file not found: {path}")

            suffix = os.path.splitext(path)[1].lower()
            # Choose the adapter method by extension; audio files always go
            # out through send_voice, with or without the voice flag.
            if suffix in _IMAGE_EXTS:
                deliver = adapter.send_image_file
            elif suffix in _VIDEO_EXTS:
                deliver = adapter.send_video
            elif (suffix in _VOICE_EXTS and voice_flag) or suffix in _AUDIO_EXTS:
                deliver = adapter.send_voice
            else:
                deliver = adapter.send_document
            outcome = await deliver(chat_id, path, metadata=metadata)

            if not outcome.success:
                return _error(f"Matrix media send failed: {outcome.error}")

        if outcome is None:
            return {"error": "No deliverable text or media remained after processing MEDIA tags"}

        return {
            "success": True,
            "platform": "matrix",
            "chat_id": chat_id,
            "message_id": outcome.message_id,
        }
    except Exception as e:
        return _error(f"Matrix send failed: {e}")
    finally:
        # Best-effort teardown; also swallows the case where the adapter was
        # never constructed.
        try:
            await adapter.disconnect()
        except Exception:
            pass
|
|
|
|
|
|
async def _send_homeassistant(token, extra, chat_id, message):
    """Call the Home Assistant ``notify.notify`` service with *message*.

    The base URL comes from ``extra["url"]`` or ``HASS_URL``; the token
    argument falls back to ``HASS_TOKEN``. ``chat_id`` is forwarded as the
    notification ``target``. Returns a success or error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        instance_url = (extra.get("url") or os.getenv("HASS_URL", "")).rstrip("/")
        bearer = token or os.getenv("HASS_TOKEN", "")
        if not (instance_url and bearer):
            return {"error": "Home Assistant not configured (HASS_URL, HASS_TOKEN required)"}

        endpoint = f"{instance_url}/api/services/notify/notify"
        request_headers = {"Authorization": f"Bearer {bearer}", "Content-Type": "application/json"}
        service_data = {"message": message, "target": chat_id}

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
            async with session.post(endpoint, headers=request_headers, json=service_data) as resp:
                if resp.status in (200, 201):
                    return {"success": True, "platform": "homeassistant", "chat_id": chat_id}
                body = await resp.text()
                return _error(f"Home Assistant API error ({resp.status}): {body}")
    except Exception as e:
        return _error(f"Home Assistant send failed: {e}")
|
|
|
|
|
|
async def _send_dingtalk(extra, chat_id, message):
    """Post a text message to a DingTalk robot webhook.

    Note: the gateway's DingTalk adapter replies through per-session webhook
    URLs taken from incoming messages (dingtalk-stream SDK). Cross-platform
    send_message delivery has no such session, so a static robot webhook is
    used instead; it must be supplied through the ``DINGTALK_WEBHOOK_URL``
    env var or ``webhook_url`` in the platform's extra config.

    ``chat_id`` is only echoed back in the result. Returns a success or
    error dict.
    """
    try:
        import httpx
    except ImportError:
        return {"error": "httpx not installed"}

    try:
        target_url = extra.get("webhook_url") or os.getenv("DINGTALK_WEBHOOK_URL", "")
        if not target_url:
            return {"error": "DingTalk not configured. Set DINGTALK_WEBHOOK_URL env var or webhook_url in dingtalk platform extra config."}

        text_payload = {"msgtype": "text", "text": {"content": message}}
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(target_url, json=text_payload)
            resp.raise_for_status()
            reply = resp.json()
            # A non-zero ``errcode`` in the JSON reply is treated as a failure.
            if reply.get("errcode", 0) != 0:
                return _error(f"DingTalk API error: {reply.get('errmsg', 'unknown')}")
            return {"success": True, "platform": "dingtalk", "chat_id": chat_id}
    except Exception as e:
        return _error(f"DingTalk send failed: {e}")
|
|
|
|
|
|
async def _send_wecom(extra, chat_id, message):
    """Send through a short-lived WeComAdapter (its WebSocket send pipeline).

    The adapter is built from ``extra``, connected, used for one send, and
    disconnected again. Returns a success dict with the adapter's
    ``message_id`` or an error dict.
    """
    try:
        from gateway.platforms.wecom import WeComAdapter, check_wecom_requirements
        requirements_ok = check_wecom_requirements()
    except ImportError:
        return {"error": "WeCom adapter not available."}
    if not requirements_ok:
        return {"error": "WeCom requirements not met. Need aiohttp + WECOM_BOT_ID/SECRET."}

    try:
        from gateway.config import PlatformConfig

        adapter = WeComAdapter(PlatformConfig(extra=extra))
        if not await adapter.connect():
            return _error(f"WeCom: failed to connect - {adapter.fatal_error_message or 'unknown error'}")
        try:
            outcome = await adapter.send(chat_id, message)
            if outcome.success:
                return {"success": True, "platform": "wecom", "chat_id": chat_id, "message_id": outcome.message_id}
            return _error(f"WeCom send failed: {outcome.error}")
        finally:
            # Disconnect after a successful connect, whatever the send did.
            await adapter.disconnect()
    except Exception as e:
        return _error(f"WeCom send failed: {e}")
|
|
|
|
|
|
async def _send_weixin(pconfig, chat_id, message, media_files=None):
    """Deliver a message through the Weixin iLink direct-send helper.

    Args:
        pconfig: Platform config; its ``extra`` and ``token`` attributes are
            forwarded to ``send_weixin_direct``.
        chat_id: Destination identifier forwarded to the helper.
        message: Message text forwarded to the helper.
        media_files: Optional media attachments, forwarded unchanged.

    Returns:
        Whatever ``send_weixin_direct`` returns, or an error dict when the
        adapter is unavailable, its requirements are unmet, or the helper
        raises.
    """
    try:
        from gateway.platforms.weixin import check_weixin_requirements, send_weixin_direct
        if not check_weixin_requirements():
            return {"error": "Weixin requirements not met. Need aiohttp + cryptography."}
    except ImportError:
        return {"error": "Weixin adapter not available."}

    try:
        call_kwargs = {
            "extra": pconfig.extra,
            "token": pconfig.token,
            "chat_id": chat_id,
            "message": message,
            "media_files": media_files,
        }
        return await send_weixin_direct(**call_kwargs)
    except Exception as exc:
        return _error(f"Weixin send failed: {exc}")
|
|
|
|
|
|
async def _send_bluebubbles(extra, chat_id, message):
    """Deliver a message through a short-lived BlueBubbles adapter.

    A fresh ``BlueBubblesAdapter`` is built from ``extra``, connected, used
    for a single ``send`` call, and then disconnected whether or not the
    send succeeded.

    Args:
        extra: Platform extra-config passed to ``PlatformConfig``.
        chat_id: Destination identifier handed to the adapter.
        message: Message text handed to the adapter.

    Returns:
        A success dict including ``message_id``, or an error dict.
    """
    try:
        from gateway.platforms.bluebubbles import BlueBubblesAdapter, check_bluebubbles_requirements
        if not check_bluebubbles_requirements():
            return {"error": "BlueBubbles requirements not met (need aiohttp + httpx)."}
    except ImportError:
        return {"error": "BlueBubbles adapter not available."}

    try:
        from gateway.config import PlatformConfig

        bridge = BlueBubblesAdapter(PlatformConfig(extra=extra))
        if not await bridge.connect():
            return _error("BlueBubbles: failed to connect to server")

        try:
            outcome = await bridge.send(chat_id, message)
            if outcome.success:
                return {"success": True, "platform": "bluebubbles", "chat_id": chat_id, "message_id": outcome.message_id}
            return _error(f"BlueBubbles send failed: {outcome.error}")
        finally:
            # Runs on every exit from the send attempt, including errors.
            await bridge.disconnect()
    except Exception as exc:
        return _error(f"BlueBubbles send failed: {exc}")
|
|
|
|
|
|
async def _send_feishu(pconfig, chat_id, message, media_files=None, thread_id=None):
    """Deliver text and/or media to Feishu/Lark through a one-off adapter.

    The text (if non-blank) is sent first, then each media file in order.
    The first failing send aborts the whole delivery with an error.

    Args:
        pconfig: Platform config used to construct ``FeishuAdapter``.
        chat_id: Destination identifier handed to every adapter call.
        message: Message text; skipped when it is only whitespace.
        media_files: Optional iterable of ``(path, is_voice)`` pairs.
        thread_id: When truthy, passed to the adapter as
            ``metadata={"thread_id": ...}``.

    Returns:
        A success dict carrying the ``message_id`` of the last send, or an
        error dict.
    """
    missing_deps = "Feishu dependencies not installed. Run: pip install 'hermes-agent[feishu]'"
    try:
        from gateway.platforms.feishu import FeishuAdapter, FEISHU_AVAILABLE
        if not FEISHU_AVAILABLE:
            return {"error": missing_deps}
        from gateway.platforms.feishu import FEISHU_DOMAIN, LARK_DOMAIN
    except ImportError:
        return {"error": missing_deps}

    media_files = media_files or []

    try:
        adapter = FeishuAdapter(pconfig)
        # Build the API client by hand for the domain the adapter reports;
        # anything other than "lark" uses the Feishu domain.
        if getattr(adapter, "_domain_name", "feishu") == "lark":
            api_domain = LARK_DOMAIN
        else:
            api_domain = FEISHU_DOMAIN
        adapter._client = adapter._build_lark_client(api_domain)

        metadata = None
        if thread_id:
            metadata = {"thread_id": thread_id}

        last_result = None
        if message.strip():
            last_result = await adapter.send(chat_id, message, metadata=metadata)
            if not last_result.success:
                return _error(f"Feishu send failed: {last_result.error}")

        for media_path, is_voice in media_files:
            if not os.path.exists(media_path):
                return _error(f"Media file not found: {media_path}")

            suffix = os.path.splitext(media_path)[1].lower()
            # Choose the adapter method by extension, then send once.
            if suffix in _IMAGE_EXTS:
                deliver = adapter.send_image_file
            elif suffix in _VIDEO_EXTS:
                deliver = adapter.send_video
            elif (suffix in _VOICE_EXTS and is_voice) or suffix in _AUDIO_EXTS:
                deliver = adapter.send_voice
            else:
                deliver = adapter.send_document
            last_result = await deliver(chat_id, media_path, metadata=metadata)

            if not last_result.success:
                return _error(f"Feishu media send failed: {last_result.error}")

        # Neither text nor media was sent.
        if last_result is None:
            return {"error": "No deliverable text or media remained after processing MEDIA tags"}

        return {
            "success": True,
            "platform": "feishu",
            "chat_id": chat_id,
            "message_id": last_result.message_id,
        }
    except Exception as exc:
        return _error(f"Feishu send failed: {exc}")
|
|
|
|
|
|
def _check_send_message():
    """Report whether the send_message tool should be offered.

    Returns ``True`` outright when the session's ``HERMES_SESSION_PLATFORM``
    value is set to anything other than ``"local"``. Otherwise the answer is
    whatever ``is_gateway_running()`` reports, with any exception from that
    lookup treated as ``False``.
    """
    from gateway.session_context import get_session_env

    session_platform = get_session_env("HERMES_SESSION_PLATFORM", "")
    on_messaging_platform = bool(session_platform) and session_platform != "local"
    if on_messaging_platform:
        return True

    try:
        from gateway.status import is_gateway_running
        return is_gateway_running()
    except Exception:
        return False
|
|
|
|
|
|
async def _send_qqbot(pconfig, chat_id, message):
    """Send via QQBot using the REST API directly (no WebSocket needed).

    Fetches an app access token, then posts the message to the guild
    channel, C2C (private chat) and group endpoints in that order, stopping
    at the first one that answers 200/201.

    Args:
        pconfig: Platform config. ``extra["app_id"]`` (or ``QQ_APP_ID``)
            supplies the app id; ``token``, ``extra["client_secret"]`` or
            ``QQ_CLIENT_SECRET`` supplies the secret, in that priority.
        chat_id: Target id, substituted into each endpoint URL.
        message: Text to send; truncated to the first 4000 characters.

    Returns:
        A success dict with ``message_id`` (``None`` when the response body
        has no usable ``id``), or an error dict. When every endpoint is
        rejected the error lists each endpoint's HTTP status.
    """
    try:
        import httpx
    except ImportError:
        return _error("QQBot direct send requires httpx. Run: pip install httpx")

    extra = pconfig.extra or {}
    appid = extra.get("app_id") or os.getenv("QQ_APP_ID", "")
    secret = (pconfig.token or extra.get("client_secret")
              or os.getenv("QQ_CLIENT_SECRET", ""))
    if not appid or not secret:
        return _error("QQBot: QQ_APP_ID / QQ_CLIENT_SECRET not configured.")

    try:
        async with httpx.AsyncClient(timeout=15) as client:
            # Step 1: Get access token
            token_resp = await client.post(
                "https://bots.qq.com/app/getAppAccessToken",
                json={"appId": str(appid), "clientSecret": str(secret)},
            )
            if token_resp.status_code != 200:
                return _error(f"QQBot token request failed: {token_resp.status_code}")
            access_token = token_resp.json().get("access_token")
            if not access_token:
                return _error("QQBot: no access_token in response")

            # Step 2: Send message via REST.
            # The same chat_id is tried against each endpoint in turn:
            # guild channel first, then C2C, then group.
            headers = {
                "Authorization": f"QQBot {access_token}",
                "Content-Type": "application/json",
            }
            payload = {"content": message[:4000], "msg_type": 0}
            endpoints = (
                ("channel", f"https://api.sgroup.qq.com/channels/{chat_id}/messages"),
                ("c2c", f"https://api.sgroup.qq.com/v2/users/{chat_id}/messages"),
                ("group", f"https://api.sgroup.qq.com/v2/groups/{chat_id}/messages"),
            )

            statuses = []
            for label, url in endpoints:
                resp = await client.post(url, json=payload, headers=headers)
                if resp.status_code in (200, 201):
                    # The message is delivered at this point. An unparsable
                    # or non-object body must not turn that into a reported
                    # failure, or callers may re-send and duplicate it.
                    try:
                        data = resp.json()
                    except ValueError:
                        data = None
                    message_id = data.get("id") if isinstance(data, dict) else None
                    return {"success": True, "platform": "qqbot", "chat_id": chat_id,
                            "message_id": message_id}
                statuses.append(f"{label}={resp.status_code}")

            # All endpoints failed — report every status code
            return _error(f"QQBot send failed: {' '.join(statuses)}")
    except Exception as e:
        return _error(f"QQBot send failed: {e}")
|
|
|
|
|
|
async def _send_yuanbao(chat_id, message, media_files=None):
    """Deliver a message through the already-running Yuanbao adapter.

    Yuanbao uses a persistent WebSocket, so no throwaway client is created
    here; the live adapter is obtained from ``get_active_adapter()`` in the
    adapter module and handed to ``send_yuanbao_direct``.

    chat_id format:
      - Group: "group:<group_code>"
      - DM: "direct:<account_id>" or just "<account_id>"

    Returns:
        Whatever ``send_yuanbao_direct`` returns, or an error dict when the
        module cannot be imported, no adapter is active, or the send raises.
    """
    try:
        from gateway.platforms.yuanbao import get_active_adapter, send_yuanbao_direct
    except ImportError:
        return _error("Yuanbao adapter module not available.")

    live_adapter = get_active_adapter()
    if live_adapter is not None:
        try:
            return await send_yuanbao_direct(live_adapter, chat_id, message, media_files=media_files)
        except Exception as exc:
            return _error(f"Yuanbao send failed: {exc}")

    return _error(
        "Yuanbao adapter is not running. "
        "Start the gateway with yuanbao platform enabled first."
    )
|
|
|
|
|
|
# --- Registry ---
|
|
from tools.registry import registry, tool_error
|
|
|
|
# Register the send_message tool in the "messaging" toolset; the check
# function decides at runtime whether the tool is offered.
registry.register(
    name="send_message",
    toolset="messaging",
    schema=SEND_MESSAGE_SCHEMA,
    handler=send_message_tool,
    check_fn=_check_send_message,
    emoji="📨",
)
|