hermes-agent/tools/send_message_tool.py
quabug 50d6659392 fix: handle MEDIA tags in send_message tool for native file delivery
The send_message tool's _send_telegram() sent MEDIA:<path> tags as
literal text instead of delivering actual files. This fixes it by
extracting MEDIA tags via BasePlatformAdapter.extract_media() and
routing files to the appropriate Telegram Bot API method by extension.

Changes:
- send_message_tool: extract MEDIA tags and send files natively as
  photo/video/voice/audio/document based on file extension
- send_message_tool: add per-file error handling and missing-file logging
- send_message_tool: use cleaned text in fallback to avoid leaking tags
- base.py extract_media: handle optional space after MEDIA: colon
- base.py extract_media: strip surrounding backticks/quotes from paths

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 04:02:03 -07:00

393 lines
15 KiB
Python

"""Send Message Tool -- cross-channel messaging via platform APIs.
Sends a message to a user or channel on any connected messaging platform
(Telegram, Discord, Slack). Supports listing available targets and resolving
human-friendly channel names to IDs. Works in both CLI and gateway contexts.
"""
import json
import logging
import os
import re
import time
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Telegram target reference: "<chat_id>" or "<chat_id>:<thread_id>", where the
# chat id may be negative and surrounding whitespace is tolerated.
# Group 1 is the chat id, group 2 the optional thread id.
_TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$")
# File-extension sets used by _send_telegram to choose the upload method for a
# MEDIA:<path> attachment. Anything not listed is sent as a generic document.
_IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.webp', '.gif'}
_VIDEO_EXTS = {'.mp4', '.mov', '.avi', '.mkv', '.3gp'}
_AUDIO_EXTS = {'.ogg', '.opus', '.mp3', '.wav', '.m4a'}
# Subset of the audio extensions that is sent as a voice message, but only when
# the extracted media entry is flagged as voice; otherwise it is sent as audio.
_VOICE_EXTS = {'.ogg', '.opus'}
# Tool schema passed to the tool registry for "send_message".
# The description strings are the instructions the calling model sees, so they
# are part of the tool's behaviour rather than documentation.
SEND_MESSAGE_SCHEMA = {
"name": "send_message",
"description": (
"Send a message to a connected messaging platform, or list available targets.\n\n"
"IMPORTANT: When the user asks to send to a specific channel or person "
"(not just a bare platform name), call send_message(action='list') FIRST to see "
"available targets, then send to the correct one.\n"
"If the user just says a platform name like 'send to telegram', send directly "
"to the home channel without listing first."
),
"parameters": {
"type": "object",
"properties": {
# Omitted action is treated as "send" by send_message_tool.
"action": {
"type": "string",
"enum": ["send", "list"],
"description": "Action to perform. 'send' (default) sends a message. 'list' returns all available channels/contacts across connected platforms."
},
# Split on the first ':' by _handle_send into platform name and target reference.
"target": {
"type": "string",
"description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or Telegram topic 'telegram:chat_id:thread_id'. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:#bot-home', 'slack:#engineering', 'signal:+15551234567'"
},
"message": {
"type": "string",
"description": "The message text to send"
}
},
# Nothing is required at the schema level; _handle_send itself rejects a
# 'send' call that lacks 'target' or 'message'. Presumably left empty so that
# action='list' can be called without arguments.
"required": []
}
}
def send_message_tool(args, **kw):
    """Entry point for send_message tool calls.

    Args:
        args: Tool-call arguments. ``action`` selects the operation and
            defaults to ``"send"``; only the exact value ``"list"`` selects
            the listing path.
        **kw: Accepted and ignored.

    Returns:
        The JSON string produced by the selected handler.
    """
    wants_listing = args.get("action", "send") == "list"
    return _handle_list() if wants_listing else _handle_send(args)
def _handle_list():
    """Return the available messaging targets as a JSON string.

    Returns:
        ``{"targets": ...}`` holding whatever the channel directory's display
        formatter returns, or ``{"error": ...}`` if importing, formatting or
        serialising fails for any reason.
    """
    try:
        # Imported lazily so a missing or broken gateway package becomes an
        # error payload instead of an import failure for this module.
        from gateway.channel_directory import format_directory_for_display

        listing = format_directory_for_display()
        return json.dumps({"targets": listing})
    except Exception as exc:
        failure = {"error": f"Failed to load channel directory: {exc}"}
        return json.dumps(failure)
