mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Adds Apple iMessage as a gateway platform via the BlueBubbles macOS server.

Architecture:
- Webhook-based inbound (event-driven, no polling/dedup needed)
- Email/phone → chat GUID resolution for user-friendly addressing
- Private API safety (checks helper_connected before tapback/typing)
- Inbound attachment downloading (images, audio, documents cached locally)
- Markdown stripping for clean iMessage delivery
- Smart progress suppression for platforms without message editing

Based on PR #5869 by @benjaminsehl (webhook architecture, GUID resolution, Private API safety, progress suppression), with inbound attachment downloading from PR #4588 by @1960697431 (attachment cache routing).

Integration points: Platform enum, env config, adapter factory, auth maps, cron delivery, send_message routing, channel directory, platform hints, toolset definition, setup wizard, status display.

27 tests covering config, adapter, webhook parsing, GUID resolution, attachment download routing, toolset consistency, and prompt hints.
982 lines
40 KiB
Python
982 lines
40 KiB
Python
"""Send Message Tool -- cross-channel messaging via platform APIs.
|
|
|
|
Sends a message to a user or channel on any connected messaging platform
|
|
(Telegram, Discord, Slack). Supports listing available targets and resolving
|
|
human-friendly channel names to IDs. Works in both CLI and gateway contexts.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import ssl
|
|
import time
|
|
|
|
from agent.redact import redact_sensitive_text
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$")
|
|
_FEISHU_TARGET_RE = re.compile(r"^\s*((?:oc|ou|on|chat|open)_[-A-Za-z0-9]+)(?::([-A-Za-z0-9_]+))?\s*$")
|
|
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
|
|
_VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".3gp"}
|
|
_AUDIO_EXTS = {".ogg", ".opus", ".mp3", ".wav", ".m4a"}
|
|
_VOICE_EXTS = {".ogg", ".opus"}
|
|
_URL_SECRET_QUERY_RE = re.compile(
|
|
r"([?&](?:access_token|api[_-]?key|auth[_-]?token|token|signature|sig)=)([^&#\s]+)",
|
|
re.IGNORECASE,
|
|
)
|
|
_GENERIC_SECRET_ASSIGN_RE = re.compile(
|
|
r"\b(access_token|api[_-]?key|auth[_-]?token|signature|sig)\s*=\s*([^\s,;]+)",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
|
|
def _sanitize_error_text(text) -> str:
    """Redact secrets from error text before surfacing it to users/models.

    Args:
        text: The text to sanitize. Non-string values (for example an
            exception instance) are converted with ``str()`` first; ``None``
            is treated as an empty string.

    Returns:
        The text after ``redact_sensitive_text`` and with the values of
        secret-looking URL query parameters and ``name=value`` assignments
        replaced by ``***``.
    """
    # Callers sometimes pass exception objects; the regex substitutions
    # below only accept strings, so normalize up front.
    if not isinstance(text, str):
        text = "" if text is None else str(text)
    redacted = redact_sensitive_text(text)
    redacted = _URL_SECRET_QUERY_RE.sub(lambda m: f"{m.group(1)}***", redacted)
    redacted = _GENERIC_SECRET_ASSIGN_RE.sub(lambda m: f"{m.group(1)}=***", redacted)
    return redacted
|
|
|
|
|
|
def _error(message: str) -> dict:
    """Wrap *message* in the standard ``{"error": ...}`` payload, redacting secrets first."""
    safe_message = _sanitize_error_text(message)
    return {"error": safe_message}
|
|
|
|
|
|
# Tool schema for ``send_message``: name, model-facing description, and the
# JSON-schema style parameter definitions. No parameter is marked required;
# ``action`` is described as defaulting to 'send'.
SEND_MESSAGE_SCHEMA = {
    "name": "send_message",
    "description": (
        "Send a message to a connected messaging platform, or list available targets.\n\n"
        "IMPORTANT: When the user asks to send to a specific channel or person "
        "(not just a bare platform name), call send_message(action='list') FIRST to see "
        "available targets, then send to the correct one.\n"
        "If the user just says a platform name like 'send to telegram', send directly "
        "to the home channel without listing first."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["send", "list"],
                "description": (
                    "Action to perform. 'send' (default) sends a message. "
                    "'list' returns all available channels/contacts across connected platforms."
                ),
            },
            "target": {
                "type": "string",
                "description": (
                    "Delivery target. Format: 'platform' (uses home channel), "
                    "'platform:#channel-name', 'platform:chat_id', or Telegram topic "
                    "'telegram:chat_id:thread_id'. Examples: 'telegram', "
                    "'telegram:-1001234567890:17585', 'discord:#bot-home', "
                    "'slack:#engineering', 'signal:+15551234567'"
                ),
            },
            "message": {
                "type": "string",
                "description": "The message text to send",
            },
        },
        "required": [],
    },
}
|
|
|
|
|
|
def send_message_tool(args, **kw):
    """Entry point for send_message tool calls.

    Dispatches on ``args["action"]`` (defaulting to ``"send"``): ``"list"``
    returns the available targets; anything else is handled as a send.
    Extra keyword arguments are accepted and ignored.
    """
    requested_action = args.get("action", "send")
    return _handle_list() if requested_action == "list" else _handle_send(args)
|
|
|
|
|
|
def _handle_list():
    """Return a JSON string listing the available messaging targets.

    On any failure (including the channel directory module being
    unavailable) a JSON error payload is returned instead of raising.
    """
    try:
        from gateway.channel_directory import format_directory_for_display

        listing = format_directory_for_display()
        return json.dumps({"targets": listing})
    except Exception as exc:
        failure = _error(f"Failed to load channel directory: {exc}")
        return json.dumps(failure)
|
|
|
|
|
|
def _handle_send(args):
    """Send a message to a platform target.

    Args:
        args: Tool-call arguments. ``target`` is ``"platform"`` or
            ``"platform:<ref>"``; ``message`` is the text to send (it may
            embed media references, which are extracted via
            ``BasePlatformAdapter.extract_media``).

    Returns:
        Either the value of ``tool_error(...)`` for validation failures, or
        a JSON string holding the send result / error payload.
    """
    target = args.get("target", "")
    message = args.get("message", "")
    if not target or not message:
        return tool_error("Both 'target' and 'message' are required when action='send'")

    # "platform:ref" -> platform name plus an optional target reference.
    parts = target.split(":", 1)
    platform_name = parts[0].strip().lower()
    target_ref = parts[1].strip() if len(parts) > 1 else None
    chat_id = None
    thread_id = None

    if target_ref:
        chat_id, thread_id, is_explicit = _parse_target_ref(platform_name, target_ref)
    else:
        is_explicit = False

    # Resolve human-friendly channel names to platform IDs
    if target_ref and not is_explicit:
        try:
            from gateway.channel_directory import resolve_channel_name
            resolved = resolve_channel_name(platform_name, target_ref)
            if resolved:
                chat_id, thread_id, _ = _parse_target_ref(platform_name, resolved)
                if chat_id is None:
                    # The directory returned an ID that _parse_target_ref does
                    # not recognise (e.g. a non-numeric ID). Use it as-is;
                    # dropping it would silently redirect the message to the
                    # home channel further down.
                    chat_id = resolved
            else:
                return json.dumps({
                    "error": f"Could not resolve '{target_ref}' on {platform_name}. "
                    f"Use send_message(action='list') to see available targets."
                })
        except Exception:
            return json.dumps({
                "error": f"Could not resolve '{target_ref}' on {platform_name}. "
                f"Try using a numeric channel ID instead."
            })

    from tools.interrupt import is_interrupted
    if is_interrupted():
        return tool_error("Interrupted")

    try:
        from gateway.config import load_gateway_config, Platform
        config = load_gateway_config()
    except Exception as e:
        return json.dumps(_error(f"Failed to load gateway config: {e}"))

    platform_map = {
        "telegram": Platform.TELEGRAM,
        "discord": Platform.DISCORD,
        "slack": Platform.SLACK,
        "whatsapp": Platform.WHATSAPP,
        "signal": Platform.SIGNAL,
        "bluebubbles": Platform.BLUEBUBBLES,
        "matrix": Platform.MATRIX,
        "mattermost": Platform.MATTERMOST,
        "homeassistant": Platform.HOMEASSISTANT,
        "dingtalk": Platform.DINGTALK,
        "feishu": Platform.FEISHU,
        "wecom": Platform.WECOM,
        "email": Platform.EMAIL,
        "sms": Platform.SMS,
    }
    platform = platform_map.get(platform_name)
    if not platform:
        avail = ", ".join(platform_map.keys())
        return tool_error(f"Unknown platform: {platform_name}. Available: {avail}")

    pconfig = config.platforms.get(platform)
    if not pconfig or not pconfig.enabled:
        return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.")

    from gateway.platforms.base import BasePlatformAdapter

    media_files, cleaned_message = BasePlatformAdapter.extract_media(message)
    # Text recorded in the mirrored session; falls back to a media summary
    # when the message consisted only of attachments.
    mirror_text = cleaned_message.strip() or _describe_media_for_mirror(media_files)

    used_home_channel = False
    if not chat_id:
        home = config.get_home_channel(platform)
        if home:
            chat_id = home.chat_id
            used_home_channel = True
        else:
            return json.dumps({
                "error": f"No home channel set for {platform_name} to determine where to send the message. "
                f"Either specify a channel directly with '{platform_name}:CHANNEL_NAME', "
                f"or set a home channel via: hermes config set {platform_name.upper()}_HOME_CHANNEL <channel_id>"
            })

    # Avoid double delivery when a cron run will auto-deliver to this target.
    duplicate_skip = _maybe_skip_cron_duplicate_send(platform_name, chat_id, thread_id)
    if duplicate_skip:
        return json.dumps(duplicate_skip)

    try:
        from model_tools import _run_async
        result = _run_async(
            _send_to_platform(
                platform,
                pconfig,
                chat_id,
                cleaned_message,
                thread_id=thread_id,
                media_files=media_files,
            )
        )
        if used_home_channel and isinstance(result, dict) and result.get("success"):
            result["note"] = f"Sent to {platform_name} home channel (chat_id: {chat_id})"

        # Mirror the sent message into the target's gateway session
        if isinstance(result, dict) and result.get("success") and mirror_text:
            try:
                from gateway.mirror import mirror_to_session
                source_label = os.getenv("HERMES_SESSION_PLATFORM", "cli")
                if mirror_to_session(platform_name, chat_id, mirror_text, source_label=source_label, thread_id=thread_id):
                    result["mirrored"] = True
            except Exception:
                # Mirroring is best-effort; the send itself already succeeded.
                pass

        if isinstance(result, dict) and "error" in result:
            result["error"] = _sanitize_error_text(result["error"])
        return json.dumps(result)
    except Exception as e:
        return json.dumps(_error(f"Send failed: {e}"))
|
|
|
|
|
|
def _parse_target_ref(platform_name: str, target_ref: str):
|
|
"""Parse a tool target into chat_id/thread_id and whether it is explicit."""
|
|
if platform_name == "telegram":
|
|
match = _TELEGRAM_TOPIC_TARGET_RE.fullmatch(target_ref)
|
|
if match:
|
|
return match.group(1), match.group(2), True
|
|
if platform_name == "feishu":
|
|
match = _FEISHU_TARGET_RE.fullmatch(target_ref)
|
|
if match:
|
|
return match.group(1), match.group(2), True
|
|
if target_ref.lstrip("-").isdigit():
|
|
return target_ref, None, True
|
|
return None, None, False
|
|
|
|
|
|
def _describe_media_for_mirror(media_files):
|
|
"""Return a human-readable mirror summary when a message only contains media."""
|
|
if not media_files:
|
|
return ""
|
|
if len(media_files) == 1:
|
|
media_path, is_voice = media_files[0]
|
|
ext = os.path.splitext(media_path)[1].lower()
|
|
if is_voice and ext in _VOICE_EXTS:
|
|
return "[Sent voice message]"
|
|
if ext in _IMAGE_EXTS:
|
|
return "[Sent image attachment]"
|
|
if ext in _VIDEO_EXTS:
|
|
return "[Sent video attachment]"
|
|
if ext in _AUDIO_EXTS:
|
|
return "[Sent audio attachment]"
|
|
return "[Sent document attachment]"
|
|
return f"[Sent {len(media_files)} media attachments]"
|
|
|
|
|
|
def _get_cron_auto_delivery_target():
|
|
"""Return the cron scheduler's auto-delivery target for the current run, if any."""
|
|
platform = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower()
|
|
chat_id = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip()
|
|
if not platform or not chat_id:
|
|
return None
|
|
thread_id = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None
|
|
return {
|
|
"platform": platform,
|
|
"chat_id": chat_id,
|
|
"thread_id": thread_id,
|
|
}
|
|
|
|
|
|
def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id: str | None):
    """Detect sends that duplicate the cron scheduler's own auto-delivery.

    Returns ``None`` when the send should proceed. When the requested
    platform, chat ID and thread ID all match the auto-delivery target, a
    "skipped" result dict is returned instead.
    """
    scheduled = _get_cron_auto_delivery_target()
    if not scheduled:
        return None

    # Any mismatch means this is a different destination: let it through.
    if scheduled["platform"] != platform_name:
        return None
    if str(scheduled["chat_id"]) != str(chat_id):
        return None
    if scheduled.get("thread_id") != thread_id:
        return None

    label = f"{platform_name}:{chat_id}"
    if thread_id is not None:
        label = f"{label}:{thread_id}"

    explanation = (
        f"Skipped send_message to {label}. This cron job will already auto-deliver "
        "its final response to that same target. Put the intended user-facing content in "
        "your final response instead, or use a different target if you want an additional message."
    )
    return {
        "success": True,
        "skipped": True,
        "reason": "cron_auto_delivery_duplicate_target",
        "target": label,
        "note": explanation,
    }
