mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
time.sleep(1) inside async def connect() blocks the entire event loop for 1 second. Replaced with await asyncio.sleep(1) to yield control back to the event loop while waiting for the killed port process to release.
638 lines
25 KiB
Python
638 lines
25 KiB
Python
"""
|
|
WhatsApp platform adapter.
|
|
|
|
WhatsApp integration is more complex than Telegram/Discord because:
|
|
- No official bot API for personal accounts
|
|
- Business API requires Meta Business verification
|
|
- Most solutions use web-based automation
|
|
|
|
This adapter supports multiple backends:
|
|
1. WhatsApp Business API (requires Meta verification)
|
|
2. whatsapp-web.js (via Node.js subprocess) - for personal accounts
|
|
3. Baileys (via Node.js subprocess) - alternative for personal accounts
|
|
|
|
For simplicity, we'll implement a generic interface that can work
|
|
with different backends via a bridge pattern.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import subprocess
|
|
|
|
_IS_WINDOWS = platform.system() == "Windows"
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _kill_port_process(port: int) -> None:
    """Best-effort termination of whatever is listening on TCP ``port``.

    On Windows the listener's PID is looked up in ``netstat -ano -p TCP``
    output and handed to ``taskkill /F``.  On every other platform ``fuser``
    is probed for ``<port>/tcp`` and, when it exits with status 0, re-run
    with ``-k``.

    Any failure (missing tool, timeout, unexpected output) is swallowed, so
    this function never raises.
    """
    try:
        if not _IS_WINDOWS:
            target = f"{port}/tcp"
            probe = subprocess.run(
                ["fuser", target],
                capture_output=True, timeout=5,
            )
            # A zero exit status is treated as "something holds the port".
            if probe.returncode == 0:
                subprocess.run(
                    ["fuser", "-k", target],
                    capture_output=True, timeout=5,
                )
            return

        wanted_suffix = f":{port}"
        listing = subprocess.run(
            ["netstat", "-ano", "-p", "TCP"],
            capture_output=True, text=True, timeout=5,
        )
        for row in listing.stdout.splitlines():
            fields = row.split()
            # fields[1] is used as the local address, fields[3] as the
            # connection state and fields[4] as the owning PID.
            if len(fields) < 5 or fields[3] != "LISTENING":
                continue
            if not fields[1].endswith(wanted_suffix):
                continue
            try:
                subprocess.run(
                    ["taskkill", "/PID", fields[4], "/F"],
                    capture_output=True, timeout=5,
                )
            except subprocess.SubprocessError:
                # One failed kill must not stop us from trying other rows.
                pass
    except Exception:
        # Deliberately silent: cleanup is opportunistic.
        pass
|
|
|
|
import sys
|
|
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
|
|
|
|
from gateway.config import Platform, PlatformConfig
|
|
from gateway.platforms.base import (
|
|
BasePlatformAdapter,
|
|
MessageEvent,
|
|
MessageType,
|
|
SendResult,
|
|
cache_image_from_url,
|
|
cache_audio_from_url,
|
|
)
|
|
|
|
|
|
def check_whatsapp_requirements() -> bool:
    """
    Report whether a usable ``node`` executable is available.

    Runs ``node --version`` with a 5 second timeout.

    Returns:
        True when the command exits with status 0; False when it exits
        non-zero or cannot be run at all (not installed, timed out, ...).
    """
    version_cmd = ["node", "--version"]
    try:
        probe = subprocess.run(
            version_cmd,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except Exception:
        return False
    return probe.returncode == 0
|
|
|
|
|
|
class WhatsAppAdapter(BasePlatformAdapter):
|
|
"""
|
|
WhatsApp adapter.
|
|
|
|
This implementation uses a simple HTTP bridge pattern where:
|
|
1. A Node.js process runs the WhatsApp Web client
|
|
2. Messages are forwarded via HTTP/IPC to this Python adapter
|
|
3. Responses are sent back through the bridge
|
|
|
|
The actual Node.js bridge implementation can vary:
|
|
- whatsapp-web.js based
|
|
- Baileys based
|
|
- Business API based
|
|
|
|
Configuration:
|
|
- bridge_script: Path to the Node.js bridge script
|
|
- bridge_port: Port for HTTP communication (default: 3000)
|
|
- session_path: Path to store WhatsApp session data
|
|
"""
|
|
|
|
# WhatsApp message limits
|
|
MAX_MESSAGE_LENGTH = 65536 # WhatsApp allows longer messages
|
|
|
|
# Default bridge location relative to the hermes-agent install
|
|
_DEFAULT_BRIDGE_DIR = Path(__file__).resolve().parents[2] / "scripts" / "whatsapp-bridge"
|
|
|
|
def __init__(self, config: PlatformConfig):
    """Read bridge settings from ``config.extra`` and set up idle state.

    Recognised ``config.extra`` keys (all optional):
        bridge_port:   port of the bridge HTTP server (default 3000)
        bridge_script: path to the Node.js bridge script
                       (default ``<_DEFAULT_BRIDGE_DIR>/bridge.js``)
        session_path:  directory for session data
                       (default ``~/.hermes/whatsapp/session``)

    No process is started here.
    """
    super().__init__(config, Platform.WHATSAPP)
    extra = config.extra

    fallback_script = str(self._DEFAULT_BRIDGE_DIR / "bridge.js")
    fallback_session = Path.home() / ".hermes" / "whatsapp" / "session"

    # Bridge configuration.
    self._bridge_port: int = extra.get("bridge_port", 3000)
    self._bridge_script: Optional[str] = extra.get("bridge_script", fallback_script)
    self._session_path: Path = Path(extra.get("session_path", fallback_session))

    # Runtime handles, populated once the bridge is launched.
    self._bridge_process: Optional[subprocess.Popen] = None
    self._bridge_log: Optional[Path] = None
    self._bridge_log_fh = None

    self._message_queue: asyncio.Queue = asyncio.Queue()
|
|
|
|
async def connect(self) -> bool:
    """
    Start the WhatsApp bridge and begin polling it for messages.

    Steps:
      1. Check that Node.js and the bridge script exist; run ``npm install``
         in the bridge directory when ``node_modules`` is missing.
      2. Kill whatever holds the bridge port, then spawn ``node <bridge>``
         with stdout/stderr appended to ``bridge.log`` next to the session
         directory.
      3. Poll ``/health`` once per second: up to 15 tries for the HTTP
         server to answer, then up to 15 more for ``status == "connected"``.
      4. Start the background message-polling task.

    Blocking subprocess calls are run on the default executor so the event
    loop stays responsive while they complete.

    Returns:
        True once the bridge HTTP server answers (even if WhatsApp has not
        reported "connected" yet -- a warning is printed in that case);
        False on any failure.
    """
    loop = asyncio.get_running_loop()

    # `node --version` is a blocking subprocess call (up to 5s).
    if not await loop.run_in_executor(None, check_whatsapp_requirements):
        logger.warning("[%s] Node.js not found. WhatsApp requires Node.js.", self.name)
        return False

    bridge_path = Path(self._bridge_script)
    if not bridge_path.exists():
        logger.warning("[%s] Bridge script not found: %s", self.name, bridge_path)
        return False

    logger.info("[%s] Bridge found at %s", self.name, bridge_path)

    # Auto-install npm dependencies if node_modules doesn't exist
    bridge_dir = bridge_path.parent
    if not (bridge_dir / "node_modules").exists():
        print(f"[{self.name}] Installing WhatsApp bridge dependencies...")
        try:
            # npm may take up to the full 60s timeout; never run it on the
            # event loop thread.
            install_result = await loop.run_in_executor(
                None,
                lambda: subprocess.run(
                    ["npm", "install", "--silent"],
                    cwd=str(bridge_dir),
                    capture_output=True,
                    text=True,
                    timeout=60,
                ),
            )
            if install_result.returncode != 0:
                print(f"[{self.name}] npm install failed: {install_result.stderr}")
                return False
            print(f"[{self.name}] Dependencies installed")
        except Exception as e:
            print(f"[{self.name}] Failed to install dependencies: {e}")
            return False

    try:
        # Import before spawning anything so that a missing aiohttp cannot
        # leave an orphaned bridge process behind.
        import aiohttp

        # Ensure session directory exists
        self._session_path.mkdir(parents=True, exist_ok=True)

        # Kill any orphaned bridge from a previous gateway run, then give
        # the OS a moment to release the port.
        await loop.run_in_executor(None, _kill_port_process, self._bridge_port)
        await asyncio.sleep(1)

        # Start the bridge process in its own process group.
        # Route output to a log file so QR codes, errors, and reconnection
        # messages are preserved for troubleshooting.
        whatsapp_mode = os.getenv("WHATSAPP_MODE", "self-chat")
        self._bridge_log = self._session_path.parent / "bridge.log"
        bridge_log_fh = open(self._bridge_log, "a")
        self._bridge_log_fh = bridge_log_fh
        self._bridge_process = subprocess.Popen(
            [
                "node",
                str(bridge_path),
                "--port", str(self._bridge_port),
                "--session", str(self._session_path),
                "--mode", whatsapp_mode,
            ],
            stdout=bridge_log_fh,
            stderr=bridge_log_fh,
            preexec_fn=None if _IS_WINDOWS else os.setsid,
        )

        # Wait for the bridge to connect to WhatsApp.
        # Phase 1: wait for the HTTP server to come up (up to 15s).
        # Phase 2: wait for WhatsApp status: connected (up to 15s more).
        http_ready = False
        data = {}
        for _ in range(15):
            await asyncio.sleep(1)
            if self._bridge_process.poll() is not None:
                print(f"[{self.name}] Bridge process died (exit code {self._bridge_process.returncode})")
                print(f"[{self.name}] Check log: {self._bridge_log}")
                self._close_bridge_log()
                return False
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        f"http://localhost:{self._bridge_port}/health",
                        timeout=aiohttp.ClientTimeout(total=2)
                    ) as resp:
                        if resp.status == 200:
                            http_ready = True
                            data = await resp.json()
                            if data.get("status") == "connected":
                                print(f"[{self.name}] Bridge ready (status: connected)")
                                break
            except Exception:
                # Server not listening yet (or bad reply); try again.
                continue

        if not http_ready:
            # NOTE(review): the spawned bridge is left running on this path;
            # the next connect() reclaims the port via _kill_port_process.
            print(f"[{self.name}] Bridge HTTP server did not start in 15s")
            print(f"[{self.name}] Check log: {self._bridge_log}")
            self._close_bridge_log()
            return False

        # Phase 2: HTTP is up but WhatsApp may still be connecting.
        # Give it more time to authenticate with saved credentials.
        if data.get("status") != "connected":
            print(f"[{self.name}] Bridge HTTP ready, waiting for WhatsApp connection...")
            for _ in range(15):
                await asyncio.sleep(1)
                if self._bridge_process.poll() is not None:
                    print(f"[{self.name}] Bridge process died during connection")
                    print(f"[{self.name}] Check log: {self._bridge_log}")
                    self._close_bridge_log()
                    return False
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.get(
                            f"http://localhost:{self._bridge_port}/health",
                            timeout=aiohttp.ClientTimeout(total=2)
                        ) as resp:
                            if resp.status == 200:
                                data = await resp.json()
                                if data.get("status") == "connected":
                                    print(f"[{self.name}] Bridge ready (status: connected)")
                                    break
                except Exception:
                    continue
            else:
                # Still not connected — warn but proceed (bridge may
                # auto-reconnect later, e.g. after a code 515 restart).
                print(f"[{self.name}] ⚠ WhatsApp not connected after 30s")
                print(f"[{self.name}] Bridge log: {self._bridge_log}")
                print(f"[{self.name}] If session expired, re-pair: hermes whatsapp")

        # Start message polling. The event loop only keeps weak references
        # to tasks, so hold a strong one to stop the poller from being
        # garbage-collected mid-flight.
        self._running = True
        self._poll_task = asyncio.create_task(self._poll_messages())

        print(f"[{self.name}] Bridge started on port {self._bridge_port}")
        return True

    except Exception as e:
        logger.error("[%s] Failed to start bridge: %s", self.name, e, exc_info=True)
        self._close_bridge_log()
        return False
|
|
|
|
def _close_bridge_log(self) -> None:
|
|
"""Close the bridge log file handle if open."""
|
|
if self._bridge_log_fh:
|
|
try:
|
|
self._bridge_log_fh.close()
|
|
except Exception:
|
|
pass
|
|
self._bridge_log_fh = None
|
|
|
|
async def disconnect(self) -> None:
    """Stop the WhatsApp bridge and clean up any orphaned processes.

    Sends SIGTERM to the bridge's process group (``terminate()`` on
    Windows), waits one second, and escalates to SIGKILL / ``kill()`` if
    the process is still alive.  Afterwards whatever still holds the bridge
    port is killed and the log handle is closed.  Errors while stopping the
    process are printed rather than raised.
    """
    # Clear the flag first so the polling loop and the send paths stop
    # talking to a bridge that is about to go away.
    self._running = False

    if self._bridge_process:
        try:
            # Kill the entire process group so child node processes die too
            import signal
            try:
                if _IS_WINDOWS:
                    self._bridge_process.terminate()
                else:
                    os.killpg(os.getpgid(self._bridge_process.pid), signal.SIGTERM)
            except (ProcessLookupError, PermissionError):
                # Group lookup failed; fall back to signalling the child only.
                self._bridge_process.terminate()
            await asyncio.sleep(1)
            if self._bridge_process.poll() is None:
                try:
                    if _IS_WINDOWS:
                        self._bridge_process.kill()
                    else:
                        os.killpg(os.getpgid(self._bridge_process.pid), signal.SIGKILL)
                except (ProcessLookupError, PermissionError):
                    self._bridge_process.kill()
        except Exception as e:
            print(f"[{self.name}] Error stopping bridge: {e}")

    # Also kill any orphaned bridge processes on our port.  The helper
    # shells out with multi-second timeouts, so keep it off the event loop
    # thread.
    await asyncio.get_running_loop().run_in_executor(
        None, _kill_port_process, self._bridge_port
    )

    self._bridge_process = None
    self._close_bridge_log()
    print(f"[{self.name}] Disconnected")
|
|
|
|
async def send(
    self,
    chat_id: str,
    content: str,
    reply_to: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> SendResult:
    """POST a text message to the bridge's ``/send`` endpoint.

    Args:
        chat_id: Destination chat, sent as ``chatId``.
        content: Message text, sent as ``message``.
        reply_to: Optional message id, sent as ``replyTo`` when truthy.
        metadata: Accepted for interface compatibility; not used here.

    Returns:
        A successful ``SendResult`` carrying the bridge's ``messageId`` on
        HTTP 200; otherwise a failed ``SendResult`` whose ``error`` is the
        response body or the exception text.  Never raises.
    """
    if not self._running:
        return SendResult(success=False, error="Not connected")

    try:
        import aiohttp

        body = {"chatId": chat_id, "message": content}
        if reply_to:
            body["replyTo"] = reply_to

        endpoint = f"http://localhost:{self._bridge_port}/send"
        limit = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession() as http:
            async with http.post(endpoint, json=body, timeout=limit) as reply:
                if reply.status != 200:
                    failure_text = await reply.text()
                    return SendResult(success=False, error=failure_text)

                parsed = await reply.json()
                return SendResult(
                    success=True,
                    message_id=parsed.get("messageId"),
                    raw_response=parsed,
                )

    except ImportError:
        return SendResult(
            success=False,
            error="aiohttp not installed. Run: pip install aiohttp",
        )
    except Exception as exc:
        return SendResult(success=False, error=str(exc))
|
|
|
|
async def edit_message(
    self,
    chat_id: str,
    message_id: str,
    content: str,
) -> SendResult:
    """Replace the text of an earlier message via the bridge's ``/edit`` endpoint.

    Returns:
        A successful ``SendResult`` echoing ``message_id`` on HTTP 200;
        otherwise a failed one carrying the response body or exception
        text.  Never raises.
    """
    if not self._running:
        return SendResult(success=False, error="Not connected")
    try:
        import aiohttp

        endpoint = f"http://localhost:{self._bridge_port}/edit"
        body = {
            "chatId": chat_id,
            "messageId": message_id,
            "message": content,
        }
        limit = aiohttp.ClientTimeout(total=15)

        async with aiohttp.ClientSession() as http:
            async with http.post(endpoint, json=body, timeout=limit) as reply:
                if reply.status != 200:
                    failure_text = await reply.text()
                    return SendResult(success=False, error=failure_text)
                return SendResult(success=True, message_id=message_id)
    except Exception as exc:
        return SendResult(success=False, error=str(exc))
|
|
|
|
async def _send_media_to_bridge(
    self,
    chat_id: str,
    file_path: str,
    media_type: str,
    caption: Optional[str] = None,
    file_name: Optional[str] = None,
) -> SendResult:
    """Ask the bridge to send a local file through ``/send-media``.

    Only the path is transmitted (as ``filePath``); the file contents are
    not uploaded by this method.

    Args:
        chat_id: Destination chat.
        file_path: Local path of the media; must exist.
        media_type: Value forwarded as ``mediaType`` (callers in this class
            pass "image", "video" or "document").
        caption: Optional caption, included only when truthy.
        file_name: Optional display name, included only when truthy.

    Returns:
        A ``SendResult``; failures (not connected, missing file, non-200
        reply, any exception) are reported in it rather than raised.
    """
    if not self._running:
        return SendResult(success=False, error="Not connected")
    try:
        import aiohttp

        if not os.path.exists(file_path):
            return SendResult(success=False, error=f"File not found: {file_path}")

        body: Dict[str, Any] = {
            "chatId": chat_id,
            "filePath": file_path,
            "mediaType": media_type,
        }
        # Optional fields are omitted entirely when empty.
        for key, value in (("caption", caption), ("fileName", file_name)):
            if value:
                body[key] = value

        endpoint = f"http://localhost:{self._bridge_port}/send-media"
        limit = aiohttp.ClientTimeout(total=120)

        async with aiohttp.ClientSession() as http:
            async with http.post(endpoint, json=body, timeout=limit) as reply:
                if reply.status != 200:
                    failure_text = await reply.text()
                    return SendResult(success=False, error=failure_text)

                parsed = await reply.json()
                return SendResult(
                    success=True,
                    message_id=parsed.get("messageId"),
                    raw_response=parsed,
                )

    except Exception as exc:
        return SendResult(success=False, error=str(exc))
|
|
|
|
async def send_image(
    self,
    chat_id: str,
    image_url: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
) -> SendResult:
    """Send an image given by URL as native WhatsApp media.

    The image is first downloaded into the local cache and the cached path
    is handed to the bridge.  If anything in that path raises, the base
    class implementation is used instead.  ``reply_to`` is only forwarded
    on that fallback path.
    """
    try:
        cached_file = await cache_image_from_url(image_url)
        return await self._send_media_to_bridge(
            chat_id=chat_id,
            file_path=cached_file,
            media_type="image",
            caption=caption,
        )
    except Exception:
        # Fall back to whatever the generic adapter does with image URLs.
        return await super().send_image(chat_id, image_url, caption, reply_to)
|
|
|
|
async def send_image_file(
    self,
    chat_id: str,
    image_path: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
) -> SendResult:
    """Send an image already on disk as native WhatsApp media.

    ``reply_to`` is accepted for interface compatibility but is not
    forwarded to the bridge.
    """
    return await self._send_media_to_bridge(
        chat_id=chat_id,
        file_path=image_path,
        media_type="image",
        caption=caption,
    )
|
|
|
|
async def send_video(
    self,
    chat_id: str,
    video_path: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
) -> SendResult:
    """Send a local video file via the bridge with media type "video".

    ``reply_to`` is accepted for interface compatibility but is not
    forwarded to the bridge.
    """
    return await self._send_media_to_bridge(
        chat_id=chat_id,
        file_path=video_path,
        media_type="video",
        caption=caption,
    )
|
|
|
|
async def send_document(
    self,
    chat_id: str,
    file_path: str,
    caption: Optional[str] = None,
    file_name: Optional[str] = None,
    reply_to: Optional[str] = None,
) -> SendResult:
    """Send a local file via the bridge with media type "document".

    When ``file_name`` is empty the basename of ``file_path`` is used as
    the display name.  ``reply_to`` is accepted for interface compatibility
    but is not forwarded to the bridge.
    """
    display_name = file_name or os.path.basename(file_path)
    return await self._send_media_to_bridge(
        chat_id=chat_id,
        file_path=file_path,
        media_type="document",
        caption=caption,
        file_name=display_name,
    )
|
|
|
|
async def send_typing(self, chat_id: str) -> None:
    """Ask the bridge to show a typing indicator in ``chat_id``.

    Purely cosmetic: does nothing while disconnected and silently ignores
    every failure (missing aiohttp, timeout, bridge error).
    """
    if not self._running:
        return

    try:
        import aiohttp

        async with aiohttp.ClientSession() as session:
            # Use the response as a context manager, like the other bridge
            # calls in this class, so the connection is released explicitly.
            async with session.post(
                f"http://localhost:{self._bridge_port}/typing",
                json={"chatId": chat_id},
                timeout=aiohttp.ClientTimeout(total=5),
            ):
                pass  # The reply carries nothing we need.
    except Exception:
        pass  # Ignore typing indicator failures
|
|
|
|
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
    """Look up a chat's name, kind and participants through the bridge.

    Returns:
        - ``{"name": "Unknown", "type": "dm"}`` while disconnected.
        - ``{"name", "type", "participants"}`` built from the bridge's
          ``/chat/<chat_id>`` reply on HTTP 200 (``type`` is "group" when
          the reply's ``isGroup`` is truthy, else "dm").
        - ``{"name": chat_id, "type": "dm"}`` on any other status or error.
    """
    if not self._running:
        return {"name": "Unknown", "type": "dm"}

    try:
        import aiohttp

        endpoint = f"http://localhost:{self._bridge_port}/chat/{chat_id}"
        limit = aiohttp.ClientTimeout(total=10)

        async with aiohttp.ClientSession() as http:
            async with http.get(endpoint, timeout=limit) as reply:
                if reply.status == 200:
                    info = await reply.json()
                    kind = "group" if info.get("isGroup") else "dm"
                    return {
                        "name": info.get("name", chat_id),
                        "type": kind,
                        "participants": info.get("participants", []),
                    }
    except Exception as exc:
        logger.debug("Could not get WhatsApp chat info for %s: %s", chat_id, exc)

    # Non-200 reply or failure: fall back to a minimal description.
    return {"name": chat_id, "type": "dm"}
|
|
|
|
async def _poll_messages(self) -> None:
    """Fetch queued messages from the bridge until the adapter stops.

    While ``self._running`` is true, GETs ``/messages`` (30s timeout),
    converts each entry with ``_build_message_event`` and passes non-empty
    events to ``handle_message``.  Waits 1s between polls, plus an extra 5s
    after an error.  Returns immediately when aiohttp is unavailable and
    exits the loop on task cancellation.
    """
    try:
        import aiohttp
    except ImportError:
        print(f"[{self.name}] aiohttp not installed, message polling disabled")
        return

    poll_timeout = aiohttp.ClientTimeout(total=30)

    while self._running:
        try:
            inbox_url = f"http://localhost:{self._bridge_port}/messages"
            async with aiohttp.ClientSession() as http:
                async with http.get(inbox_url, timeout=poll_timeout) as reply:
                    # Anything other than 200 is treated as "no messages".
                    batch = await reply.json() if reply.status == 200 else []
                    for raw_message in batch:
                        event = await self._build_message_event(raw_message)
                        if event:
                            await self.handle_message(event)
        except asyncio.CancelledError:
            break
        except Exception as exc:
            print(f"[{self.name}] Poll error: {exc}")
            await asyncio.sleep(5)

        await asyncio.sleep(1)  # Poll interval
|
|
|
|
async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEvent]:
    """Build a MessageEvent from bridge message data, caching image and voice media.

    Args:
        data: One message object as returned by the bridge.  Keys read here:
            ``hasMedia``, ``mediaType``, ``isGroup``, ``chatId``,
            ``chatName``, ``senderId``, ``senderName``, ``mediaUrls``,
            ``body``, ``messageId``.

    Returns:
        The event, or None when anything goes wrong (the error is printed).

    For photo and voice messages every http(s) media URL is downloaded to
    the local cache and replaced by the cached path; if the download fails
    the original URL is kept.  All other media URLs pass through unchanged
    with media type "unknown".
    """
    try:
        # Determine message type
        msg_type = MessageType.TEXT
        if data.get("hasMedia"):
            # The payload is decoded JSON, so the key can be present but
            # null; treat that like a missing value instead of crashing.
            media_type = data.get("mediaType") or ""
            if "image" in media_type:
                msg_type = MessageType.PHOTO
            elif "video" in media_type:
                msg_type = MessageType.VIDEO
            elif "audio" in media_type or "ptt" in media_type:  # ptt = voice note
                msg_type = MessageType.VOICE
            else:
                msg_type = MessageType.DOCUMENT

        # Determine chat type
        chat_type = "group" if data.get("isGroup", False) else "dm"

        # Build source
        source = self.build_source(
            chat_id=data.get("chatId", ""),
            chat_name=data.get("chatName"),
            chat_type=chat_type,
            user_id=data.get("senderId"),
            user_name=data.get("senderName"),
        )

        # Download media URLs to the local cache so downstream tools can
        # access them reliably regardless of URL expiration.
        # cacher = (download helper, file extension, MIME label, log noun)
        if msg_type == MessageType.PHOTO:
            cacher = (cache_image_from_url, ".jpg", "image/jpeg", "image")
        elif msg_type == MessageType.VOICE:
            cacher = (cache_audio_from_url, ".ogg", "audio/ogg", "voice")
        else:
            cacher = None

        cached_urls = []
        media_types = []
        # `or []` also covers an explicit null from the bridge.
        for url in data.get("mediaUrls") or []:
            if cacher and url.startswith(("http://", "https://")):
                fetch, ext, mime, noun = cacher
                try:
                    local_ref = await fetch(url, ext=ext)
                except Exception as e:
                    print(f"[{self.name}] Failed to cache {noun}: {e}", flush=True)
                    local_ref = url  # keep the remote URL as a fallback
                else:
                    print(f"[{self.name}] Cached user {noun}: {local_ref}", flush=True)
                cached_urls.append(local_ref)
                media_types.append(mime)
            else:
                cached_urls.append(url)
                media_types.append("unknown")

        return MessageEvent(
            text=data.get("body", ""),
            message_type=msg_type,
            source=source,
            raw_message=data,
            message_id=data.get("messageId"),
            media_urls=cached_urls,
            media_types=media_types,
        )
    except Exception as e:
        print(f"[{self.name}] Error building event: {e}")
        return None
|
|
|