def _handle_send(args):
    """Send a message to a platform target.

    Args:
        args: Tool-call arguments. ``target`` is ``"platform"`` or
            ``"platform:<ref>"``; ``message`` is the text to send. Both are
            required.

    Returns:
        A JSON string. On success it is the sender's result dict, with an added
        ``note`` when the platform's home channel was used and
        ``mirrored: true`` when the message was mirrored into the target's
        gateway session. On failure it is ``{"error": "..."}``.
    """
    target = args.get("target", "")
    message = args.get("message", "")
    if not target or not message:
        return json.dumps({"error": "Both 'target' and 'message' are required when action='send'"})

    # Split on the first ':' only, so Telegram "chat_id:thread_id" stays intact.
    raw_platform, _, raw_ref = target.partition(":")
    platform_name = raw_platform.strip().lower()
    target_ref = raw_ref.strip() or None

    chat_id = None
    thread_id = None
    is_explicit = False
    if target_ref:
        chat_id, thread_id, is_explicit = _parse_target_ref(platform_name, target_ref)

    # Resolve human-friendly channel names to platform IDs.
    if target_ref and not is_explicit:
        try:
            from gateway.channel_directory import resolve_channel_name
            resolved = resolve_channel_name(platform_name, target_ref)
            if resolved:
                chat_id, thread_id, _ = _parse_target_ref(platform_name, resolved)
                if chat_id is None:
                    # The directory returned an ID the parser does not treat as
                    # explicit (for example a non-numeric channel ID). Use it
                    # verbatim; otherwise chat_id stays empty and the message
                    # would silently be redirected to the home channel below.
                    chat_id = resolved
            else:
                return json.dumps({
                    "error": f"Could not resolve '{target_ref}' on {platform_name}. "
                    f"Use send_message(action='list') to see available targets."
                })
        except Exception:
            return json.dumps({
                "error": f"Could not resolve '{target_ref}' on {platform_name}. "
                f"Try using a numeric channel ID instead."
            })

    from tools.interrupt import is_interrupted
    if is_interrupted():
        return json.dumps({"error": "Interrupted"})

    try:
        from gateway.config import load_gateway_config, Platform
        config = load_gateway_config()
    except Exception as e:
        return json.dumps({"error": f"Failed to load gateway config: {e}"})

    platform_map = {
        "telegram": Platform.TELEGRAM,
        "discord": Platform.DISCORD,
        "slack": Platform.SLACK,
        "whatsapp": Platform.WHATSAPP,
        "signal": Platform.SIGNAL,
        "email": Platform.EMAIL,
    }
    platform = platform_map.get(platform_name)
    if not platform:
        avail = ", ".join(platform_map.keys())
        return json.dumps({"error": f"Unknown platform: {platform_name}. Available: {avail}"})

    pconfig = config.platforms.get(platform)
    if not pconfig or not pconfig.enabled:
        return json.dumps({"error": f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/gateway.json or environment variables."})

    # No usable target reference: fall back to the platform's home channel.
    used_home_channel = False
    if not chat_id:
        home = config.get_home_channel(platform)
        if home:
            chat_id = home.chat_id
            used_home_channel = True
        else:
            return json.dumps({
                "error": f"No home channel set for {platform_name} to determine where to send the message. "
                f"Either specify a channel directly with '{platform_name}:CHANNEL_NAME', "
                f"or set a home channel via: hermes config set {platform_name.upper()}_HOME_CHANNEL <channel_id>"
            })

    try:
        from model_tools import _run_async
        result = _run_async(_send_to_platform(platform, pconfig, chat_id, message, thread_id=thread_id))
        delivered = isinstance(result, dict) and result.get("success")
        if used_home_channel and delivered:
            result["note"] = f"Sent to {platform_name} home channel (chat_id: {chat_id})"
        # Mirror the sent message into the target's gateway session.
        if delivered:
            try:
                from gateway.mirror import mirror_to_session
                source_label = os.getenv("HERMES_SESSION_PLATFORM", "cli")
                if mirror_to_session(platform_name, chat_id, message, source_label=source_label, thread_id=thread_id):
                    result["mirrored"] = True
            except Exception:
                # Mirroring is best-effort and must never fail a delivered
                # message, but leave a trace for debugging.
                logger.debug("Mirroring sent message to gateway session failed", exc_info=True)
        return json.dumps(result)
    except Exception as e:
        return json.dumps({"error": f"Send failed: {e}"})
def _parse_target_ref(platform_name: str, target_ref: str):
    """Parse a tool target into chat_id/thread_id and whether it is explicit.

    An explicit reference is one that can be handed to the platform sender
    as-is, without a channel-directory lookup.

    Args:
        platform_name: Lower-cased platform key (e.g. ``"telegram"``).
        target_ref: The part of the target after ``"platform:"``.

    Returns:
        ``(chat_id, thread_id, is_explicit)``. ``thread_id`` is only ever set
        for Telegram ``chat_id:thread_id`` references. A reference that is not
        recognised yields ``(None, None, False)``.
    """
    if platform_name == "telegram":
        match = _TELEGRAM_TOPIC_TARGET_RE.fullmatch(target_ref)
        if match:
            return match.group(1), match.group(2), True
    elif platform_name == "signal":
        ref = target_ref.strip()
        # The Signal sender accepts "group:<id>" for groups and a bare
        # recipient otherwise; the tool schema documents "+<digits>" numbers.
        if ref.startswith("group:") and len(ref) > len("group:"):
            return ref, None, True
        if ref.startswith("+") and ref[1:].isascii() and ref[1:].isdigit():
            return ref, None, True
    elif platform_name == "email":
        ref = target_ref.strip()
        # The email sender uses the chat id directly as the "To" address.
        local_part, at_sign, domain = ref.partition("@")
        if at_sign and local_part and domain and not any(ch.isspace() for ch in ref):
            return ref, None, True
    # Any platform: a bare (optionally negative) numeric ID is explicit.
    if target_ref.lstrip("-").isdigit():
        return target_ref, None, True
    return None, None, False
async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None):
    """Dispatch a message to the sender coroutine for ``platform``.

    Telegram, Discord and Slack senders receive ``pconfig.token``; Signal and
    email senders receive ``pconfig.extra``. Only the Telegram sender is given
    ``thread_id``.

    Returns:
        The sender's result dict, or an ``{"error": ...}`` dict for a platform
        that has no direct sender here.
    """
    from gateway.config import Platform

    if platform == Platform.TELEGRAM:
        return await _send_telegram(pconfig.token, chat_id, message, thread_id=thread_id)
    if platform == Platform.DISCORD:
        return await _send_discord(pconfig.token, chat_id, message)
    if platform == Platform.SLACK:
        return await _send_slack(pconfig.token, chat_id, message)
    if platform == Platform.SIGNAL:
        return await _send_signal(pconfig.extra, chat_id, message)
    if platform == Platform.EMAIL:
        return await _send_email(pconfig.extra, chat_id, message)

    unsupported = platform.value
    return {"error": f"Direct sending not yet implemented for {unsupported}"}
async def _send_telegram(token, chat_id, message, thread_id=None):
    """Send via Telegram Bot API (one-shot, no polling needed).

    MEDIA tags are extracted from ``message`` with
    ``BasePlatformAdapter.extract_media``. The remaining text is sent first,
    then each extracted file is uploaded with the Bot method that matches its
    extension.

    Args:
        token: Bot token.
        chat_id: Target chat id; must be convertible to ``int``.
        message: Message text, possibly containing MEDIA tags.
        thread_id: Optional topic/thread id; converted to ``int`` when given.

    Returns:
        On success: ``{"success": True, "platform": "telegram", "chat_id": ...,
        "message_id": ...}`` where ``message_id`` belongs to the last message
        sent. A ``media_errors`` list is added when some attachments were
        skipped or failed. On failure: ``{"error": "..."}``. This includes the
        case where the message consisted only of attachments and none could be
        delivered.
    """
    try:
        from telegram import Bot
        from gateway.platforms.base import BasePlatformAdapter

        bot = Bot(token=token)
        int_chat_id = int(chat_id)
        thread_kwargs = {}
        if thread_id is not None:
            thread_kwargs["message_thread_id"] = int(thread_id)

        # Extract MEDIA:<path> tags; each entry is a (path, is_voice) pair.
        media_files, cleaned = BasePlatformAdapter.extract_media(message)
        last_msg = None
        media_errors = []

        # Send text portion if any remains.
        if cleaned.strip():
            last_msg = await bot.send_message(
                chat_id=int_chat_id, text=cleaned, **thread_kwargs
            )

        # Send extracted media files; one bad file must not abort the rest.
        for media_path, is_voice in media_files:
            if not os.path.exists(media_path):
                logger.warning("Media file not found, skipping: %s", media_path)
                media_errors.append(f"{media_path}: file not found")
                continue

            ext = os.path.splitext(media_path)[1].lower()
            # Choose the upload method and the keyword its file argument uses.
            if ext in _IMAGE_EXTS:
                send_file, file_kwarg = bot.send_photo, "photo"
            elif ext in _VIDEO_EXTS:
                send_file, file_kwarg = bot.send_video, "video"
            elif ext in _VOICE_EXTS and is_voice:
                send_file, file_kwarg = bot.send_voice, "voice"
            elif ext in _AUDIO_EXTS:
                send_file, file_kwarg = bot.send_audio, "audio"
            else:
                send_file, file_kwarg = bot.send_document, "document"

            try:
                with open(media_path, "rb") as f:
                    last_msg = await send_file(
                        chat_id=int_chat_id, **{file_kwarg: f}, **thread_kwargs
                    )
            except Exception as e:
                logger.error("Failed to send media %s: %s", media_path, e)
                media_errors.append(f"{media_path}: {e}")

        if last_msg is None:
            if media_errors:
                # Nothing was delivered and every attachment failed. Report it
                # instead of posting the raw MEDIA tag text into the chat and
                # claiming success.
                return {
                    "error": "Telegram send failed: no media could be delivered ("
                    + "; ".join(media_errors) + ")"
                }
            # No text and no media: send whatever text there is as a fallback.
            last_msg = await bot.send_message(
                chat_id=int_chat_id, text=cleaned if cleaned.strip() else message, **thread_kwargs
            )

        result = {"success": True, "platform": "telegram", "chat_id": chat_id, "message_id": str(last_msg.message_id)}
        if media_errors:
            # Partial delivery: let the caller see which attachments were lost.
            result["media_errors"] = media_errors
        return result
    except ImportError:
        return {"error": "python-telegram-bot not installed. Run: pip install python-telegram-bot"}
    except Exception as e:
        return {"error": f"Telegram send failed: {e}"}
async def _send_discord(token, chat_id, message):
    """Post a message to a Discord channel over the REST API.

    The message is cut into pieces of at most 2000 characters, posted in
    order. The first non-200/201 response aborts the send.

    Returns:
        ``{"success": True, "platform": "discord", "chat_id": ...,
        "message_ids": [...]}`` with one id per piece, or ``{"error": "..."}``.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    piece_size = 2000
    try:
        endpoint = f"https://discord.com/api/v10/channels/{chat_id}/messages"
        request_headers = {"Authorization": f"Bot {token}", "Content-Type": "application/json"}
        pieces = [
            message[start:start + piece_size]
            for start in range(0, len(message), piece_size)
        ]
        sent_ids = []
        async with aiohttp.ClientSession() as session:
            for piece in pieces:
                async with session.post(endpoint, headers=request_headers, json={"content": piece}) as resp:
                    if resp.status in (200, 201):
                        reply = await resp.json()
                        sent_ids.append(reply.get("id"))
                        continue
                    detail = await resp.text()
                    return {"error": f"Discord API error ({resp.status}): {detail}"}
        return {"success": True, "platform": "discord", "chat_id": chat_id, "message_ids": sent_ids}
    except Exception as exc:
        return {"error": f"Discord send failed: {exc}"}
async def _send_slack(token, chat_id, message):
    """Post a message through Slack's ``chat.postMessage`` Web API method.

    Returns:
        ``{"success": True, "platform": "slack", "chat_id": ...,
        "message_id": <ts>}`` when the response's ``ok`` field is truthy,
        otherwise ``{"error": "..."}``.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        endpoint = "https://slack.com/api/chat.postMessage"
        request_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        body = {"channel": chat_id, "text": message}
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, headers=request_headers, json=body) as resp:
                reply = await resp.json()
                if not reply.get("ok"):
                    return {"error": f"Slack API error: {reply.get('error', 'unknown')}"}
                return {"success": True, "platform": "slack", "chat_id": chat_id, "message_id": reply.get("ts")}
    except Exception as exc:
        return {"error": f"Slack send failed: {exc}"}