|
|
|
|
|
|
async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None):
    """Deliver *message* through the sender that matches *platform*.

    Text is split with ``BasePlatformAdapter.truncate_message`` for the
    platforms whose adapter length limit is looked up here (Telegram,
    Discord, Slack, and Feishu when its adapter imports); other platforms
    receive the message as a single chunk. Media attachments are delivered
    only on Telegram, together with the last chunk; elsewhere they are
    dropped with a warning, or rejected when the message has no text.

    Returns the result dict of the last chunk sent, or the first error
    result encountered.
    """
    from gateway.config import Platform
    from gateway.platforms.base import BasePlatformAdapter
    from gateway.platforms.telegram import TelegramAdapter
    from gateway.platforms.discord import DiscordAdapter
    from gateway.platforms.slack import SlackAdapter

    # Feishu adapter import is optional (requires lark-oapi)
    try:
        from gateway.platforms.feishu import FeishuAdapter
    except ImportError:
        FeishuAdapter = None

    attachments = media_files or []

    # Per-platform message length limits, taken from adapter class attributes.
    length_limits = {
        Platform.TELEGRAM: TelegramAdapter.MAX_MESSAGE_LENGTH,
        Platform.DISCORD: DiscordAdapter.MAX_MESSAGE_LENGTH,
        Platform.SLACK: SlackAdapter.MAX_MESSAGE_LENGTH,
    }
    if FeishuAdapter is not None:
        length_limits[Platform.FEISHU] = FeishuAdapter.MAX_MESSAGE_LENGTH

    # Platforms without a known limit get the message as one chunk.
    limit = length_limits.get(platform)
    chunks = BasePlatformAdapter.truncate_message(message, limit) if limit else [message]

    # --- Telegram: media travels with the final chunk only ---
    if platform == Platform.TELEGRAM:
        outcome = None
        for index, chunk in enumerate(chunks):
            final_chunk = index == len(chunks) - 1
            outcome = await _send_telegram(
                pconfig.token,
                chat_id,
                chunk,
                media_files=attachments if final_chunk else [],
                thread_id=thread_id,
            )
            if isinstance(outcome, dict) and outcome.get("error"):
                return outcome
        return outcome

    # --- Non-Telegram platforms: text only ---
    if attachments and not message.strip():
        return {
            "error": (
                f"send_message MEDIA delivery is currently only supported for telegram; "
                f"target {platform.value} had only media attachments"
            )
        }

    warning = None
    if attachments:
        warning = (
            f"MEDIA attachments were omitted for {platform.value}; "
            "native send_message media delivery is currently only supported for telegram"
        )

    async def _deliver(chunk):
        """Send one chunk through the sender for the current platform."""
        if platform == Platform.DISCORD:
            return await _send_discord(pconfig.token, chat_id, chunk)
        if platform == Platform.SLACK:
            return await _send_slack(pconfig.token, chat_id, chunk)
        if platform == Platform.WHATSAPP:
            return await _send_whatsapp(pconfig.extra, chat_id, chunk)
        if platform == Platform.SIGNAL:
            return await _send_signal(pconfig.extra, chat_id, chunk)
        if platform == Platform.EMAIL:
            return await _send_email(pconfig.extra, chat_id, chunk)
        if platform == Platform.SMS:
            return await _send_sms(pconfig.api_key, chat_id, chunk)
        if platform == Platform.MATTERMOST:
            return await _send_mattermost(pconfig.token, pconfig.extra, chat_id, chunk)
        if platform == Platform.MATRIX:
            return await _send_matrix(pconfig.token, pconfig.extra, chat_id, chunk)
        if platform == Platform.HOMEASSISTANT:
            return await _send_homeassistant(pconfig.token, pconfig.extra, chat_id, chunk)
        if platform == Platform.DINGTALK:
            return await _send_dingtalk(pconfig.extra, chat_id, chunk)
        if platform == Platform.FEISHU:
            return await _send_feishu(pconfig, chat_id, chunk, thread_id=thread_id)
        if platform == Platform.WECOM:
            return await _send_wecom(pconfig.extra, chat_id, chunk)
        if platform == Platform.BLUEBUBBLES:
            return await _send_bluebubbles(pconfig.extra, chat_id, chunk)
        return {"error": f"Direct sending not yet implemented for {platform.value}"}

    outcome = None
    for chunk in chunks:
        outcome = await _deliver(chunk)
        # Stop at the first failing chunk and report it.
        if isinstance(outcome, dict) and outcome.get("error"):
            return outcome

    if warning and isinstance(outcome, dict) and outcome.get("success"):
        outcome["warnings"] = [*outcome.get("warnings", []), warning]
    return outcome
