mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Full AST-based scan of all .py files to find every case where a module
or name is imported locally inside a function body but is already
available at module level. This is the second pass — the first commit
handled the known cases from the lint report; this one catches
everything else.
Files changed (19):
cli.py — 16 removals: time as _time/_t/_tmod (×10),
re / re as _re (×2), os as _os, sys,
partial os from combo import,
from model_tools import get_tool_definitions
gateway/run.py — 8 removals: MessageEvent as _ME /
MessageType as _MT (×3), os as _os2,
MessageEvent+MessageType (×2), Platform,
BasePlatformAdapter as _BaseAdapter
run_agent.py — 6 removals: get_hermes_home as _ghh,
partial (contextlib, os as _os),
cleanup_vm, cleanup_browser,
set_interrupt as _sif (×2),
partial get_toolset_for_tool
hermes_cli/main.py — 4 removals: get_hermes_home, time as _time,
logging as _log, shutil
hermes_cli/config.py — 1 removal: get_hermes_home as _ghome
hermes_cli/runtime_provider.py
— 1 removal: load_config as _load_bedrock_config
hermes_cli/setup.py — 2 removals: importlib.util (×2)
hermes_cli/nous_subscription.py
— 1 removal: from hermes_cli.config import load_config
hermes_cli/tools_config.py
— 1 removal: from hermes_cli.config import load_config, save_config
cron/scheduler.py — 3 removals: concurrent.futures, json as _json,
from hermes_cli.config import load_config
batch_runner.py — 1 removal: list_distributions as get_all_dists
(kept print_distribution_info, not at top level)
tools/send_message_tool.py
— 2 removals: import os (×2)
tools/skills_tool.py — 1 removal: logging as _logging
tools/browser_camofox.py
— 1 removal: from hermes_cli.config import load_config
tools/image_generation_tool.py
— 1 removal: import fal_client
environments/tool_context.py
— 1 removal: concurrent.futures
gateway/platforms/bluebubbles.py
— 1 removal: httpx as _httpx
gateway/platforms/whatsapp.py
— 1 removal: import asyncio
tui_gateway/server.py — 2 removals: from datetime import datetime,
import time
All alias references (_time, _t, _tmod, _re, _os, _os2, _json, _ghh,
_ghome, _sif, _ME, _MT, _BaseAdapter, _load_bedrock_config, _httpx,
_logging, _log, get_all_dists) updated to use the top-level names.
1523 lines
64 KiB
Python
1523 lines
64 KiB
Python
"""Send Message Tool -- cross-channel messaging via platform APIs.
|
|
|
|
Sends a message to a user or channel on any connected messaging platform
|
|
(Telegram, Discord, Slack). Supports listing available targets and resolving
|
|
human-friendly channel names to IDs. Works in both CLI and gateway contexts.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from typing import Dict, Optional
|
|
import ssl
|
|
import time
|
|
|
|
from agent.redact import redact_sensitive_text
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$")
|
|
_FEISHU_TARGET_RE = re.compile(r"^\s*((?:oc|ou|on|chat|open)_[-A-Za-z0-9]+)(?::([-A-Za-z0-9_]+))?\s*$")
|
|
_WEIXIN_TARGET_RE = re.compile(r"^\s*((?:wxid|gh|v\d+|wm|wb)_[A-Za-z0-9_-]+|[A-Za-z0-9._-]+@chatroom|filehelper)\s*$")
|
|
# Discord snowflake IDs are numeric, same regex pattern as Telegram topic targets.
|
|
_NUMERIC_TOPIC_RE = _TELEGRAM_TOPIC_TARGET_RE
|
|
# Platforms that address recipients by phone number and accept E.164 format
|
|
# (with a leading '+'). Without this, "+15551234567" fails the isdigit() check
|
|
# below and falls through to channel-name resolution, which has no way to
|
|
# resolve a raw phone number. Keeping the '+' preserves the E.164 form that
|
|
# downstream adapters (signal, etc.) expect.
|
|
_PHONE_PLATFORMS = frozenset({"signal", "sms", "whatsapp"})
|
|
_E164_TARGET_RE = re.compile(r"^\s*\+(\d{7,15})\s*$")
|
|
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
|
|
_VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".3gp"}
|
|
_AUDIO_EXTS = {".ogg", ".opus", ".mp3", ".wav", ".m4a"}
|
|
_VOICE_EXTS = {".ogg", ".opus"}
|
|
_URL_SECRET_QUERY_RE = re.compile(
|
|
r"([?&](?:access_token|api[_-]?key|auth[_-]?token|token|signature|sig)=)([^&#\s]+)",
|
|
re.IGNORECASE,
|
|
)
|
|
_GENERIC_SECRET_ASSIGN_RE = re.compile(
|
|
r"\b(access_token|api[_-]?key|auth[_-]?token|signature|sig)\s*=\s*([^\s,;]+)",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
|
|
def _sanitize_error_text(text) -> str:
|
|
"""Redact secrets from error text before surfacing it to users/models."""
|
|
redacted = redact_sensitive_text(text)
|
|
redacted = _URL_SECRET_QUERY_RE.sub(lambda m: f"{m.group(1)}***", redacted)
|
|
redacted = _GENERIC_SECRET_ASSIGN_RE.sub(lambda m: f"{m.group(1)}=***", redacted)
|
|
return redacted
|
|
|
|
|
|
def _error(message: str) -> dict:
|
|
"""Build a standardized error payload with redacted content."""
|
|
return {"error": _sanitize_error_text(message)}
|
|
|
|
|
|
def _telegram_retry_delay(exc: Exception, attempt: int) -> float | None:
|
|
retry_after = getattr(exc, "retry_after", None)
|
|
if retry_after is not None:
|
|
try:
|
|
return max(float(retry_after), 0.0)
|
|
except (TypeError, ValueError):
|
|
return 1.0
|
|
|
|
text = str(exc).lower()
|
|
if "timed out" in text or "timeout" in text:
|
|
return None
|
|
if (
|
|
"bad gateway" in text
|
|
or "502" in text
|
|
or "too many requests" in text
|
|
or "429" in text
|
|
or "service unavailable" in text
|
|
or "503" in text
|
|
or "gateway timeout" in text
|
|
or "504" in text
|
|
):
|
|
return float(2 ** attempt)
|
|
return None
|
|
|
|
|
|
async def _send_telegram_message_with_retry(bot, *, attempts: int = 3, **kwargs):
|
|
for attempt in range(attempts):
|
|
try:
|
|
return await bot.send_message(**kwargs)
|
|
except Exception as exc:
|
|
delay = _telegram_retry_delay(exc, attempt)
|
|
if delay is None or attempt >= attempts - 1:
|
|
raise
|
|
logger.warning(
|
|
"Transient Telegram send failure (attempt %d/%d), retrying in %.1fs: %s",
|
|
attempt + 1,
|
|
attempts,
|
|
delay,
|
|
_sanitize_error_text(exc),
|
|
)
|
|
await asyncio.sleep(delay)
|
|
|
|
|
|
SEND_MESSAGE_SCHEMA = {
|
|
"name": "send_message",
|
|
"description": (
|
|
"Send a message to a connected messaging platform, or list available targets.\n\n"
|
|
"IMPORTANT: When the user asks to send to a specific channel or person "
|
|
"(not just a bare platform name), call send_message(action='list') FIRST to see "
|
|
"available targets, then send to the correct one.\n"
|
|
"If the user just says a platform name like 'send to telegram', send directly "
|
|
"to the home channel without listing first."
|
|
),
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"action": {
|
|
"type": "string",
|
|
"enum": ["send", "list"],
|
|
"description": "Action to perform. 'send' (default) sends a message. 'list' returns all available channels/contacts across connected platforms."
|
|
},
|
|
"target": {
|
|
"type": "string",
|
|
"description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or 'platform:chat_id:thread_id' for Telegram topics and Discord threads. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:999888777:555444333', 'discord:#bot-home', 'slack:#engineering', 'signal:+155****4567', 'matrix:!roomid:server.org', 'matrix:@user:server.org'"
|
|
},
|
|
"message": {
|
|
"type": "string",
|
|
"description": "The message text to send"
|
|
}
|
|
},
|
|
"required": []
|
|
}
|
|
}
|
|
|
|
|
|
def send_message_tool(args, **kw):
|
|
"""Handle cross-channel send_message tool calls."""
|
|
action = args.get("action", "send")
|
|
|
|
if action == "list":
|
|
return _handle_list()
|
|
|
|
return _handle_send(args)
|
|
|
|
|
|
def _handle_list():
|
|
"""Return formatted list of available messaging targets."""
|
|
try:
|
|
from gateway.channel_directory import format_directory_for_display
|
|
return json.dumps({"targets": format_directory_for_display()})
|
|
except Exception as e:
|
|
return json.dumps(_error(f"Failed to load channel directory: {e}"))
|
|
|
|
|
|
def _handle_send(args):
|
|
"""Send a message to a platform target."""
|
|
target = args.get("target", "")
|
|
message = args.get("message", "")
|
|
if not target or not message:
|
|
return tool_error("Both 'target' and 'message' are required when action='send'")
|
|
|
|
parts = target.split(":", 1)
|
|
platform_name = parts[0].strip().lower()
|
|
target_ref = parts[1].strip() if len(parts) > 1 else None
|
|
chat_id = None
|
|
thread_id = None
|
|
|
|
if target_ref:
|
|
chat_id, thread_id, is_explicit = _parse_target_ref(platform_name, target_ref)
|
|
else:
|
|
is_explicit = False
|
|
|
|
# Resolve human-friendly channel names to numeric IDs
|
|
if target_ref and not is_explicit:
|
|
try:
|
|
from gateway.channel_directory import resolve_channel_name
|
|
resolved = resolve_channel_name(platform_name, target_ref)
|
|
if resolved:
|
|
chat_id, thread_id, _ = _parse_target_ref(platform_name, resolved)
|
|
else:
|
|
return json.dumps({
|
|
"error": f"Could not resolve '{target_ref}' on {platform_name}. "
|
|
f"Use send_message(action='list') to see available targets."
|
|
})
|
|
except Exception:
|
|
return json.dumps({
|
|
"error": f"Could not resolve '{target_ref}' on {platform_name}. "
|
|
f"Try using a numeric channel ID instead."
|
|
})
|
|
|
|
from tools.interrupt import is_interrupted
|
|
if is_interrupted():
|
|
return tool_error("Interrupted")
|
|
|
|
try:
|
|
from gateway.config import load_gateway_config, Platform
|
|
config = load_gateway_config()
|
|
except Exception as e:
|
|
return json.dumps(_error(f"Failed to load gateway config: {e}"))
|
|
|
|
platform_map = {
|
|
"telegram": Platform.TELEGRAM,
|
|
"discord": Platform.DISCORD,
|
|
"slack": Platform.SLACK,
|
|
"whatsapp": Platform.WHATSAPP,
|
|
"signal": Platform.SIGNAL,
|
|
"bluebubbles": Platform.BLUEBUBBLES,
|
|
"qqbot": Platform.QQBOT,
|
|
"matrix": Platform.MATRIX,
|
|
"mattermost": Platform.MATTERMOST,
|
|
"homeassistant": Platform.HOMEASSISTANT,
|
|
"dingtalk": Platform.DINGTALK,
|
|
"feishu": Platform.FEISHU,
|
|
"wecom": Platform.WECOM,
|
|
"wecom_callback": Platform.WECOM_CALLBACK,
|
|
"weixin": Platform.WEIXIN,
|
|
"email": Platform.EMAIL,
|
|
"sms": Platform.SMS,
|
|
}
|
|
platform = platform_map.get(platform_name)
|
|
if not platform:
|
|
avail = ", ".join(platform_map.keys())
|
|
return tool_error(f"Unknown platform: {platform_name}. Available: {avail}")
|
|
|
|
pconfig = config.platforms.get(platform)
|
|
if not pconfig or not pconfig.enabled:
|
|
# Weixin can be configured purely via .env; synthesize a pconfig so
|
|
# send_message and cron delivery work without a gateway.yaml entry.
|
|
if platform_name == "weixin":
|
|
wx_token = os.getenv("WEIXIN_TOKEN", "").strip()
|
|
wx_account = os.getenv("WEIXIN_ACCOUNT_ID", "").strip()
|
|
if wx_token and wx_account:
|
|
from gateway.config import PlatformConfig
|
|
pconfig = PlatformConfig(
|
|
enabled=True,
|
|
token=wx_token,
|
|
extra={
|
|
"account_id": wx_account,
|
|
"base_url": os.getenv("WEIXIN_BASE_URL", "").strip(),
|
|
"cdn_base_url": os.getenv("WEIXIN_CDN_BASE_URL", "").strip(),
|
|
},
|
|
)
|
|
else:
|
|
return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.")
|
|
else:
|
|
return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.")
|
|
|
|
from gateway.platforms.base import BasePlatformAdapter
|
|
|
|
media_files, cleaned_message = BasePlatformAdapter.extract_media(message)
|
|
mirror_text = cleaned_message.strip() or _describe_media_for_mirror(media_files)
|
|
|
|
used_home_channel = False
|
|
if not chat_id:
|
|
home = config.get_home_channel(platform)
|
|
if not home and platform_name == "weixin":
|
|
wx_home = os.getenv("WEIXIN_HOME_CHANNEL", "").strip()
|
|
if wx_home:
|
|
from gateway.config import HomeChannel
|
|
home = HomeChannel(platform=platform, chat_id=wx_home, name="Weixin Home")
|
|
if home:
|
|
chat_id = home.chat_id
|
|
used_home_channel = True
|
|
else:
|
|
return json.dumps({
|
|
"error": f"No home channel set for {platform_name} to determine where to send the message. "
|
|
f"Either specify a channel directly with '{platform_name}:CHANNEL_NAME', "
|
|
f"or set a home channel via: hermes config set {platform_name.upper()}_HOME_CHANNEL <channel_id>"
|
|
})
|
|
|
|
duplicate_skip = _maybe_skip_cron_duplicate_send(platform_name, chat_id, thread_id)
|
|
if duplicate_skip:
|
|
return json.dumps(duplicate_skip)
|
|
|
|
try:
|
|
from model_tools import _run_async
|
|
result = _run_async(
|
|
_send_to_platform(
|
|
platform,
|
|
pconfig,
|
|
chat_id,
|
|
cleaned_message,
|
|
thread_id=thread_id,
|
|
media_files=media_files,
|
|
)
|
|
)
|
|
if used_home_channel and isinstance(result, dict) and result.get("success"):
|
|
result["note"] = f"Sent to {platform_name} home channel (chat_id: {chat_id})"
|
|
|
|
# Mirror the sent message into the target's gateway session
|
|
if isinstance(result, dict) and result.get("success") and mirror_text:
|
|
try:
|
|
from gateway.mirror import mirror_to_session
|
|
from gateway.session_context import get_session_env
|
|
source_label = get_session_env("HERMES_SESSION_PLATFORM", "cli")
|
|
if mirror_to_session(platform_name, chat_id, mirror_text, source_label=source_label, thread_id=thread_id):
|
|
result["mirrored"] = True
|
|
except Exception:
|
|
pass
|
|
|
|
if isinstance(result, dict) and "error" in result:
|
|
result["error"] = _sanitize_error_text(result["error"])
|
|
return json.dumps(result)
|
|
except Exception as e:
|
|
return json.dumps(_error(f"Send failed: {e}"))
|
|
|
|
|
|
def _parse_target_ref(platform_name: str, target_ref: str):
|
|
"""Parse a tool target into chat_id/thread_id and whether it is explicit."""
|
|
if platform_name == "telegram":
|
|
match = _TELEGRAM_TOPIC_TARGET_RE.fullmatch(target_ref)
|
|
if match:
|
|
return match.group(1), match.group(2), True
|
|
if platform_name == "feishu":
|
|
match = _FEISHU_TARGET_RE.fullmatch(target_ref)
|
|
if match:
|
|
return match.group(1), match.group(2), True
|
|
if platform_name == "discord":
|
|
match = _NUMERIC_TOPIC_RE.fullmatch(target_ref)
|
|
if match:
|
|
return match.group(1), match.group(2), True
|
|
if platform_name == "weixin":
|
|
match = _WEIXIN_TARGET_RE.fullmatch(target_ref)
|
|
if match:
|
|
return match.group(1), None, True
|
|
if platform_name in _PHONE_PLATFORMS:
|
|
match = _E164_TARGET_RE.fullmatch(target_ref)
|
|
if match:
|
|
# Preserve the leading '+' — signal-cli and sms/whatsapp adapters
|
|
# expect E.164 format for direct recipients.
|
|
return target_ref.strip(), None, True
|
|
if target_ref.lstrip("-").isdigit():
|
|
return target_ref, None, True
|
|
# Matrix room IDs (start with !) and user IDs (start with @) are explicit
|
|
if platform_name == "matrix" and (target_ref.startswith("!") or target_ref.startswith("@")):
|
|
return target_ref, None, True
|
|
return None, None, False
|
|
|
|
|
|
def _describe_media_for_mirror(media_files):
|
|
"""Return a human-readable mirror summary when a message only contains media."""
|
|
if not media_files:
|
|
return ""
|
|
if len(media_files) == 1:
|
|
media_path, is_voice = media_files[0]
|
|
ext = os.path.splitext(media_path)[1].lower()
|
|
if is_voice and ext in _VOICE_EXTS:
|
|
return "[Sent voice message]"
|
|
if ext in _IMAGE_EXTS:
|
|
return "[Sent image attachment]"
|
|
if ext in _VIDEO_EXTS:
|
|
return "[Sent video attachment]"
|
|
if ext in _AUDIO_EXTS:
|
|
return "[Sent audio attachment]"
|
|
return "[Sent document attachment]"
|
|
return f"[Sent {len(media_files)} media attachments]"
|
|
|
|
|
|
def _get_cron_auto_delivery_target():
|
|
"""Return the cron scheduler's auto-delivery target for the current run, if any."""
|
|
from gateway.session_context import get_session_env
|
|
platform = get_session_env("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower()
|
|
chat_id = get_session_env("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip()
|
|
if not platform or not chat_id:
|
|
return None
|
|
thread_id = get_session_env("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None
|
|
return {
|
|
"platform": platform,
|
|
"chat_id": chat_id,
|
|
"thread_id": thread_id,
|
|
}
|
|
|
|
|
|
def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id: str | None):
|
|
"""Skip redundant cron send_message calls when the scheduler will auto-deliver there."""
|
|
auto_target = _get_cron_auto_delivery_target()
|
|
if not auto_target:
|
|
return None
|
|
|
|
same_target = (
|
|
auto_target["platform"] == platform_name
|
|
and str(auto_target["chat_id"]) == str(chat_id)
|
|
and auto_target.get("thread_id") == thread_id
|
|
)
|
|
if not same_target:
|
|
return None
|
|
|
|
target_label = f"{platform_name}:{chat_id}"
|
|
if thread_id is not None:
|
|
target_label += f":{thread_id}"
|
|
|
|
return {
|
|
"success": True,
|
|
"skipped": True,
|
|
"reason": "cron_auto_delivery_duplicate_target",
|
|
"target": target_label,
|
|
"note": (
|
|
f"Skipped send_message to {target_label}. This cron job will already auto-deliver "
|
|
"its final response to that same target. Put the intended user-facing content in "
|
|
"your final response instead, or use a different target if you want an additional message."
|
|
),
|
|
}
|
|
|
|
|
|
async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None):
|
|
"""Route a message to the appropriate platform sender.
|
|
|
|
Long messages are automatically chunked to fit within platform limits
|
|
using the same smart-splitting algorithm as the gateway adapters
|
|
(preserves code-block boundaries, adds part indicators).
|
|
"""
|
|
from gateway.config import Platform
|
|
from gateway.platforms.base import BasePlatformAdapter, utf16_len
|
|
from gateway.platforms.discord import DiscordAdapter
|
|
from gateway.platforms.slack import SlackAdapter
|
|
|
|
# Telegram adapter import is optional (requires python-telegram-bot)
|
|
try:
|
|
from gateway.platforms.telegram import TelegramAdapter
|
|
_telegram_available = True
|
|
except ImportError:
|
|
_telegram_available = False
|
|
|
|
# Feishu adapter import is optional (requires lark-oapi)
|
|
try:
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
_feishu_available = True
|
|
except ImportError:
|
|
_feishu_available = False
|
|
|
|
media_files = media_files or []
|
|
|
|
if platform == Platform.SLACK and message:
|
|
try:
|
|
slack_adapter = SlackAdapter.__new__(SlackAdapter)
|
|
message = slack_adapter.format_message(message)
|
|
except Exception:
|
|
logger.debug("Failed to apply Slack mrkdwn formatting in _send_to_platform", exc_info=True)
|
|
|
|
# Platform message length limits (from adapter class attributes)
|
|
_MAX_LENGTHS = {
|
|
Platform.TELEGRAM: TelegramAdapter.MAX_MESSAGE_LENGTH if _telegram_available else 4096,
|
|
Platform.DISCORD: DiscordAdapter.MAX_MESSAGE_LENGTH,
|
|
Platform.SLACK: SlackAdapter.MAX_MESSAGE_LENGTH,
|
|
}
|
|
if _feishu_available:
|
|
_MAX_LENGTHS[Platform.FEISHU] = FeishuAdapter.MAX_MESSAGE_LENGTH
|
|
|
|
# Smart-chunk the message to fit within platform limits.
|
|
# For short messages or platforms without a known limit this is a no-op.
|
|
# Telegram measures length in UTF-16 code units, not Unicode codepoints.
|
|
max_len = _MAX_LENGTHS.get(platform)
|
|
if max_len:
|
|
_len_fn = utf16_len if platform == Platform.TELEGRAM else None
|
|
chunks = BasePlatformAdapter.truncate_message(message, max_len, len_fn=_len_fn)
|
|
else:
|
|
chunks = [message]
|
|
|
|
# --- Telegram: special handling for media attachments ---
|
|
if platform == Platform.TELEGRAM:
|
|
last_result = None
|
|
disable_link_previews = bool(getattr(pconfig, "extra", {}) and pconfig.extra.get("disable_link_previews"))
|
|
for i, chunk in enumerate(chunks):
|
|
is_last = (i == len(chunks) - 1)
|
|
result = await _send_telegram(
|
|
pconfig.token,
|
|
chat_id,
|
|
chunk,
|
|
media_files=media_files if is_last else [],
|
|
thread_id=thread_id,
|
|
disable_link_previews=disable_link_previews,
|
|
)
|
|
if isinstance(result, dict) and result.get("error"):
|
|
return result
|
|
last_result = result
|
|
return last_result
|
|
|
|
# --- Weixin: use the native one-shot adapter helper for text + media ---
|
|
if platform == Platform.WEIXIN:
|
|
return await _send_weixin(pconfig, chat_id, message, media_files=media_files)
|
|
|
|
# --- Discord: special handling for media attachments ---
|
|
if platform == Platform.DISCORD:
|
|
last_result = None
|
|
for i, chunk in enumerate(chunks):
|
|
is_last = (i == len(chunks) - 1)
|
|
result = await _send_discord(
|
|
pconfig.token,
|
|
chat_id,
|
|
chunk,
|
|
media_files=media_files if is_last else [],
|
|
thread_id=thread_id,
|
|
)
|
|
if isinstance(result, dict) and result.get("error"):
|
|
return result
|
|
last_result = result
|
|
return last_result
|
|
|
|
# --- Matrix: use the native adapter helper when media is present ---
|
|
if platform == Platform.MATRIX and media_files:
|
|
last_result = None
|
|
for i, chunk in enumerate(chunks):
|
|
is_last = (i == len(chunks) - 1)
|
|
result = await _send_matrix_via_adapter(
|
|
pconfig,
|
|
chat_id,
|
|
chunk,
|
|
media_files=media_files if is_last else [],
|
|
thread_id=thread_id,
|
|
)
|
|
if isinstance(result, dict) and result.get("error"):
|
|
return result
|
|
last_result = result
|
|
return last_result
|
|
|
|
# --- Signal: native attachment support via JSON-RPC attachments param ---
|
|
if platform == Platform.SIGNAL and media_files:
|
|
last_result = None
|
|
for i, chunk in enumerate(chunks):
|
|
is_last = (i == len(chunks) - 1)
|
|
result = await _send_signal(
|
|
pconfig.extra,
|
|
chat_id,
|
|
chunk,
|
|
media_files=media_files if is_last else [],
|
|
)
|
|
if isinstance(result, dict) and result.get("error"):
|
|
return result
|
|
last_result = result
|
|
return last_result
|
|
|
|
# --- Non-media platforms ---
|
|
if media_files and not message.strip():
|
|
return {
|
|
"error": (
|
|
f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, and signal; "
|
|
f"target {platform.value} had only media attachments"
|
|
)
|
|
}
|
|
warning = None
|
|
if media_files:
|
|
warning = (
|
|
f"MEDIA attachments were omitted for {platform.value}; "
|
|
"native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, and signal"
|
|
)
|
|
|
|
last_result = None
|
|
for chunk in chunks:
|
|
if platform == Platform.SLACK:
|
|
result = await _send_slack(pconfig.token, chat_id, chunk)
|
|
elif platform == Platform.WHATSAPP:
|
|
result = await _send_whatsapp(pconfig.extra, chat_id, chunk)
|
|
elif platform == Platform.SIGNAL:
|
|
result = await _send_signal(pconfig.extra, chat_id, chunk)
|
|
elif platform == Platform.EMAIL:
|
|
result = await _send_email(pconfig.extra, chat_id, chunk)
|
|
elif platform == Platform.SMS:
|
|
result = await _send_sms(pconfig.api_key, chat_id, chunk)
|
|
elif platform == Platform.MATTERMOST:
|
|
result = await _send_mattermost(pconfig.token, pconfig.extra, chat_id, chunk)
|
|
elif platform == Platform.MATRIX:
|
|
result = await _send_matrix(pconfig.token, pconfig.extra, chat_id, chunk)
|
|
elif platform == Platform.HOMEASSISTANT:
|
|
result = await _send_homeassistant(pconfig.token, pconfig.extra, chat_id, chunk)
|
|
elif platform == Platform.DINGTALK:
|
|
result = await _send_dingtalk(pconfig.extra, chat_id, chunk)
|
|
elif platform == Platform.FEISHU:
|
|
result = await _send_feishu(pconfig, chat_id, chunk, thread_id=thread_id)
|
|
elif platform == Platform.WECOM:
|
|
result = await _send_wecom(pconfig.extra, chat_id, chunk)
|
|
elif platform == Platform.BLUEBUBBLES:
|
|
result = await _send_bluebubbles(pconfig.extra, chat_id, chunk)
|
|
elif platform == Platform.QQBOT:
|
|
result = await _send_qqbot(pconfig, chat_id, chunk)
|
|
else:
|
|
result = {"error": f"Direct sending not yet implemented for {platform.value}"}
|
|
|
|
if isinstance(result, dict) and result.get("error"):
|
|
return result
|
|
last_result = result
|
|
|
|
if warning and isinstance(last_result, dict) and last_result.get("success"):
|
|
warnings = list(last_result.get("warnings", []))
|
|
warnings.append(warning)
|
|
last_result["warnings"] = warnings
|
|
return last_result
|
|
|
|
|
|
async def _send_telegram(token, chat_id, message, media_files=None, thread_id=None, disable_link_previews=False):
|
|
"""Send via Telegram Bot API (one-shot, no polling needed).
|
|
|
|
Applies markdown→MarkdownV2 formatting (same as the gateway adapter)
|
|
so that bold, links, and headers render correctly. If the message
|
|
already contains HTML tags, it is sent with ``parse_mode='HTML'``
|
|
instead, bypassing MarkdownV2 conversion.
|
|
"""
|
|
try:
|
|
from telegram import Bot
|
|
from telegram.constants import ParseMode
|
|
|
|
# Auto-detect HTML tags — if present, skip MarkdownV2 and send as HTML.
|
|
# Inspired by github.com/ashaney — PR #1568.
|
|
_has_html = bool(re.search(r'<[a-zA-Z/][^>]*>', message))
|
|
|
|
if _has_html:
|
|
formatted = message
|
|
send_parse_mode = ParseMode.HTML
|
|
else:
|
|
# Reuse the gateway adapter's format_message for markdown→MarkdownV2
|
|
try:
|
|
from gateway.platforms.telegram import TelegramAdapter
|
|
_adapter = TelegramAdapter.__new__(TelegramAdapter)
|
|
formatted = _adapter.format_message(message)
|
|
except Exception:
|
|
# Fallback: send as-is if formatting unavailable
|
|
formatted = message
|
|
send_parse_mode = ParseMode.MARKDOWN_V2
|
|
|
|
bot = Bot(token=token)
|
|
int_chat_id = int(chat_id)
|
|
media_files = media_files or []
|
|
thread_kwargs = {}
|
|
if thread_id is not None:
|
|
thread_kwargs["message_thread_id"] = int(thread_id)
|
|
if disable_link_previews:
|
|
thread_kwargs["disable_web_page_preview"] = True
|
|
|
|
last_msg = None
|
|
warnings = []
|
|
|
|
if formatted.strip():
|
|
try:
|
|
last_msg = await _send_telegram_message_with_retry(
|
|
bot,
|
|
chat_id=int_chat_id, text=formatted,
|
|
parse_mode=send_parse_mode, **thread_kwargs
|
|
)
|
|
except Exception as md_error:
|
|
# Parse failed, fall back to plain text
|
|
if "parse" in str(md_error).lower() or "markdown" in str(md_error).lower() or "html" in str(md_error).lower():
|
|
logger.warning(
|
|
"Parse mode %s failed in _send_telegram, falling back to plain text: %s",
|
|
send_parse_mode,
|
|
_sanitize_error_text(md_error),
|
|
)
|
|
if not _has_html:
|
|
try:
|
|
from gateway.platforms.telegram import _strip_mdv2
|
|
plain = _strip_mdv2(formatted)
|
|
except Exception:
|
|
plain = message
|
|
else:
|
|
plain = message
|
|
last_msg = await _send_telegram_message_with_retry(
|
|
bot,
|
|
chat_id=int_chat_id, text=plain,
|
|
parse_mode=None, **thread_kwargs
|
|
)
|
|
else:
|
|
raise
|
|
|
|
for media_path, is_voice in media_files:
|
|
if not os.path.exists(media_path):
|
|
warning = f"Media file not found, skipping: {media_path}"
|
|
logger.warning(warning)
|
|
warnings.append(warning)
|
|
continue
|
|
|
|
ext = os.path.splitext(media_path)[1].lower()
|
|
try:
|
|
with open(media_path, "rb") as f:
|
|
if ext in _IMAGE_EXTS:
|
|
last_msg = await bot.send_photo(
|
|
chat_id=int_chat_id, photo=f, **thread_kwargs
|
|
)
|
|
elif ext in _VIDEO_EXTS:
|
|
last_msg = await bot.send_video(
|
|
chat_id=int_chat_id, video=f, **thread_kwargs
|
|
)
|
|
elif ext in _VOICE_EXTS and is_voice:
|
|
last_msg = await bot.send_voice(
|
|
chat_id=int_chat_id, voice=f, **thread_kwargs
|
|
)
|
|
elif ext in _AUDIO_EXTS:
|
|
last_msg = await bot.send_audio(
|
|
chat_id=int_chat_id, audio=f, **thread_kwargs
|
|
)
|
|
else:
|
|
last_msg = await bot.send_document(
|
|
chat_id=int_chat_id, document=f, **thread_kwargs
|
|
)
|
|
except Exception as e:
|
|
warning = _sanitize_error_text(f"Failed to send media {media_path}: {e}")
|
|
logger.error(warning)
|
|
warnings.append(warning)
|
|
|
|
if last_msg is None:
|
|
error = "No deliverable text or media remained after processing MEDIA tags"
|
|
if warnings:
|
|
return {"error": error, "warnings": warnings}
|
|
return {"error": error}
|
|
|
|
result = {
|
|
"success": True,
|
|
"platform": "telegram",
|
|
"chat_id": chat_id,
|
|
"message_id": str(last_msg.message_id),
|
|
}
|
|
if warnings:
|
|
result["warnings"] = warnings
|
|
return result
|
|
except ImportError:
|
|
return {"error": "python-telegram-bot not installed. Run: pip install python-telegram-bot"}
|
|
except Exception as e:
|
|
return _error(f"Telegram send failed: {e}")
|
|
|
|
|
|
def _derive_forum_thread_name(message: str) -> str:
|
|
"""Derive a thread name from the first line of the message, capped at 100 chars."""
|
|
first_line = message.strip().split("\n", 1)[0].strip()
|
|
# Strip common markdown heading prefixes
|
|
first_line = first_line.lstrip("#").strip()
|
|
if not first_line:
|
|
first_line = "New Post"
|
|
return first_line[:100]
|
|
|
|
|
|
# Process-local cache for Discord channel-type probes. Avoids re-probing the
|
|
# same channel on every send when the directory cache has no entry (e.g. fresh
|
|
# install, or channel created after the last directory build).
|
|
_DISCORD_CHANNEL_TYPE_PROBE_CACHE: Dict[str, bool] = {}
|
|
|
|
|
|
def _remember_channel_is_forum(chat_id: str, is_forum: bool) -> None:
|
|
_DISCORD_CHANNEL_TYPE_PROBE_CACHE[str(chat_id)] = bool(is_forum)
|
|
|
|
|
|
def _probe_is_forum_cached(chat_id: str) -> Optional[bool]:
|
|
return _DISCORD_CHANNEL_TYPE_PROBE_CACHE.get(str(chat_id))
|
|
|
|
|
|
async def _send_discord(token, chat_id, message, thread_id=None, media_files=None):
|
|
"""Send a single message via Discord REST API (no websocket client needed).
|
|
|
|
Chunking is handled by _send_to_platform() before this is called.
|
|
|
|
When thread_id is provided, the message is sent directly to that thread
|
|
via the /channels/{thread_id}/messages endpoint.
|
|
|
|
Media files are uploaded one-by-one via multipart/form-data after the
|
|
text message is sent (same pattern as Telegram).
|
|
|
|
Forum channels (type 15) reject POST /messages — a thread post is created
|
|
automatically via POST /channels/{id}/threads. Media files are uploaded
|
|
as multipart attachments on the starter message of the new thread.
|
|
|
|
Channel type is resolved from the channel directory first, then a
|
|
process-local probe cache, and only as a last resort with a live
|
|
GET /channels/{id} probe (whose result is memoized).
|
|
"""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
try:
|
|
from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp
|
|
_proxy = resolve_proxy_url(platform_env_var="DISCORD_PROXY")
|
|
_sess_kw, _req_kw = proxy_kwargs_for_aiohttp(_proxy)
|
|
auth_headers = {"Authorization": f"Bot {token}"}
|
|
json_headers = {**auth_headers, "Content-Type": "application/json"}
|
|
media_files = media_files or []
|
|
last_data = None
|
|
warnings = []
|
|
|
|
# Thread endpoint: Discord threads are channels; send directly to the thread ID.
|
|
if thread_id:
|
|
url = f"https://discord.com/api/v10/channels/{thread_id}/messages"
|
|
else:
|
|
# Check if the target channel is a forum channel (type 15).
|
|
# Forum channels reject POST /messages — create a thread post instead.
|
|
# Three-layer detection: directory cache → process-local probe
|
|
# cache → GET /channels/{id} probe (with result memoized).
|
|
_channel_type = None
|
|
try:
|
|
from gateway.channel_directory import lookup_channel_type
|
|
_channel_type = lookup_channel_type("discord", chat_id)
|
|
except Exception:
|
|
pass
|
|
|
|
if _channel_type == "forum":
|
|
is_forum = True
|
|
elif _channel_type is not None:
|
|
is_forum = False
|
|
else:
|
|
cached = _probe_is_forum_cached(chat_id)
|
|
if cached is not None:
|
|
is_forum = cached
|
|
else:
|
|
is_forum = False
|
|
try:
|
|
info_url = f"https://discord.com/api/v10/channels/{chat_id}"
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15), **_sess_kw) as info_sess:
|
|
async with info_sess.get(info_url, headers=json_headers, **_req_kw) as info_resp:
|
|
if info_resp.status == 200:
|
|
info = await info_resp.json()
|
|
is_forum = info.get("type") == 15
|
|
_remember_channel_is_forum(chat_id, is_forum)
|
|
except Exception:
|
|
logger.debug("Failed to probe channel type for %s", chat_id, exc_info=True)
|
|
|
|
if is_forum:
|
|
thread_name = _derive_forum_thread_name(message)
|
|
thread_url = f"https://discord.com/api/v10/channels/{chat_id}/threads"
|
|
|
|
# Filter to readable media files up front so we can pick the
|
|
# right code path (JSON vs multipart) before opening a session.
|
|
valid_media = []
|
|
for media_path, _is_voice in media_files:
|
|
if not os.path.exists(media_path):
|
|
warning = f"Media file not found, skipping: {media_path}"
|
|
logger.warning(warning)
|
|
warnings.append(warning)
|
|
continue
|
|
valid_media.append(media_path)
|
|
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60), **_sess_kw) as session:
|
|
if valid_media:
|
|
# Multipart: payload_json + files[N] creates a forum
|
|
# thread with the starter message plus attachments in
|
|
# a single API call.
|
|
attachments_meta = [
|
|
{"id": str(idx), "filename": os.path.basename(path)}
|
|
for idx, path in enumerate(valid_media)
|
|
]
|
|
starter_message = {"content": message, "attachments": attachments_meta}
|
|
payload_json = json.dumps({"name": thread_name, "message": starter_message})
|
|
|
|
form = aiohttp.FormData()
|
|
form.add_field("payload_json", payload_json, content_type="application/json")
|
|
|
|
# Buffer file bytes up front — aiohttp's FormData can
|
|
# read lazily and we don't want handles closing under
|
|
# it on retry.
|
|
try:
|
|
for idx, media_path in enumerate(valid_media):
|
|
with open(media_path, "rb") as fh:
|
|
form.add_field(
|
|
f"files[{idx}]",
|
|
fh.read(),
|
|
filename=os.path.basename(media_path),
|
|
)
|
|
async with session.post(thread_url, headers=auth_headers, data=form, **_req_kw) as resp:
|
|
if resp.status not in (200, 201):
|
|
body = await resp.text()
|
|
return _error(f"Discord forum thread creation error ({resp.status}): {body}")
|
|
data = await resp.json()
|
|
except Exception as e:
|
|
return _error(_sanitize_error_text(f"Discord forum thread upload failed: {e}"))
|
|
else:
|
|
# No media — simple JSON POST creates the thread with
|
|
# just the text starter.
|
|
async with session.post(
|
|
thread_url,
|
|
headers=json_headers,
|
|
json={
|
|
"name": thread_name,
|
|
"message": {"content": message},
|
|
},
|
|
**_req_kw,
|
|
) as resp:
|
|
if resp.status not in (200, 201):
|
|
body = await resp.text()
|
|
return _error(f"Discord forum thread creation error ({resp.status}): {body}")
|
|
data = await resp.json()
|
|
|
|
thread_id_created = data.get("id")
|
|
starter_msg_id = (data.get("message") or {}).get("id", thread_id_created)
|
|
result = {
|
|
"success": True,
|
|
"platform": "discord",
|
|
"chat_id": chat_id,
|
|
"thread_id": thread_id_created,
|
|
"message_id": starter_msg_id,
|
|
}
|
|
if warnings:
|
|
result["warnings"] = warnings
|
|
return result
|
|
|
|
url = f"https://discord.com/api/v10/channels/{chat_id}/messages"
|
|
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session:
|
|
# Send text message (skip if empty and media is present)
|
|
if message.strip() or not media_files:
|
|
async with session.post(url, headers=json_headers, json={"content": message}, **_req_kw) as resp:
|
|
if resp.status not in (200, 201):
|
|
body = await resp.text()
|
|
return _error(f"Discord API error ({resp.status}): {body}")
|
|
last_data = await resp.json()
|
|
|
|
# Send each media file as a separate multipart upload
|
|
for media_path, _is_voice in media_files:
|
|
if not os.path.exists(media_path):
|
|
warning = f"Media file not found, skipping: {media_path}"
|
|
logger.warning(warning)
|
|
warnings.append(warning)
|
|
continue
|
|
try:
|
|
form = aiohttp.FormData()
|
|
filename = os.path.basename(media_path)
|
|
with open(media_path, "rb") as f:
|
|
form.add_field("files[0]", f, filename=filename)
|
|
async with session.post(url, headers=auth_headers, data=form, **_req_kw) as resp:
|
|
if resp.status not in (200, 201):
|
|
body = await resp.text()
|
|
warning = _sanitize_error_text(f"Failed to send media {media_path}: Discord API error ({resp.status}): {body}")
|
|
logger.error(warning)
|
|
warnings.append(warning)
|
|
continue
|
|
last_data = await resp.json()
|
|
except Exception as e:
|
|
warning = _sanitize_error_text(f"Failed to send media {media_path}: {e}")
|
|
logger.error(warning)
|
|
warnings.append(warning)
|
|
|
|
if last_data is None:
|
|
error = "No deliverable text or media remained after processing"
|
|
if warnings:
|
|
return {"error": error, "warnings": warnings}
|
|
return {"error": error}
|
|
|
|
result = {"success": True, "platform": "discord", "chat_id": chat_id, "message_id": last_data.get("id")}
|
|
if warnings:
|
|
result["warnings"] = warnings
|
|
return result
|
|
except Exception as e:
|
|
return _error(f"Discord send failed: {e}")
|
|
|
|
|
|
async def _send_slack(token, chat_id, message):
|
|
"""Send via Slack Web API."""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
try:
|
|
from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp
|
|
_proxy = resolve_proxy_url()
|
|
_sess_kw, _req_kw = proxy_kwargs_for_aiohttp(_proxy)
|
|
url = "https://slack.com/api/chat.postMessage"
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session:
|
|
payload = {"channel": chat_id, "text": message, "mrkdwn": True}
|
|
async with session.post(url, headers=headers, json=payload, **_req_kw) as resp:
|
|
data = await resp.json()
|
|
if data.get("ok"):
|
|
return {"success": True, "platform": "slack", "chat_id": chat_id, "message_id": data.get("ts")}
|
|
return _error(f"Slack API error: {data.get('error', 'unknown')}")
|
|
except Exception as e:
|
|
return _error(f"Slack send failed: {e}")
|
|
|
|
|
|
async def _send_whatsapp(extra, chat_id, message):
|
|
"""Send via the local WhatsApp bridge HTTP API."""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
try:
|
|
bridge_port = extra.get("bridge_port", 3000)
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(
|
|
f"http://localhost:{bridge_port}/send",
|
|
json={"chatId": chat_id, "message": message},
|
|
timeout=aiohttp.ClientTimeout(total=30),
|
|
) as resp:
|
|
if resp.status == 200:
|
|
data = await resp.json()
|
|
return {
|
|
"success": True,
|
|
"platform": "whatsapp",
|
|
"chat_id": chat_id,
|
|
"message_id": data.get("messageId"),
|
|
}
|
|
body = await resp.text()
|
|
return _error(f"WhatsApp bridge error ({resp.status}): {body}")
|
|
except Exception as e:
|
|
return _error(f"WhatsApp send failed: {e}")
|
|
|
|
|
|
async def _send_signal(extra, chat_id, message, media_files=None):
|
|
"""Send via signal-cli JSON-RPC API.
|
|
|
|
Supports both text-only and text-with-attachments (images/audio/documents).
|
|
Attachments are sent as an 'attachments' array in the JSON-RPC params.
|
|
"""
|
|
try:
|
|
import httpx
|
|
except ImportError:
|
|
return {"error": "httpx not installed"}
|
|
try:
|
|
http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/")
|
|
account = extra.get("account", "")
|
|
if not account:
|
|
return {"error": "Signal account not configured"}
|
|
|
|
params = {"account": account, "message": message}
|
|
if chat_id.startswith("group:"):
|
|
params["groupId"] = chat_id[6:]
|
|
else:
|
|
params["recipient"] = [chat_id]
|
|
|
|
# Add attachments if media_files are present
|
|
valid_media = media_files or []
|
|
attachment_paths = []
|
|
for media_path, _is_voice in valid_media:
|
|
if os.path.exists(media_path):
|
|
attachment_paths.append(media_path)
|
|
else:
|
|
logger.warning("Signal media file not found, skipping: %s", media_path)
|
|
|
|
if attachment_paths:
|
|
params["attachments"] = attachment_paths
|
|
|
|
payload = {
|
|
"jsonrpc": "2.0",
|
|
"method": "send",
|
|
"params": params,
|
|
"id": f"send_{int(time.time() * 1000)}",
|
|
}
|
|
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
resp = await client.post(f"{http_url}/api/v1/rpc", json=payload)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if "error" in data:
|
|
return _error(f"Signal RPC error: {data['error']}")
|
|
|
|
# Return warning for any skipped media files
|
|
result = {"success": True, "platform": "signal", "chat_id": chat_id}
|
|
if len(attachment_paths) < len(valid_media):
|
|
result["warnings"] = [f"Some media files were skipped (not found on disk)"]
|
|
return result
|
|
except Exception as e:
|
|
return _error(f"Signal send failed: {e}")
|
|
|
|
|
|
async def _send_email(extra, chat_id, message):
|
|
"""Send via SMTP (one-shot, no persistent connection needed)."""
|
|
import smtplib
|
|
from email.mime.text import MIMEText
|
|
|
|
address = extra.get("address") or os.getenv("EMAIL_ADDRESS", "")
|
|
password = os.getenv("EMAIL_PASSWORD", "")
|
|
smtp_host = extra.get("smtp_host") or os.getenv("EMAIL_SMTP_HOST", "")
|
|
try:
|
|
smtp_port = int(os.getenv("EMAIL_SMTP_PORT", "587"))
|
|
except (ValueError, TypeError):
|
|
smtp_port = 587
|
|
|
|
if not all([address, password, smtp_host]):
|
|
return {"error": "Email not configured (EMAIL_ADDRESS, EMAIL_PASSWORD, EMAIL_SMTP_HOST required)"}
|
|
|
|
try:
|
|
msg = MIMEText(message, "plain", "utf-8")
|
|
msg["From"] = address
|
|
msg["To"] = chat_id
|
|
msg["Subject"] = "Hermes Agent"
|
|
|
|
server = smtplib.SMTP(smtp_host, smtp_port)
|
|
server.starttls(context=ssl.create_default_context())
|
|
server.login(address, password)
|
|
server.send_message(msg)
|
|
server.quit()
|
|
return {"success": True, "platform": "email", "chat_id": chat_id}
|
|
except Exception as e:
|
|
return _error(f"Email send failed: {e}")
|
|
|
|
|
|
async def _send_sms(auth_token, chat_id, message):
|
|
"""Send a single SMS via Twilio REST API.
|
|
|
|
Uses HTTP Basic auth (Account SID : Auth Token) and form-encoded POST.
|
|
Chunking is handled by _send_to_platform() before this is called.
|
|
"""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
|
|
import base64
|
|
|
|
account_sid = os.getenv("TWILIO_ACCOUNT_SID", "")
|
|
from_number = os.getenv("TWILIO_PHONE_NUMBER", "")
|
|
if not account_sid or not auth_token or not from_number:
|
|
return {"error": "SMS not configured (TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_PHONE_NUMBER required)"}
|
|
|
|
# Strip markdown — SMS renders it as literal characters
|
|
message = re.sub(r"\*\*(.+?)\*\*", r"\1", message, flags=re.DOTALL)
|
|
message = re.sub(r"\*(.+?)\*", r"\1", message, flags=re.DOTALL)
|
|
message = re.sub(r"__(.+?)__", r"\1", message, flags=re.DOTALL)
|
|
message = re.sub(r"_(.+?)_", r"\1", message, flags=re.DOTALL)
|
|
message = re.sub(r"```[a-z]*\n?", "", message)
|
|
message = re.sub(r"`(.+?)`", r"\1", message)
|
|
message = re.sub(r"^#{1,6}\s+", "", message, flags=re.MULTILINE)
|
|
message = re.sub(r"\[([^\]]+)\]\([^\)]+\)", r"\1", message)
|
|
message = re.sub(r"\n{3,}", "\n\n", message)
|
|
message = message.strip()
|
|
|
|
try:
|
|
from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp
|
|
_proxy = resolve_proxy_url()
|
|
_sess_kw, _req_kw = proxy_kwargs_for_aiohttp(_proxy)
|
|
creds = f"{account_sid}:{auth_token}"
|
|
encoded = base64.b64encode(creds.encode("ascii")).decode("ascii")
|
|
url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
|
|
headers = {"Authorization": f"Basic {encoded}"}
|
|
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session:
|
|
form_data = aiohttp.FormData()
|
|
form_data.add_field("From", from_number)
|
|
form_data.add_field("To", chat_id)
|
|
form_data.add_field("Body", message)
|
|
|
|
async with session.post(url, data=form_data, headers=headers, **_req_kw) as resp:
|
|
body = await resp.json()
|
|
if resp.status >= 400:
|
|
error_msg = body.get("message", str(body))
|
|
return _error(f"Twilio API error ({resp.status}): {error_msg}")
|
|
msg_sid = body.get("sid", "")
|
|
return {"success": True, "platform": "sms", "chat_id": chat_id, "message_id": msg_sid}
|
|
except Exception as e:
|
|
return _error(f"SMS send failed: {e}")
|
|
|
|
|
|
async def _send_mattermost(token, extra, chat_id, message):
|
|
"""Send via Mattermost REST API."""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
try:
|
|
base_url = (extra.get("url") or os.getenv("MATTERMOST_URL", "")).rstrip("/")
|
|
token = token or os.getenv("MATTERMOST_TOKEN", "")
|
|
if not base_url or not token:
|
|
return {"error": "Mattermost not configured (MATTERMOST_URL, MATTERMOST_TOKEN required)"}
|
|
url = f"{base_url}/api/v4/posts"
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
|
|
async with session.post(url, headers=headers, json={"channel_id": chat_id, "message": message}) as resp:
|
|
if resp.status not in (200, 201):
|
|
body = await resp.text()
|
|
return _error(f"Mattermost API error ({resp.status}): {body}")
|
|
data = await resp.json()
|
|
return {"success": True, "platform": "mattermost", "chat_id": chat_id, "message_id": data.get("id")}
|
|
except Exception as e:
|
|
return _error(f"Mattermost send failed: {e}")
|
|
|
|
|
|
async def _send_matrix(token, extra, chat_id, message):
|
|
"""Send via Matrix Client-Server API.
|
|
|
|
Converts markdown to HTML for rich rendering in Matrix clients.
|
|
Falls back to plain text if the ``markdown`` library is not installed.
|
|
"""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
try:
|
|
homeserver = (extra.get("homeserver") or os.getenv("MATRIX_HOMESERVER", "")).rstrip("/")
|
|
token = token or os.getenv("MATRIX_ACCESS_TOKEN", "")
|
|
if not homeserver or not token:
|
|
return {"error": "Matrix not configured (MATRIX_HOMESERVER, MATRIX_ACCESS_TOKEN required)"}
|
|
txn_id = f"hermes_{int(time.time() * 1000)}_{os.urandom(4).hex()}"
|
|
from urllib.parse import quote
|
|
encoded_room = quote(chat_id, safe="")
|
|
url = f"{homeserver}/_matrix/client/v3/rooms/{encoded_room}/send/m.room.message/{txn_id}"
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
|
|
# Build message payload with optional HTML formatted_body.
|
|
payload = {"msgtype": "m.text", "body": message}
|
|
try:
|
|
import markdown as _md
|
|
html = _md.markdown(message, extensions=["fenced_code", "tables"])
|
|
# Convert h1-h6 to bold for Element X compatibility.
|
|
html = re.sub(r"<h[1-6]>(.*?)</h[1-6]>", r"<strong>\1</strong>", html)
|
|
payload["format"] = "org.matrix.custom.html"
|
|
payload["formatted_body"] = html
|
|
except ImportError:
|
|
pass
|
|
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
|
|
async with session.put(url, headers=headers, json=payload) as resp:
|
|
if resp.status not in (200, 201):
|
|
body = await resp.text()
|
|
return _error(f"Matrix API error ({resp.status}): {body}")
|
|
data = await resp.json()
|
|
return {"success": True, "platform": "matrix", "chat_id": chat_id, "message_id": data.get("event_id")}
|
|
except Exception as e:
|
|
return _error(f"Matrix send failed: {e}")
|
|
|
|
|
|
async def _send_matrix_via_adapter(pconfig, chat_id, message, media_files=None, thread_id=None):
|
|
"""Send via the Matrix adapter so native Matrix media uploads are preserved."""
|
|
try:
|
|
from gateway.platforms.matrix import MatrixAdapter
|
|
except ImportError:
|
|
return {"error": "Matrix dependencies not installed. Run: pip install 'mautrix[encryption]'"}
|
|
|
|
media_files = media_files or []
|
|
|
|
try:
|
|
adapter = MatrixAdapter(pconfig)
|
|
connected = await adapter.connect()
|
|
if not connected:
|
|
return _error("Matrix connect failed")
|
|
|
|
metadata = {"thread_id": thread_id} if thread_id else None
|
|
last_result = None
|
|
|
|
if message.strip():
|
|
last_result = await adapter.send(chat_id, message, metadata=metadata)
|
|
if not last_result.success:
|
|
return _error(f"Matrix send failed: {last_result.error}")
|
|
|
|
for media_path, is_voice in media_files:
|
|
if not os.path.exists(media_path):
|
|
return _error(f"Media file not found: {media_path}")
|
|
|
|
ext = os.path.splitext(media_path)[1].lower()
|
|
if ext in _IMAGE_EXTS:
|
|
last_result = await adapter.send_image_file(chat_id, media_path, metadata=metadata)
|
|
elif ext in _VIDEO_EXTS:
|
|
last_result = await adapter.send_video(chat_id, media_path, metadata=metadata)
|
|
elif ext in _VOICE_EXTS and is_voice:
|
|
last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata)
|
|
elif ext in _AUDIO_EXTS:
|
|
last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata)
|
|
else:
|
|
last_result = await adapter.send_document(chat_id, media_path, metadata=metadata)
|
|
|
|
if not last_result.success:
|
|
return _error(f"Matrix media send failed: {last_result.error}")
|
|
|
|
if last_result is None:
|
|
return {"error": "No deliverable text or media remained after processing MEDIA tags"}
|
|
|
|
return {
|
|
"success": True,
|
|
"platform": "matrix",
|
|
"chat_id": chat_id,
|
|
"message_id": last_result.message_id,
|
|
}
|
|
except Exception as e:
|
|
return _error(f"Matrix send failed: {e}")
|
|
finally:
|
|
try:
|
|
await adapter.disconnect()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
async def _send_homeassistant(token, extra, chat_id, message):
|
|
"""Send via Home Assistant notify service."""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
try:
|
|
hass_url = (extra.get("url") or os.getenv("HASS_URL", "")).rstrip("/")
|
|
token = token or os.getenv("HASS_TOKEN", "")
|
|
if not hass_url or not token:
|
|
return {"error": "Home Assistant not configured (HASS_URL, HASS_TOKEN required)"}
|
|
url = f"{hass_url}/api/services/notify/notify"
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
|
|
async with session.post(url, headers=headers, json={"message": message, "target": chat_id}) as resp:
|
|
if resp.status not in (200, 201):
|
|
body = await resp.text()
|
|
return _error(f"Home Assistant API error ({resp.status}): {body}")
|
|
return {"success": True, "platform": "homeassistant", "chat_id": chat_id}
|
|
except Exception as e:
|
|
return _error(f"Home Assistant send failed: {e}")
|
|
|
|
|
|
async def _send_dingtalk(extra, chat_id, message):
|
|
"""Send via DingTalk robot webhook.
|
|
|
|
Note: The gateway's DingTalk adapter uses per-session webhook URLs from
|
|
incoming messages (dingtalk-stream SDK). For cross-platform send_message
|
|
delivery we use a static robot webhook URL instead, which must be
|
|
configured via ``DINGTALK_WEBHOOK_URL`` env var or ``webhook_url`` in the
|
|
platform's extra config.
|
|
"""
|
|
try:
|
|
import httpx
|
|
except ImportError:
|
|
return {"error": "httpx not installed"}
|
|
try:
|
|
webhook_url = extra.get("webhook_url") or os.getenv("DINGTALK_WEBHOOK_URL", "")
|
|
if not webhook_url:
|
|
return {"error": "DingTalk not configured. Set DINGTALK_WEBHOOK_URL env var or webhook_url in dingtalk platform extra config."}
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
resp = await client.post(
|
|
webhook_url,
|
|
json={"msgtype": "text", "text": {"content": message}},
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if data.get("errcode", 0) != 0:
|
|
return _error(f"DingTalk API error: {data.get('errmsg', 'unknown')}")
|
|
return {"success": True, "platform": "dingtalk", "chat_id": chat_id}
|
|
except Exception as e:
|
|
return _error(f"DingTalk send failed: {e}")
|
|
|
|
|
|
async def _send_wecom(extra, chat_id, message):
|
|
"""Send via WeCom using the adapter's WebSocket send pipeline."""
|
|
try:
|
|
from gateway.platforms.wecom import WeComAdapter, check_wecom_requirements
|
|
if not check_wecom_requirements():
|
|
return {"error": "WeCom requirements not met. Need aiohttp + WECOM_BOT_ID/SECRET."}
|
|
except ImportError:
|
|
return {"error": "WeCom adapter not available."}
|
|
|
|
try:
|
|
from gateway.config import PlatformConfig
|
|
pconfig = PlatformConfig(extra=extra)
|
|
adapter = WeComAdapter(pconfig)
|
|
connected = await adapter.connect()
|
|
if not connected:
|
|
return _error(f"WeCom: failed to connect - {adapter.fatal_error_message or 'unknown error'}")
|
|
try:
|
|
result = await adapter.send(chat_id, message)
|
|
if not result.success:
|
|
return _error(f"WeCom send failed: {result.error}")
|
|
return {"success": True, "platform": "wecom", "chat_id": chat_id, "message_id": result.message_id}
|
|
finally:
|
|
await adapter.disconnect()
|
|
except Exception as e:
|
|
return _error(f"WeCom send failed: {e}")
|
|
|
|
|
|
async def _send_weixin(pconfig, chat_id, message, media_files=None):
|
|
"""Send via Weixin iLink using the native adapter helper."""
|
|
try:
|
|
from gateway.platforms.weixin import check_weixin_requirements, send_weixin_direct
|
|
if not check_weixin_requirements():
|
|
return {"error": "Weixin requirements not met. Need aiohttp + cryptography."}
|
|
except ImportError:
|
|
return {"error": "Weixin adapter not available."}
|
|
|
|
try:
|
|
return await send_weixin_direct(
|
|
extra=pconfig.extra,
|
|
token=pconfig.token,
|
|
chat_id=chat_id,
|
|
message=message,
|
|
media_files=media_files,
|
|
)
|
|
except Exception as e:
|
|
return _error(f"Weixin send failed: {e}")
|
|
|
|
|
|
async def _send_bluebubbles(extra, chat_id, message):
|
|
"""Send via BlueBubbles iMessage server using the adapter's REST API."""
|
|
try:
|
|
from gateway.platforms.bluebubbles import BlueBubblesAdapter, check_bluebubbles_requirements
|
|
if not check_bluebubbles_requirements():
|
|
return {"error": "BlueBubbles requirements not met (need aiohttp + httpx)."}
|
|
except ImportError:
|
|
return {"error": "BlueBubbles adapter not available."}
|
|
|
|
try:
|
|
from gateway.config import PlatformConfig
|
|
pconfig = PlatformConfig(extra=extra)
|
|
adapter = BlueBubblesAdapter(pconfig)
|
|
connected = await adapter.connect()
|
|
if not connected:
|
|
return _error("BlueBubbles: failed to connect to server")
|
|
try:
|
|
result = await adapter.send(chat_id, message)
|
|
if not result.success:
|
|
return _error(f"BlueBubbles send failed: {result.error}")
|
|
return {"success": True, "platform": "bluebubbles", "chat_id": chat_id, "message_id": result.message_id}
|
|
finally:
|
|
await adapter.disconnect()
|
|
except Exception as e:
|
|
return _error(f"BlueBubbles send failed: {e}")
|
|
|
|
|
|
async def _send_feishu(pconfig, chat_id, message, media_files=None, thread_id=None):
|
|
"""Send via Feishu/Lark using the adapter's send pipeline."""
|
|
try:
|
|
from gateway.platforms.feishu import FeishuAdapter, FEISHU_AVAILABLE
|
|
if not FEISHU_AVAILABLE:
|
|
return {"error": "Feishu dependencies not installed. Run: pip install 'hermes-agent[feishu]'"}
|
|
from gateway.platforms.feishu import FEISHU_DOMAIN, LARK_DOMAIN
|
|
except ImportError:
|
|
return {"error": "Feishu dependencies not installed. Run: pip install 'hermes-agent[feishu]'"}
|
|
|
|
media_files = media_files or []
|
|
|
|
try:
|
|
adapter = FeishuAdapter(pconfig)
|
|
domain_name = getattr(adapter, "_domain_name", "feishu")
|
|
domain = FEISHU_DOMAIN if domain_name != "lark" else LARK_DOMAIN
|
|
adapter._client = adapter._build_lark_client(domain)
|
|
metadata = {"thread_id": thread_id} if thread_id else None
|
|
|
|
last_result = None
|
|
if message.strip():
|
|
last_result = await adapter.send(chat_id, message, metadata=metadata)
|
|
if not last_result.success:
|
|
return _error(f"Feishu send failed: {last_result.error}")
|
|
|
|
for media_path, is_voice in media_files:
|
|
if not os.path.exists(media_path):
|
|
return _error(f"Media file not found: {media_path}")
|
|
|
|
ext = os.path.splitext(media_path)[1].lower()
|
|
if ext in _IMAGE_EXTS:
|
|
last_result = await adapter.send_image_file(chat_id, media_path, metadata=metadata)
|
|
elif ext in _VIDEO_EXTS:
|
|
last_result = await adapter.send_video(chat_id, media_path, metadata=metadata)
|
|
elif ext in _VOICE_EXTS and is_voice:
|
|
last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata)
|
|
elif ext in _AUDIO_EXTS:
|
|
last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata)
|
|
else:
|
|
last_result = await adapter.send_document(chat_id, media_path, metadata=metadata)
|
|
|
|
if not last_result.success:
|
|
return _error(f"Feishu media send failed: {last_result.error}")
|
|
|
|
if last_result is None:
|
|
return {"error": "No deliverable text or media remained after processing MEDIA tags"}
|
|
|
|
return {
|
|
"success": True,
|
|
"platform": "feishu",
|
|
"chat_id": chat_id,
|
|
"message_id": last_result.message_id,
|
|
}
|
|
except Exception as e:
|
|
return _error(f"Feishu send failed: {e}")
|
|
|
|
|
|
def _check_send_message():
|
|
"""Gate send_message on gateway running (always available on messaging platforms)."""
|
|
from gateway.session_context import get_session_env
|
|
platform = get_session_env("HERMES_SESSION_PLATFORM", "")
|
|
if platform and platform != "local":
|
|
return True
|
|
try:
|
|
from gateway.status import is_gateway_running
|
|
return is_gateway_running()
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
async def _send_qqbot(pconfig, chat_id, message):
|
|
"""Send via QQBot using the REST API directly (no WebSocket needed).
|
|
|
|
Uses the QQ Bot Open Platform REST endpoints to get an access token
|
|
and post a message. Works for guild channels without requiring
|
|
a running gateway adapter.
|
|
"""
|
|
try:
|
|
import httpx
|
|
except ImportError:
|
|
return _error("QQBot direct send requires httpx. Run: pip install httpx")
|
|
|
|
extra = pconfig.extra or {}
|
|
appid = extra.get("app_id") or os.getenv("QQ_APP_ID", "")
|
|
secret = (pconfig.token or extra.get("client_secret")
|
|
or os.getenv("QQ_CLIENT_SECRET", ""))
|
|
if not appid or not secret:
|
|
return _error("QQBot: QQ_APP_ID / QQ_CLIENT_SECRET not configured.")
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=15) as client:
|
|
# Step 1: Get access token
|
|
token_resp = await client.post(
|
|
"https://bots.qq.com/app/getAppAccessToken",
|
|
json={"appId": str(appid), "clientSecret": str(secret)},
|
|
)
|
|
if token_resp.status_code != 200:
|
|
return _error(f"QQBot token request failed: {token_resp.status_code}")
|
|
token_data = token_resp.json()
|
|
access_token = token_data.get("access_token")
|
|
if not access_token:
|
|
return _error(f"QQBot: no access_token in response")
|
|
|
|
# Step 2: Send message via REST
|
|
headers = {
|
|
"Authorization": f"QQBot {access_token}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
url = f"https://api.sgroup.qq.com/channels/{chat_id}/messages"
|
|
payload = {"content": message[:4000], "msg_type": 0}
|
|
|
|
resp = await client.post(url, json=payload, headers=headers)
|
|
if resp.status_code in (200, 201):
|
|
data = resp.json()
|
|
return {"success": True, "platform": "qqbot", "chat_id": chat_id,
|
|
"message_id": data.get("id")}
|
|
else:
|
|
return _error(f"QQBot send failed: {resp.status_code} {resp.text}")
|
|
except Exception as e:
|
|
return _error(f"QQBot send failed: {e}")
|
|
|
|
|
|
# --- Registry ---
|
|
from tools.registry import registry, tool_error
|
|
|
|
registry.register(
|
|
name="send_message",
|
|
toolset="messaging",
|
|
schema=SEND_MESSAGE_SCHEMA,
|
|
handler=send_message_tool,
|
|
check_fn=_check_send_message,
|
|
emoji="📨",
|
|
)
|