async def _send_signal(extra, chat_id, message):
    """Send a message through the signal-cli JSON-RPC HTTP endpoint.

    Args:
        extra: Platform settings; reads ``http_url`` (default
            ``http://127.0.0.1:8080``) and ``account`` (required).
        chat_id: ``"group:<id>"`` for a group, otherwise a single recipient.
        message: Text to send.

    Returns:
        ``{"success": True, "platform": "signal", "chat_id": ...}`` or
        ``{"error": "..."}``.
    """
    try:
        import httpx
    except ImportError:
        return {"error": "httpx not installed"}

    try:
        base_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/")
        account = extra.get("account", "")
        if not account:
            return {"error": "Signal account not configured"}

        rpc_params = {"account": account, "message": message}
        group_prefix = "group:"
        if chat_id.startswith(group_prefix):
            # Groups are addressed by id with the "group:" prefix removed.
            rpc_params["groupId"] = chat_id[len(group_prefix):]
        else:
            rpc_params["recipient"] = [chat_id]

        # Millisecond timestamp as the request id.
        request_id = f"send_{int(time.time() * 1000)}"
        rpc_request = {
            "jsonrpc": "2.0",
            "method": "send",
            "params": rpc_params,
            "id": request_id,
        }

        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(f"{base_url}/api/v1/rpc", json=rpc_request)
            resp.raise_for_status()
            reply = resp.json()
            if "error" in reply:
                return {"error": f"Signal RPC error: {reply['error']}"}
            return {"success": True, "platform": "signal", "chat_id": chat_id}
    except Exception as exc:
        return {"error": f"Signal send failed: {exc}"}