|
|
|
|
|
|
async def _send_telegram(token, chat_id, message, media_files=None, thread_id=None):
    """Send text and optional media via the Telegram Bot API (one-shot).

    Text containing HTML-looking tags is sent with ``parse_mode='HTML'``;
    otherwise it is run through ``TelegramAdapter.format_message`` and sent
    as MarkdownV2. If Telegram rejects the formatting, the text is re-sent
    as plain text. Each media file is then sent with the method matching
    its extension; per-file failures are collected as warnings.

    Returns a success dict carrying the ID of the last message sent, or an
    error dict.
    """
    try:
        from telegram import Bot
        from telegram.constants import ParseMode

        # Auto-detect HTML tags — if present, skip MarkdownV2 and send as HTML.
        # Inspired by github.com/ashaney — PR #1568.
        contains_html = bool(re.search(r'<[a-zA-Z/][^>]*>', message))

        if contains_html:
            text_out = message
            parse_mode = ParseMode.HTML
        else:
            # Reuse the gateway adapter's formatter for markdown→MarkdownV2.
            try:
                from gateway.platforms.telegram import TelegramAdapter
                formatter = TelegramAdapter.__new__(TelegramAdapter)
                text_out = formatter.format_message(message)
            except Exception:
                # Formatting unavailable: send the text unchanged.
                text_out = message
            parse_mode = ParseMode.MARKDOWN_V2

        bot = Bot(token=token)
        numeric_chat_id = int(chat_id)
        attachments = media_files or []
        topic_kwargs = {}
        if thread_id is not None:
            topic_kwargs["message_thread_id"] = int(thread_id)

        sent = None
        notes = []

        if text_out.strip():
            try:
                sent = await bot.send_message(
                    chat_id=numeric_chat_id, text=text_out,
                    parse_mode=parse_mode, **topic_kwargs
                )
            except Exception as send_exc:
                detail = str(send_exc).lower()
                # Only formatting failures are retried; anything else propagates.
                if not any(marker in detail for marker in ("parse", "markdown", "html")):
                    raise
                logger.warning(
                    "Parse mode %s failed in _send_telegram, falling back to plain text: %s",
                    parse_mode,
                    _sanitize_error_text(send_exc),
                )
                fallback_text = message
                if not contains_html:
                    try:
                        from gateway.platforms.telegram import _strip_mdv2
                        fallback_text = _strip_mdv2(text_out)
                    except Exception:
                        fallback_text = message
                sent = await bot.send_message(
                    chat_id=numeric_chat_id, text=fallback_text,
                    parse_mode=None, **topic_kwargs
                )

        for path, as_voice in attachments:
            if not os.path.exists(path):
                note = f"Media file not found, skipping: {path}"
                logger.warning(note)
                notes.append(note)
                continue

            suffix = os.path.splitext(path)[1].lower()
            try:
                with open(path, "rb") as handle:
                    if suffix in _IMAGE_EXTS:
                        sent = await bot.send_photo(
                            chat_id=numeric_chat_id, photo=handle, **topic_kwargs
                        )
                    elif suffix in _VIDEO_EXTS:
                        sent = await bot.send_video(
                            chat_id=numeric_chat_id, video=handle, **topic_kwargs
                        )
                    elif suffix in _VOICE_EXTS and as_voice:
                        sent = await bot.send_voice(
                            chat_id=numeric_chat_id, voice=handle, **topic_kwargs
                        )
                    elif suffix in _AUDIO_EXTS:
                        sent = await bot.send_audio(
                            chat_id=numeric_chat_id, audio=handle, **topic_kwargs
                        )
                    else:
                        sent = await bot.send_document(
                            chat_id=numeric_chat_id, document=handle, **topic_kwargs
                        )
            except Exception as exc:
                note = _sanitize_error_text(f"Failed to send media {path}: {exc}")
                logger.error(note)
                notes.append(note)

        if sent is None:
            # Neither text nor any media item went out.
            failure = {"error": "No deliverable text or media remained after processing MEDIA tags"}
            if notes:
                failure["warnings"] = notes
            return failure

        outcome = {
            "success": True,
            "platform": "telegram",
            "chat_id": chat_id,
            "message_id": str(sent.message_id),
        }
        if notes:
            outcome["warnings"] = notes
        return outcome
    except ImportError:
        return {"error": "python-telegram-bot not installed. Run: pip install python-telegram-bot"}
    except Exception as exc:
        return _error(f"Telegram send failed: {exc}")
|
|
|
|
|
|
async def _send_discord(token, chat_id, message):
    """Post one message to a Discord channel over the REST API.

    No websocket client is involved. The caller (``_send_to_platform``) is
    responsible for chunking. Returns a success dict with the message ID,
    or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        endpoint = f"https://discord.com/api/v10/channels/{chat_id}/messages"
        auth_headers = {"Authorization": f"Bot {token}", "Content-Type": "application/json"}
        request_timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            async with session.post(endpoint, headers=auth_headers, json={"content": message}) as resp:
                if resp.status in (200, 201):
                    data = await resp.json()
                    return {"success": True, "platform": "discord", "chat_id": chat_id, "message_id": data.get("id")}
                body = await resp.text()
                return _error(f"Discord API error ({resp.status}): {body}")
    except Exception as exc:
        return _error(f"Discord send failed: {exc}")
|
|
|
|
|
|
async def _send_slack(token, chat_id, message):
    """Post a message with Slack's ``chat.postMessage`` Web API method.

    Success is decided by the ``ok`` field of the JSON response. Returns a
    success dict with the message ``ts``, or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        endpoint = "https://slack.com/api/chat.postMessage"
        auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        body = {"channel": chat_id, "text": message}
        request_timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            async with session.post(endpoint, headers=auth_headers, json=body) as resp:
                data = await resp.json()
                if not data.get("ok"):
                    return _error(f"Slack API error: {data.get('error', 'unknown')}")
                return {"success": True, "platform": "slack", "chat_id": chat_id, "message_id": data.get("ts")}
    except Exception as exc:
        return _error(f"Slack send failed: {exc}")