async def _send_email(extra, chat_id, message):
    """Send via SMTP (one-shot, no persistent connection needed).

    Args:
        extra: Platform settings; ``address`` and ``smtp_host`` take precedence
            over the ``EMAIL_ADDRESS`` / ``EMAIL_SMTP_HOST`` environment
            variables. The password is read from ``EMAIL_PASSWORD`` only, and
            the port from ``EMAIL_SMTP_PORT`` (default 587).
        chat_id: Recipient address, used as the ``To`` header.
        message: Plain-text body.

    Returns:
        ``{"success": True, "platform": "email", "chat_id": ...}`` or
        ``{"error": "..."}``. Configuration and SMTP failures are reported in
        the result dict rather than raised.
    """
    import smtplib
    from email.mime.text import MIMEText

    address = extra.get("address") or os.getenv("EMAIL_ADDRESS", "")
    password = os.getenv("EMAIL_PASSWORD", "")
    smtp_host = extra.get("smtp_host") or os.getenv("EMAIL_SMTP_HOST", "")
    if not all([address, password, smtp_host]):
        return {"error": "Email not configured (EMAIL_ADDRESS, EMAIL_PASSWORD, EMAIL_SMTP_HOST required)"}

    try:
        # Parsed inside the try so a malformed port becomes an error result
        # like every other failure, instead of an exception.
        smtp_port = int(os.getenv("EMAIL_SMTP_PORT", "587"))

        msg = MIMEText(message, "plain", "utf-8")
        msg["From"] = address
        msg["To"] = chat_id
        msg["Subject"] = "Hermes Agent"

        # The context manager closes the connection even when starttls, login
        # or send fails; the timeout keeps an unresponsive server from hanging
        # the call indefinitely.
        with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
            server.starttls()
            server.login(address, password)
            server.send_message(msg)
        return {"success": True, "platform": "email", "chat_id": chat_id}
    except Exception as e:
        return {"error": f"Email send failed: {e}"}
def _check_send_message():
    """Availability check for the send_message tool.

    Returns:
        ``True`` when ``HERMES_SESSION_PLATFORM`` is set to anything other than
        ``"local"``. Otherwise the result of the gateway's running check, or
        ``False`` if that check cannot be imported or raises.
    """
    session_platform = os.getenv("HERMES_SESSION_PLATFORM", "")
    if session_platform not in ("", "local"):
        return True

    try:
        from gateway.status import is_gateway_running

        return is_gateway_running()
    except Exception:
        return False
# --- Registry ---
# Import-time side effect: importing this module registers the tool.
# The import sits here rather than at the top of the file; presumably so the
# schema and handler exist before registration -- confirm before moving it.
from tools.registry import registry
# check_fn is the availability gate defined above (_check_send_message);
# how the registry uses it is not visible from this file.
registry.register(
name="send_message",
toolset="messaging",
schema=SEND_MESSAGE_SCHEMA,
handler=send_message_tool,
check_fn=_check_send_message,
)