|
|
|
|
|
|
async def _send_whatsapp(extra, chat_id, message):
    """Send through the local WhatsApp bridge's HTTP ``/send`` endpoint.

    The bridge port comes from ``extra["bridge_port"]`` (default 3000).
    Returns a success dict with the bridge's ``messageId``, or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        port = extra.get("bridge_port", 3000)
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"http://localhost:{port}/send",
                json={"chatId": chat_id, "message": message},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status != 200:
                    body = await resp.text()
                    return _error(f"WhatsApp bridge error ({resp.status}): {body}")
                data = await resp.json()
                return {
                    "success": True,
                    "platform": "whatsapp",
                    "chat_id": chat_id,
                    "message_id": data.get("messageId"),
                }
    except Exception as exc:
        return _error(f"WhatsApp send failed: {exc}")
|
|
|
|
|
|
async def _send_signal(extra, chat_id, message):
|
|
"""Send via signal-cli JSON-RPC API."""
|
|
try:
|
|
import httpx
|
|
except ImportError:
|
|
return {"error": "httpx not installed"}
|
|
try:
|
|
http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/")
|
|
account = extra.get("account", "")
|
|
if not account:
|
|
return {"error": "Signal account not configured"}
|
|
|
|
params = {"account": account, "message": message}
|
|
if chat_id.startswith("group:"):
|
|
params["groupId"] = chat_id[6:]
|
|
else:
|
|
params["recipient"] = [chat_id]
|
|
|
|
payload = {
|
|
"jsonrpc": "2.0",
|
|
"method": "send",
|
|
"params": params,
|
|
"id": f"send_{int(time.time() * 1000)}",
|
|
}
|
|
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
resp = await client.post(f"{http_url}/api/v1/rpc", json=payload)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if "error" in data:
|
|
return _error(f"Signal RPC error: {data['error']}")
|
|
return {"success": True, "platform": "signal", "chat_id": chat_id}
|
|
except Exception as e:
|
|
return _error(f"Signal send failed: {e}")
|
|
|
|
|
|
async def _send_email(extra, chat_id, message):
|
|
"""Send via SMTP (one-shot, no persistent connection needed)."""
|
|
import smtplib
|
|
from email.mime.text import MIMEText
|
|
|
|
address = extra.get("address") or os.getenv("EMAIL_ADDRESS", "")
|
|
password = os.getenv("EMAIL_PASSWORD", "")
|
|
smtp_host = extra.get("smtp_host") or os.getenv("EMAIL_SMTP_HOST", "")
|
|
smtp_port = int(os.getenv("EMAIL_SMTP_PORT", "587"))
|
|
|
|
if not all([address, password, smtp_host]):
|
|
return {"error": "Email not configured (EMAIL_ADDRESS, EMAIL_PASSWORD, EMAIL_SMTP_HOST required)"}
|
|
|
|
try:
|
|
msg = MIMEText(message, "plain", "utf-8")
|
|
msg["From"] = address
|
|
msg["To"] = chat_id
|
|
msg["Subject"] = "Hermes Agent"
|
|
|
|
server = smtplib.SMTP(smtp_host, smtp_port)
|
|
server.starttls(context=ssl.create_default_context())
|
|
server.login(address, password)
|
|
server.send_message(msg)
|
|
server.quit()
|
|
return {"success": True, "platform": "email", "chat_id": chat_id}
|
|
except Exception as e:
|
|
return _error(f"Email send failed: {e}")
|
|
|
|
|
|
async def _send_sms(auth_token, chat_id, message):
|
|
"""Send a single SMS via Twilio REST API.
|
|
|
|
Uses HTTP Basic auth (Account SID : Auth Token) and form-encoded POST.
|
|
Chunking is handled by _send_to_platform() before this is called.
|
|
"""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
|
|
import base64
|
|
|
|
account_sid = os.getenv("TWILIO_ACCOUNT_SID", "")
|
|
from_number = os.getenv("TWILIO_PHONE_NUMBER", "")
|
|
if not account_sid or not auth_token or not from_number:
|
|
return {"error": "SMS not configured (TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_PHONE_NUMBER required)"}
|
|
|
|
# Strip markdown — SMS renders it as literal characters
|
|
message = re.sub(r"\*\*(.+?)\*\*", r"\1", message, flags=re.DOTALL)
|
|
message = re.sub(r"\*(.+?)\*", r"\1", message, flags=re.DOTALL)
|
|
message = re.sub(r"__(.+?)__", r"\1", message, flags=re.DOTALL)
|
|
message = re.sub(r"_(.+?)_", r"\1", message, flags=re.DOTALL)
|
|
message = re.sub(r"```[a-z]*\n?", "", message)
|
|
message = re.sub(r"`(.+?)`", r"\1", message)
|
|
message = re.sub(r"^#{1,6}\s+", "", message, flags=re.MULTILINE)
|
|
message = re.sub(r"\[([^\]]+)\]\([^\)]+\)", r"\1", message)
|
|
message = re.sub(r"\n{3,}", "\n\n", message)
|
|
message = message.strip()
|
|
|
|
try:
|
|
creds = f"{account_sid}:{auth_token}"
|
|
encoded = base64.b64encode(creds.encode("ascii")).decode("ascii")
|
|
url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
|
|
headers = {"Authorization": f"Basic {encoded}"}
|
|
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
|
|
form_data = aiohttp.FormData()
|
|
form_data.add_field("From", from_number)
|
|
form_data.add_field("To", chat_id)
|
|
form_data.add_field("Body", message)
|
|
|
|
async with session.post(url, data=form_data, headers=headers) as resp:
|
|
body = await resp.json()
|
|
if resp.status >= 400:
|
|
error_msg = body.get("message", str(body))
|
|
return _error(f"Twilio API error ({resp.status}): {error_msg}")
|
|
msg_sid = body.get("sid", "")
|
|
return {"success": True, "platform": "sms", "chat_id": chat_id, "message_id": msg_sid}
|
|
except Exception as e:
|
|
return _error(f"SMS send failed: {e}")
|
|
|
|
|
|
async def _send_mattermost(token, extra, chat_id, message):
|
|
"""Send via Mattermost REST API."""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
try:
|
|
base_url = (extra.get("url") or os.getenv("MATTERMOST_URL", "")).rstrip("/")
|
|
token = token or os.getenv("MATTERMOST_TOKEN", "")
|
|
if not base_url or not token:
|
|
return {"error": "Mattermost not configured (MATTERMOST_URL, MATTERMOST_TOKEN required)"}
|
|
url = f"{base_url}/api/v4/posts"
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
|
|
async with session.post(url, headers=headers, json={"channel_id": chat_id, "message": message}) as resp:
|
|
if resp.status not in (200, 201):
|
|
body = await resp.text()
|
|
return _error(f"Mattermost API error ({resp.status}): {body}")
|
|
data = await resp.json()
|
|
return {"success": True, "platform": "mattermost", "chat_id": chat_id, "message_id": data.get("id")}
|
|
except Exception as e:
|
|
return _error(f"Mattermost send failed: {e}")
|
|
|
|
|
|
async def _send_matrix(token, extra, chat_id, message):
|
|
"""Send via Matrix Client-Server API.
|
|
|
|
Converts markdown to HTML for rich rendering in Matrix clients.
|
|
Falls back to plain text if the ``markdown`` library is not installed.
|
|
"""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
try:
|
|
homeserver = (extra.get("homeserver") or os.getenv("MATRIX_HOMESERVER", "")).rstrip("/")
|
|
token = token or os.getenv("MATRIX_ACCESS_TOKEN", "")
|
|
if not homeserver or not token:
|
|
return {"error": "Matrix not configured (MATRIX_HOMESERVER, MATRIX_ACCESS_TOKEN required)"}
|
|
txn_id = f"hermes_{int(time.time() * 1000)}_{os.urandom(4).hex()}"
|
|
url = f"{homeserver}/_matrix/client/v3/rooms/{chat_id}/send/m.room.message/{txn_id}"
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
|
|
# Build message payload with optional HTML formatted_body.
|
|
payload = {"msgtype": "m.text", "body": message}
|
|
try:
|
|
import markdown as _md
|
|
html = _md.markdown(message, extensions=["fenced_code", "tables"])
|
|
# Convert h1-h6 to bold for Element X compatibility.
|
|
html = re.sub(r"<h[1-6]>(.*?)</h[1-6]>", r"<strong>\1</strong>", html)
|
|
payload["format"] = "org.matrix.custom.html"
|
|
payload["formatted_body"] = html
|
|
except ImportError:
|
|
pass
|
|
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
|
|
async with session.put(url, headers=headers, json=payload) as resp:
|
|
if resp.status not in (200, 201):
|
|
body = await resp.text()
|
|
return _error(f"Matrix API error ({resp.status}): {body}")
|
|
data = await resp.json()
|
|
return {"success": True, "platform": "matrix", "chat_id": chat_id, "message_id": data.get("event_id")}
|
|
except Exception as e:
|
|
return _error(f"Matrix send failed: {e}")
|
|
|
|
|
|
async def _send_homeassistant(token, extra, chat_id, message):
|
|
"""Send via Home Assistant notify service."""
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
return {"error": "aiohttp not installed. Run: pip install aiohttp"}
|
|
try:
|
|
hass_url = (extra.get("url") or os.getenv("HASS_URL", "")).rstrip("/")
|
|
token = token or os.getenv("HASS_TOKEN", "")
|
|
if not hass_url or not token:
|
|
return {"error": "Home Assistant not configured (HASS_URL, HASS_TOKEN required)"}
|
|
url = f"{hass_url}/api/services/notify/notify"
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
|
|
async with session.post(url, headers=headers, json={"message": message, "target": chat_id}) as resp:
|
|
if resp.status not in (200, 201):
|
|
body = await resp.text()
|
|
return _error(f"Home Assistant API error ({resp.status}): {body}")
|
|
return {"success": True, "platform": "homeassistant", "chat_id": chat_id}
|
|
except Exception as e:
|
|
return _error(f"Home Assistant send failed: {e}")
|
|
|
|
|
|
async def _send_dingtalk(extra, chat_id, message):
    """Deliver a plain-text message through a DingTalk robot webhook.

    Note: the gateway's DingTalk adapter relies on per-session webhook URLs
    taken from incoming messages (dingtalk-stream SDK). Cross-platform
    send_message delivery instead posts to a static robot webhook URL, which
    has to be supplied through the ``DINGTALK_WEBHOOK_URL`` env var or the
    ``webhook_url`` key of the platform's extra config.

    Args:
        extra: Mapping whose ``"webhook_url"`` entry, when truthy, takes
            precedence over the env var.
        chat_id: Only echoed back in the success dict; the request itself is
            addressed solely by the webhook URL.
        message: Text placed in the ``text.content`` field of the payload.

    Returns:
        A success dict (``success``, ``platform``, ``chat_id``) when the
        response's ``errcode`` is 0 or absent; otherwise an error dict.
    """
    try:
        import httpx
    except ImportError:
        return {"error": "httpx not installed"}

    try:
        target_url = extra.get("webhook_url") or os.getenv("DINGTALK_WEBHOOK_URL", "")
        if not target_url:
            return {"error": "DingTalk not configured. Set DINGTALK_WEBHOOK_URL env var or webhook_url in dingtalk platform extra config."}

        payload = {"msgtype": "text", "text": {"content": message}}
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(target_url, json=payload)
            # HTTP-level failures raise here and are reported by the outer handler.
            resp.raise_for_status()
            data = resp.json()
            # A 2xx response can still carry an application-level error code.
            errcode = data.get("errcode", 0)
            if errcode != 0:
                errmsg = data.get("errmsg", "unknown")
                return _error(f"DingTalk API error: {errmsg}")
        return {"success": True, "platform": "dingtalk", "chat_id": chat_id}
    except Exception as exc:
        return _error(f"DingTalk send failed: {exc}")
|
|
|
|
|
|
async def _send_wecom(extra, chat_id, message):
    """Send *message* over WeCom by driving a short-lived ``WeComAdapter``.

    A fresh adapter is built from ``extra``, connected, used for a single
    ``send`` call and then disconnected again.

    Args:
        extra: Passed as ``extra=`` to ``PlatformConfig`` for the adapter.
        chat_id: Destination handed to ``adapter.send``.
        message: Text handed to ``adapter.send``.

    Returns:
        A success dict including the adapter's ``message_id``, or an error
        dict when requirements are unmet, the connection fails, or the send
        does not succeed.
    """
    try:
        from gateway.platforms.wecom import WeComAdapter, check_wecom_requirements
        if not check_wecom_requirements():
            return {"error": "WeCom requirements not met. Need aiohttp + WECOM_BOT_ID/SECRET."}
    except ImportError:
        return {"error": "WeCom adapter not available."}

    try:
        from gateway.config import PlatformConfig

        wecom = WeComAdapter(PlatformConfig(extra=extra))
        if not await wecom.connect():
            reason = wecom.fatal_error_message or 'unknown error'
            return _error(f"WeCom: failed to connect - {reason}")

        try:
            outcome = await wecom.send(chat_id, message)
            if outcome.success:
                return {
                    "success": True,
                    "platform": "wecom",
                    "chat_id": chat_id,
                    "message_id": outcome.message_id,
                }
            return _error(f"WeCom send failed: {outcome.error}")
        finally:
            # Always tear the connection down once it has been established.
            await wecom.disconnect()
    except Exception as exc:
        return _error(f"WeCom send failed: {exc}")
|
|
|
|
|
|
async def _send_bluebubbles(extra, chat_id, message):
    """Send *message* via a BlueBubbles iMessage server using its adapter.

    A fresh ``BlueBubblesAdapter`` is built from ``extra``, connected, used
    for a single ``send`` call and then disconnected again.

    Args:
        extra: Passed as ``extra=`` to ``PlatformConfig`` for the adapter.
        chat_id: Destination handed to ``adapter.send``.
        message: Text handed to ``adapter.send``.

    Returns:
        A success dict including the adapter's ``message_id``, or an error
        dict when requirements are unmet, the connection fails, or the send
        does not succeed.
    """
    try:
        from gateway.platforms.bluebubbles import BlueBubblesAdapter, check_bluebubbles_requirements
        if not check_bluebubbles_requirements():
            return {"error": "BlueBubbles requirements not met (need aiohttp + httpx)."}
    except ImportError:
        return {"error": "BlueBubbles adapter not available."}

    try:
        from gateway.config import PlatformConfig

        bluebubbles = BlueBubblesAdapter(PlatformConfig(extra=extra))
        if not await bluebubbles.connect():
            return _error("BlueBubbles: failed to connect to server")

        try:
            outcome = await bluebubbles.send(chat_id, message)
            if outcome.success:
                return {
                    "success": True,
                    "platform": "bluebubbles",
                    "chat_id": chat_id,
                    "message_id": outcome.message_id,
                }
            return _error(f"BlueBubbles send failed: {outcome.error}")
        finally:
            # Always tear the connection down once it has been established.
            await bluebubbles.disconnect()
    except Exception as exc:
        return _error(f"BlueBubbles send failed: {exc}")
|
|
|
|
|
|
async def _send_feishu(pconfig, chat_id, message, media_files=None, thread_id=None):
    """Send text and/or media to Feishu/Lark through ``FeishuAdapter``.

    The text (if non-blank) is sent first, then each media file in order.
    Processing stops at the first failed send or missing file.

    Args:
        pconfig: Platform config handed to the ``FeishuAdapter`` constructor.
        chat_id: Destination passed to every adapter send call.
        message: Text to send; skipped when it is blank after stripping.
        media_files: Optional iterable of ``(path, is_voice)`` pairs.
        thread_id: When truthy, forwarded as ``{"thread_id": ...}`` metadata.

    Returns:
        A success dict whose ``message_id`` comes from the last send that
        ran, or an error dict.
    """
    missing_deps = "Feishu dependencies not installed. Run: pip install 'hermes-agent[feishu]'"
    try:
        from gateway.platforms.feishu import FeishuAdapter, FEISHU_AVAILABLE
        if not FEISHU_AVAILABLE:
            return {"error": missing_deps}
        from gateway.platforms.feishu import FEISHU_DOMAIN, LARK_DOMAIN
    except ImportError:
        return {"error": missing_deps}

    attachments = media_files or []

    try:
        adapter = FeishuAdapter(pconfig)
        # The adapter's domain name picks the API domain; anything but "lark" means Feishu.
        uses_lark = getattr(adapter, "_domain_name", "feishu") == "lark"
        adapter._client = adapter._build_lark_client(LARK_DOMAIN if uses_lark else FEISHU_DOMAIN)

        metadata = None
        if thread_id:
            metadata = {"thread_id": thread_id}

        outcome = None
        if message.strip():
            outcome = await adapter.send(chat_id, message, metadata=metadata)
            if not outcome.success:
                return _error(f"Feishu send failed: {outcome.error}")

        for path, voice_flag in attachments:
            if not os.path.exists(path):
                return _error(f"Media file not found: {path}")

            # Choose the adapter method from the file extension.
            suffix = os.path.splitext(path)[1].lower()
            if suffix in _IMAGE_EXTS:
                deliver = adapter.send_image_file
            elif suffix in _VIDEO_EXTS:
                deliver = adapter.send_video
            elif (suffix in _VOICE_EXTS and voice_flag) or suffix in _AUDIO_EXTS:
                deliver = adapter.send_voice
            else:
                deliver = adapter.send_document

            outcome = await deliver(chat_id, path, metadata=metadata)
            if not outcome.success:
                return _error(f"Feishu media send failed: {outcome.error}")

        # Nothing was sent at all: blank text and no media entries.
        if outcome is None:
            return {"error": "No deliverable text or media remained after processing MEDIA tags"}

        return {
            "success": True,
            "platform": "feishu",
            "chat_id": chat_id,
            "message_id": outcome.message_id,
        }
    except Exception as exc:
        return _error(f"Feishu send failed: {exc}")
|
|
|
|
|
|
def _check_send_message():
    """Report whether the send_message tool should be offered.

    Returns ``True`` outright when ``HERMES_SESSION_PLATFORM`` names a
    platform other than ``"local"``. Otherwise the answer is whatever
    ``gateway.status.is_gateway_running()`` returns, or ``False`` if that
    check cannot be imported or raises.
    """
    session_platform = os.getenv("HERMES_SESSION_PLATFORM", "")
    if session_platform not in ("", "local"):
        return True

    try:
        from gateway.status import is_gateway_running
        return is_gateway_running()
    except Exception:
        # Best-effort probe: any failure means the tool is unavailable.
        return False
|
|
|
|
|
|
# --- Registry ---
# Registers the send_message tool when this module is imported.
# NOTE(review): tool_error is not used in this part of the file; presumably it
# is referenced elsewhere in the module, so keep the import.
from tools.registry import registry, tool_error

registry.register(
    name="send_message",
    toolset="messaging",
    schema=SEND_MESSAGE_SCHEMA,
    handler=send_message_tool,
    check_fn=_check_send_message,  # availability gate for the tool
    emoji="📨",
)
|