mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-06 02:41:48 +00:00
Two mitigations for the CLOSE_WAIT accumulation reported against QQ Bot + Feishu on macOS behind Cloudflare Warp. 1. Shared httpx.Limits helper (gateway/platforms/_http_client_limits.py). Every long-lived platform adapter now constructs httpx.AsyncClient with max_keepalive_connections=10 and keepalive_expiry=2.0, vs httpx's default of unbounded keepalive pool and 5.0s expiry. On macOS/Warp the default 5s window let idle keepalive sockets sit in CLOSE_WAIT long enough for seven persistent adapters (QQ Bot, WeCom, DingTalk, Signal, BlueBubbles, WeCom-callback, plus the transient Feishu helper) to compound to the 256-fd ulimit. Tunable via HERMES_GATEWAY_HTTPX_KEEPALIVE_EXPIRY and HERMES_GATEWAY_HTTPX_MAX_KEEPALIVE env vars. 2. whatsapp.send_typing aiohttp leak. The call was 'await self._http_session.post(...)' with no 'async with' and no variable capture — the ClientResponse went out of scope unclosed, holding its TCP socket in CLOSE_WAIT until GC. Fixed by wrapping in 'async with'. This was the only bare-await aiohttp leak in the gateway/tools/plugins tree per audit; all other aiohttp sites use the context-manager pattern correctly. The underlying reporter also saw Feishu SDK (lark-oapi) connections in CLOSE_WAIT — those are inside the SDK and out of our direct control, but tightening httpx keepalive across adapters reduces the aggregate pool pressure regardless of which individual adapter leaks.
1104 lines
46 KiB
Python
1104 lines
46 KiB
Python
"""
|
|
WhatsApp platform adapter.
|
|
|
|
WhatsApp integration is more complex than Telegram/Discord because:
|
|
- No official bot API for personal accounts
|
|
- Business API requires Meta Business verification
|
|
- Most solutions use web-based automation
|
|
|
|
This adapter supports multiple backends:
|
|
1. WhatsApp Business API (requires Meta verification)
|
|
2. whatsapp-web.js (via Node.js subprocess) - for personal accounts
|
|
3. Baileys (via Node.js subprocess) - alternative for personal accounts
|
|
|
|
For simplicity, we'll implement a generic interface that can work
|
|
with different backends via a bridge pattern.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import re
|
|
import subprocess
|
|
|
|
_IS_WINDOWS = platform.system() == "Windows"
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, Any
|
|
|
|
from hermes_constants import get_hermes_dir
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _kill_port_process(port: int) -> None:
    """Kill any process listening on the given TCP port.

    Best-effort cleanup: every failure is logged at debug level and then
    swallowed, so this function never raises.

    Strategy per platform:

    - Windows: parse ``netstat -ano -p TCP`` and ``taskkill /F`` every PID
      that owns a LISTENING socket whose local address ends in ``:<port>``.
    - macOS, or any host where ``fuser`` is not installed: list the
      listening PIDs with ``lsof`` and send each one SIGKILL.  The
      ``<port>/tcp`` argument used below is a Linux (psmisc) ``fuser``
      feature, so on macOS the probe never reports a listener.
    - Other POSIX systems: ``fuser <port>/tcp`` to probe, followed by
      ``fuser -k <port>/tcp`` when something is listening.

    Args:
        port: TCP port whose listener(s) should be terminated.
    """
    try:
        if _IS_WINDOWS:
            # Use netstat to find the PID(s) bound to this port, then taskkill.
            listing = subprocess.run(
                ["netstat", "-ano", "-p", "TCP"],
                capture_output=True, text=True, timeout=5,
            )
            suffix = f":{port}"
            pids = []
            for row in listing.stdout.splitlines():
                cols = row.split()
                if len(cols) < 5 or cols[3] != "LISTENING":
                    continue
                # The same PID can be listed more than once (e.g. one row
                # per bound address); kill it a single time.
                if cols[1].endswith(suffix) and cols[4] not in pids:
                    pids.append(cols[4])
            for pid in pids:
                try:
                    subprocess.run(
                        ["taskkill", "/PID", pid, "/F"],
                        capture_output=True, timeout=5,
                    )
                except subprocess.SubprocessError as exc:
                    logger.debug("taskkill failed for PID %s (port %s): %s", pid, port, exc)
            return

        use_lsof = platform.system() == "Darwin"
        if not use_lsof:
            try:
                probe = subprocess.run(
                    ["fuser", f"{port}/tcp"],
                    capture_output=True, timeout=5,
                )
            except FileNotFoundError:
                # psmisc is not installed; fall back to lsof below.
                use_lsof = True
            else:
                if probe.returncode == 0:
                    subprocess.run(
                        ["fuser", "-k", f"{port}/tcp"],
                        capture_output=True, timeout=5,
                    )

        if use_lsof:
            import signal

            listing = subprocess.run(
                ["lsof", "-nP", "-t", f"-iTCP:{port}", "-sTCP:LISTEN"],
                capture_output=True, text=True, timeout=5,
            )
            own_pid = os.getpid()
            for token in listing.stdout.split():
                if not token.isdigit() or int(token) == own_pid:
                    continue
                try:
                    # SIGKILL mirrors what ``fuser -k`` sends by default.
                    os.kill(int(token), signal.SIGKILL)
                except OSError as exc:
                    logger.debug("Could not kill PID %s (port %s): %s", token, port, exc)
    except Exception as exc:
        # Deliberately non-fatal: the caller proceeds to start a new bridge
        # regardless, but leave a trace for troubleshooting.
        logger.debug("Could not free TCP port %s: %s", port, exc)
|
|
|
|
|
|
def _terminate_bridge_process(proc, *, force: bool = False) -> None:
    """Stop the bridge child together with its process tree / group.

    On Windows this runs ``taskkill /PID <pid> /T`` (adding ``/F`` when
    *force* is set).  If ``taskkill`` itself cannot be found, it falls back
    to ``proc.kill()`` (forced) or ``proc.terminate()``.

    Everywhere else the process group of ``proc.pid`` is signalled with
    SIGTERM, or SIGKILL when *force* is set.

    Args:
        proc: Process handle exposing ``pid``, ``kill()`` and ``terminate()``.
        force: Request an unconditional kill instead of a polite stop.

    Raises:
        OSError: ``taskkill`` exited non-zero (message carries its output),
            or the POSIX process-group calls failed.
    """
    if not _IS_WINDOWS:
        import signal

        chosen_signal = signal.SIGKILL if force else signal.SIGTERM
        os.killpg(os.getpgid(proc.pid), chosen_signal)
        return

    taskkill_cmd = ["taskkill", "/PID", str(proc.pid), "/T"]
    if force:
        taskkill_cmd.append("/F")

    try:
        outcome = subprocess.run(
            taskkill_cmd,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except FileNotFoundError:
        # No taskkill available: act on the direct child only.
        fallback = proc.kill if force else proc.terminate
        fallback()
        return

    if outcome.returncode == 0:
        return

    details = (outcome.stderr or outcome.stdout or "").strip()
    raise OSError(details or f"taskkill failed for PID {proc.pid}")
|
|
|
|
import sys
|
|
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
|
|
|
|
from gateway.config import Platform, PlatformConfig
|
|
from gateway.platforms.base import (
|
|
BasePlatformAdapter,
|
|
MessageEvent,
|
|
MessageType,
|
|
SendResult,
|
|
SUPPORTED_DOCUMENT_TYPES,
|
|
cache_image_from_url,
|
|
cache_audio_from_url,
|
|
)
|
|
|
|
|
|
def check_whatsapp_requirements() -> bool:
    """
    Report whether the WhatsApp bridge prerequisites are present.

    The only prerequisite checked is Node.js: ``node --version`` must run
    and exit with status 0 within five seconds.

    Returns:
        True if Node.js answered successfully; False if the command failed,
        timed out, or could not be launched at all.
    """
    try:
        probe = subprocess.run(
            ["node", "--version"],
            capture_output=True,
            text=True,
            timeout=5
        )
    except Exception:
        # Missing binary, timeout, permission problem: all mean "not usable".
        return False
    return probe.returncode == 0
|
|
|
|
|
|
class WhatsAppAdapter(BasePlatformAdapter):
|
|
"""
|
|
WhatsApp adapter.
|
|
|
|
This implementation uses a simple HTTP bridge pattern where:
|
|
1. A Node.js process runs the WhatsApp Web client
|
|
2. Messages are forwarded via HTTP/IPC to this Python adapter
|
|
3. Responses are sent back through the bridge
|
|
|
|
The actual Node.js bridge implementation can vary:
|
|
- whatsapp-web.js based
|
|
- Baileys based
|
|
- Business API based
|
|
|
|
Configuration:
|
|
- bridge_script: Path to the Node.js bridge script
|
|
- bridge_port: Port for HTTP communication (default: 3000)
|
|
- session_path: Path to store WhatsApp session data
|
|
- dm_policy: "open" | "allowlist" | "disabled" — how DMs are handled (default: "open")
|
|
- allow_from: List of sender IDs allowed in DMs (when dm_policy="allowlist")
|
|
- group_policy: "open" | "allowlist" | "disabled" — which groups are processed (default: "open")
|
|
- group_allow_from: List of group JIDs allowed (when group_policy="allowlist")
|
|
"""
|
|
|
|
# WhatsApp message limits — practical UX limit, not protocol max.
|
|
# WhatsApp allows ~65K but long messages are unreadable on mobile.
|
|
MAX_MESSAGE_LENGTH = 4096
|
|
|
|
# Default bridge location relative to the hermes-agent install
|
|
_DEFAULT_BRIDGE_DIR = Path(__file__).resolve().parents[2] / "scripts" / "whatsapp-bridge"
|
|
|
|
def __init__(self, config: PlatformConfig):
    """Initialise adapter state from *config*; nothing is started here.

    Settings are read from ``config.extra``.  The DM and group policies
    fall back to the ``WHATSAPP_DM_POLICY`` / ``WHATSAPP_GROUP_POLICY``
    environment variables (default ``"open"``) when the config value is
    missing or empty.
    """
    super().__init__(config, Platform.WHATSAPP)
    extra = config.extra

    # --- Bridge process and filesystem locations ---
    self._bridge_process: Optional[subprocess.Popen] = None
    self._bridge_port: int = extra.get("bridge_port", 3000)
    default_script = str(self._DEFAULT_BRIDGE_DIR / "bridge.js")
    self._bridge_script: Optional[str] = extra.get("bridge_script", default_script)
    default_session = get_hermes_dir("platforms/whatsapp/session", "whatsapp/session")
    self._session_path: Path = Path(extra.get("session_path", default_session))
    self._reply_prefix: Optional[str] = extra.get("reply_prefix")

    # --- Access policies (snake_case key wins over the camelCase alias) ---
    dm_policy = extra.get("dm_policy") or os.getenv("WHATSAPP_DM_POLICY", "open")
    self._dm_policy = str(dm_policy).strip().lower()
    self._allow_from = self._coerce_allow_list(
        extra.get("allow_from") or extra.get("allowFrom")
    )
    group_policy = extra.get("group_policy") or os.getenv("WHATSAPP_GROUP_POLICY", "open")
    self._group_policy = str(group_policy).strip().lower()
    self._group_allow_from = self._coerce_allow_list(
        extra.get("group_allow_from") or extra.get("groupAllowFrom")
    )
    self._mention_patterns = self._compile_mention_patterns()

    # --- Runtime handles, populated by connect() ---
    self._message_queue: asyncio.Queue = asyncio.Queue()
    self._bridge_log_fh = None
    self._bridge_log: Optional[Path] = None
    self._poll_task: Optional[asyncio.Task] = None
    self._http_session: Optional["aiohttp.ClientSession"] = None

    # disconnect() raises this flag before it signals the child bridge, so
    # _check_managed_bridge_exit() can tell an intentional shutdown-time
    # exit (returncode -15 / -2 / 0) apart from a real crash.  Without it,
    # every graceful gateway shutdown/restart would log "Fatal whatsapp
    # adapter error" and dispatch a fatal-error notification before the
    # normal "✓ whatsapp disconnected" message fires.
    self._shutting_down: bool = False
|
|
|
|
def _whatsapp_require_mention(self) -> bool:
|
|
configured = self.config.extra.get("require_mention")
|
|
if configured is not None:
|
|
if isinstance(configured, str):
|
|
return configured.lower() in ("true", "1", "yes", "on")
|
|
return bool(configured)
|
|
return os.getenv("WHATSAPP_REQUIRE_MENTION", "false").lower() in ("true", "1", "yes", "on")
|
|
|
|
def _whatsapp_free_response_chats(self) -> set[str]:
|
|
raw = self.config.extra.get("free_response_chats")
|
|
if raw is None:
|
|
raw = os.getenv("WHATSAPP_FREE_RESPONSE_CHATS", "")
|
|
if isinstance(raw, list):
|
|
return {str(part).strip() for part in raw if str(part).strip()}
|
|
return {part.strip() for part in str(raw).split(",") if part.strip()}
|
|
|
|
@staticmethod
|
|
def _coerce_allow_list(raw) -> set[str]:
|
|
"""Parse allow_from / group_allow_from from config or env var."""
|
|
if raw is None:
|
|
return set()
|
|
if isinstance(raw, list):
|
|
return {str(part).strip() for part in raw if str(part).strip()}
|
|
return {part.strip() for part in str(raw).split(",") if part.strip()}
|
|
|
|
def _is_dm_allowed(self, sender_id: str) -> bool:
|
|
"""Check whether a DM from the given sender should be processed."""
|
|
if self._dm_policy == "disabled":
|
|
return False
|
|
if self._dm_policy == "allowlist":
|
|
return sender_id in self._allow_from
|
|
# "open" — all DMs allowed
|
|
return True
|
|
|
|
def _is_group_allowed(self, chat_id: str) -> bool:
|
|
"""Check whether a group chat should be processed."""
|
|
if self._group_policy == "disabled":
|
|
return False
|
|
if self._group_policy == "allowlist":
|
|
return chat_id in self._group_allow_from
|
|
# "open" — all groups allowed
|
|
return True
|
|
|
|
def _compile_mention_patterns(self):
|
|
patterns = self.config.extra.get("mention_patterns")
|
|
if patterns is None:
|
|
raw = os.getenv("WHATSAPP_MENTION_PATTERNS", "").strip()
|
|
if raw:
|
|
try:
|
|
patterns = json.loads(raw)
|
|
except Exception:
|
|
patterns = [part.strip() for part in raw.splitlines() if part.strip()]
|
|
if not patterns:
|
|
patterns = [part.strip() for part in raw.split(",") if part.strip()]
|
|
if patterns is None:
|
|
return []
|
|
if isinstance(patterns, str):
|
|
patterns = [patterns]
|
|
if not isinstance(patterns, list):
|
|
logger.warning("[%s] whatsapp mention_patterns must be a list or string; got %s", self.name, type(patterns).__name__)
|
|
return []
|
|
|
|
compiled = []
|
|
for pattern in patterns:
|
|
if not isinstance(pattern, str) or not pattern.strip():
|
|
continue
|
|
try:
|
|
compiled.append(re.compile(pattern, re.IGNORECASE))
|
|
except re.error as exc:
|
|
logger.warning("[%s] Invalid WhatsApp mention pattern %r: %s", self.name, pattern, exc)
|
|
if compiled:
|
|
logger.info("[%s] Loaded %d WhatsApp mention pattern(s)", self.name, len(compiled))
|
|
return compiled
|
|
|
|
@staticmethod
|
|
def _normalize_whatsapp_id(value: Optional[str]) -> str:
|
|
if not value:
|
|
return ""
|
|
normalized = str(value).strip()
|
|
if ":" in normalized and "@" in normalized:
|
|
normalized = normalized.replace(":", "@", 1)
|
|
return normalized
|
|
|
|
def _bot_ids_from_message(self, data: Dict[str, Any]) -> set[str]:
|
|
bot_ids = set()
|
|
for candidate in data.get("botIds") or []:
|
|
normalized = self._normalize_whatsapp_id(candidate)
|
|
if normalized:
|
|
bot_ids.add(normalized)
|
|
return bot_ids
|
|
|
|
def _message_is_reply_to_bot(self, data: Dict[str, Any]) -> bool:
|
|
quoted_participant = self._normalize_whatsapp_id(data.get("quotedParticipant"))
|
|
if not quoted_participant:
|
|
return False
|
|
return quoted_participant in self._bot_ids_from_message(data)
|
|
|
|
def _message_mentions_bot(self, data: Dict[str, Any]) -> bool:
|
|
bot_ids = self._bot_ids_from_message(data)
|
|
if not bot_ids:
|
|
return False
|
|
mentioned_ids = {
|
|
nid
|
|
for candidate in (data.get("mentionedIds") or [])
|
|
if (nid := self._normalize_whatsapp_id(candidate))
|
|
}
|
|
if mentioned_ids & bot_ids:
|
|
return True
|
|
|
|
body = str(data.get("body") or "")
|
|
lower_body = body.lower()
|
|
for bot_id in bot_ids:
|
|
bare_id = bot_id.split("@", 1)[0].lower()
|
|
if bare_id and (f"@{bare_id}" in lower_body or bare_id in lower_body):
|
|
return True
|
|
return False
|
|
|
|
def _message_matches_mention_patterns(self, data: Dict[str, Any]) -> bool:
|
|
if not self._mention_patterns:
|
|
return False
|
|
body = str(data.get("body") or "")
|
|
return any(pattern.search(body) for pattern in self._mention_patterns)
|
|
|
|
def _clean_bot_mention_text(self, text: str, data: Dict[str, Any]) -> str:
|
|
if not text:
|
|
return text
|
|
bot_ids = self._bot_ids_from_message(data)
|
|
cleaned = text
|
|
for bot_id in bot_ids:
|
|
bare_id = bot_id.split("@", 1)[0]
|
|
if bare_id:
|
|
cleaned = re.sub(rf"@{re.escape(bare_id)}\b[,:\-]*\s*", "", cleaned)
|
|
return cleaned.strip() or text
|
|
|
|
def _should_process_message(self, data: Dict[str, Any]) -> bool:
|
|
is_group = data.get("isGroup", False)
|
|
if is_group:
|
|
chat_id = str(data.get("chatId") or "")
|
|
if not self._is_group_allowed(chat_id):
|
|
return False
|
|
else:
|
|
sender_id = str(data.get("senderId") or data.get("from") or "")
|
|
if not self._is_dm_allowed(sender_id):
|
|
return False
|
|
# DMs that pass the policy gate are always processed
|
|
return True
|
|
# Group messages: check mention / free-response settings
|
|
chat_id = str(data.get("chatId") or "")
|
|
if chat_id in self._whatsapp_free_response_chats():
|
|
return True
|
|
if not self._whatsapp_require_mention():
|
|
return True
|
|
body = str(data.get("body") or "").strip()
|
|
if body.startswith("/"):
|
|
return True
|
|
if self._message_is_reply_to_bot(data):
|
|
return True
|
|
if self._message_mentions_bot(data):
|
|
return True
|
|
return self._message_matches_mention_patterns(data)
|
|
|
|
async def connect(self) -> bool:
    """
    Start the WhatsApp bridge.

    Launches the Node.js bridge process (or adopts one that is already
    running and connected on the configured port) and waits for it to be
    ready.

    Sequence:
      1. Verify Node.js and the bridge script exist; take the session lock
         (a lock error is logged and treated as non-fatal).
      2. Run ``npm install`` when the bridge has no ``node_modules``.
      3. Reuse an existing bridge whose ``/health`` reports ``connected``.
      4. Otherwise kill whatever owns the port, spawn a new bridge, and
         poll ``/health``: up to 15 s for the HTTP server, then up to 15 s
         more for WhatsApp itself.  A bridge whose HTTP server is up but
         which is not connected yet is still accepted, with a warning.

    Returns:
        True when the adapter was marked connected, False otherwise.  On
        every failure path the session lock and the bridge log handle are
        released, and a bridge child that is still running is killed so it
        cannot linger as an orphan.
    """
    # disconnect() leaves this flag set.  Clear it so that a bridge crash
    # after a reconnect is not mistaken for a planned shutdown.
    self._shutting_down = False

    if not check_whatsapp_requirements():
        logger.warning("[%s] Node.js not found. WhatsApp requires Node.js.", self.name)
        return False

    bridge_path = Path(self._bridge_script)
    if not bridge_path.exists():
        logger.warning("[%s] Bridge script not found: %s", self.name, bridge_path)
        return False

    logger.info("[%s] Bridge found at %s", self.name, bridge_path)

    # Acquire scoped lock to prevent duplicate sessions
    lock_acquired = False
    try:
        if not self._acquire_platform_lock('whatsapp-session', str(self._session_path), 'WhatsApp session'):
            return False
        lock_acquired = True
    except Exception as e:
        logger.warning("[%s] Could not acquire session lock (non-fatal): %s", self.name, e)

    try:
        # Auto-install npm dependencies if node_modules doesn't exist
        bridge_dir = bridge_path.parent
        if not (bridge_dir / "node_modules").exists():
            print(f"[{self.name}] Installing WhatsApp bridge dependencies...")
            try:
                install_result = subprocess.run(
                    ["npm", "install", "--silent"],
                    cwd=str(bridge_dir),
                    capture_output=True,
                    text=True,
                    timeout=60,
                )
                if install_result.returncode != 0:
                    print(f"[{self.name}] npm install failed: {install_result.stderr}")
                    return False
                print(f"[{self.name}] Dependencies installed")
            except Exception as e:
                print(f"[{self.name}] Failed to install dependencies: {e}")
                return False

        # Ensure session directory exists
        self._session_path.mkdir(parents=True, exist_ok=True)

        # Imported lazily; an ImportError lands in the outer handler below.
        import aiohttp

        # Check if bridge is already running and connected
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"http://127.0.0.1:{self._bridge_port}/health",
                    timeout=aiohttp.ClientTimeout(total=2)
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        bridge_status = data.get("status", "unknown")
                        if bridge_status == "connected":
                            print(f"[{self.name}] Using existing bridge (status: {bridge_status})")
                            self._mark_connected()
                            self._bridge_process = None  # Not managed by us
                            self._http_session = aiohttp.ClientSession()
                            self._poll_task = asyncio.create_task(self._poll_messages())
                            return True
                        else:
                            print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting")
        except Exception:
            pass  # Bridge not running, start a new one

        # Kill any orphaned bridge from a previous gateway run
        _kill_port_process(self._bridge_port)
        await asyncio.sleep(1)

        # Start the bridge process in its own process group.
        # Route output to a log file so QR codes, errors, and reconnection
        # messages are preserved for troubleshooting.
        whatsapp_mode = os.getenv("WHATSAPP_MODE", "self-chat")
        self._bridge_log = self._session_path.parent / "bridge.log"
        bridge_log_fh = open(self._bridge_log, "a")
        self._bridge_log_fh = bridge_log_fh

        # Build bridge subprocess environment.
        # Pass WHATSAPP_REPLY_PREFIX from config.yaml so the Node bridge
        # can use it without the user needing to set a separate env var.
        bridge_env = os.environ.copy()
        if self._reply_prefix is not None:
            bridge_env["WHATSAPP_REPLY_PREFIX"] = self._reply_prefix

        self._bridge_process = subprocess.Popen(
            [
                "node",
                str(bridge_path),
                "--port", str(self._bridge_port),
                "--session", str(self._session_path),
                "--mode", whatsapp_mode,
            ],
            stdout=bridge_log_fh,
            stderr=bridge_log_fh,
            preexec_fn=None if _IS_WINDOWS else os.setsid,
            env=bridge_env,
        )

        # Wait for the bridge to connect to WhatsApp.
        # Phase 1: wait for the HTTP server to come up (up to 15s).
        # Phase 2: wait for WhatsApp status: connected (up to 15s more).
        http_ready = False
        data = {}
        for attempt in range(15):
            await asyncio.sleep(1)
            if self._bridge_process.poll() is not None:
                print(f"[{self.name}] Bridge process died (exit code {self._bridge_process.returncode})")
                print(f"[{self.name}] Check log: {self._bridge_log}")
                self._close_bridge_log()
                return False
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        f"http://127.0.0.1:{self._bridge_port}/health",
                        timeout=aiohttp.ClientTimeout(total=2)
                    ) as resp:
                        if resp.status == 200:
                            http_ready = True
                            data = await resp.json()
                            if data.get("status") == "connected":
                                print(f"[{self.name}] Bridge ready (status: connected)")
                            break
            except Exception:
                continue

        if not http_ready:
            print(f"[{self.name}] Bridge HTTP server did not start in 15s")
            print(f"[{self.name}] Check log: {self._bridge_log}")
            self._close_bridge_log()
            return False

        # Phase 2: HTTP is up but WhatsApp may still be connecting.
        # Give it more time to authenticate with saved credentials.
        if data.get("status") != "connected":
            print(f"[{self.name}] Bridge HTTP ready, waiting for WhatsApp connection...")
            for attempt in range(15):
                await asyncio.sleep(1)
                if self._bridge_process.poll() is not None:
                    print(f"[{self.name}] Bridge process died during connection")
                    print(f"[{self.name}] Check log: {self._bridge_log}")
                    self._close_bridge_log()
                    return False
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.get(
                            f"http://127.0.0.1:{self._bridge_port}/health",
                            timeout=aiohttp.ClientTimeout(total=2)
                        ) as resp:
                            if resp.status == 200:
                                data = await resp.json()
                                if data.get("status") == "connected":
                                    print(f"[{self.name}] Bridge ready (status: connected)")
                                    break
                except Exception:
                    continue
            else:
                # Still not connected — warn but proceed (bridge may
                # auto-reconnect later, e.g. after a code 515 restart).
                print(f"[{self.name}] ⚠ WhatsApp not connected after 30s")
                print(f"[{self.name}] Bridge log: {self._bridge_log}")
                print(f"[{self.name}] If session expired, re-pair: hermes whatsapp")

        # Create a persistent HTTP session for all bridge communication
        self._http_session = aiohttp.ClientSession()

        # Start message polling task
        self._poll_task = asyncio.create_task(self._poll_messages())

        self._mark_connected()
        print(f"[{self.name}] Bridge started on port {self._bridge_port}")
        return True

    except Exception as e:
        logger.error("[%s] Failed to start bridge: %s", self.name, e, exc_info=True)
        return False
    finally:
        if not self._running:
            if lock_acquired:
                self._release_platform_lock()
            self._close_bridge_log()
            # Startup failed.  If the child we spawned is still alive (for
            # example its HTTP server never came up), kill it now instead
            # of leaving an orphan that holds the port and the session.
            try:
                orphan = self._bridge_process
                if orphan is not None and orphan.poll() is None:
                    _terminate_bridge_process(orphan, force=True)
            except Exception as e:
                logger.debug("[%s] Could not stop bridge after failed start: %s", self.name, e)
|
|
|
|
def _close_bridge_log(self) -> None:
|
|
"""Close the bridge log file handle if open."""
|
|
if self._bridge_log_fh:
|
|
try:
|
|
self._bridge_log_fh.close()
|
|
except Exception:
|
|
pass
|
|
self._bridge_log_fh = None
|
|
|
|
async def _check_managed_bridge_exit(self) -> Optional[str]:
|
|
"""Return a fatal error message if the managed bridge child exited."""
|
|
if self._bridge_process is None:
|
|
return None
|
|
|
|
returncode = self._bridge_process.poll()
|
|
if returncode is None:
|
|
return None
|
|
|
|
# Planned shutdown: disconnect() sets _shutting_down before it sends
|
|
# SIGTERM to the bridge, so a returncode of -15 (SIGTERM), -2 (SIGINT),
|
|
# or 0 (clean exit) at that point is expected, not a crash. Treat it
|
|
# as informational and skip the fatal-error path.
|
|
# getattr-with-default keeps tests that construct the adapter via
|
|
# ``WhatsAppAdapter.__new__`` (bypassing __init__) working without
|
|
# every _make_adapter() helper having to seed the attribute.
|
|
if getattr(self, "_shutting_down", False) and returncode in (0, -2, -15):
|
|
logger.info(
|
|
"[%s] Bridge exited during shutdown (code %d).",
|
|
self.name,
|
|
returncode,
|
|
)
|
|
return None
|
|
|
|
message = f"WhatsApp bridge process exited unexpectedly (code {returncode})."
|
|
if not self.has_fatal_error:
|
|
logger.error("[%s] %s", self.name, message)
|
|
self._set_fatal_error("whatsapp_bridge_exited", message, retryable=True)
|
|
self._close_bridge_log()
|
|
await self._notify_fatal_error()
|
|
return self.fatal_error_message or message
|
|
|
|
async def disconnect(self) -> None:
    """Stop the WhatsApp bridge and release what connect() acquired.

    A bridge spawned by this adapter is asked to terminate, given one
    second, and force-killed if it is still alive.  A bridge that was
    started externally is left running.  Afterwards the poll task is
    cancelled, the HTTP session closed, the platform lock released, the
    adapter marked disconnected and the bridge log handle closed.
    """
    # Raise the flag BEFORE signalling the child so the exit check (run
    # from other tasks such as send() and the poll loop) does not race us
    # and report the intentional termination as fatal.
    self._shutting_down = True

    managed = self._bridge_process
    if not managed:
        # Bridge was not started by us, don't kill it
        print(f"[{self.name}] Disconnecting (external bridge left running)")
    else:
        try:
            try:
                _terminate_bridge_process(managed, force=False)
            except (ProcessLookupError, PermissionError):
                managed.terminate()
            await asyncio.sleep(1)
            if managed.poll() is None:
                try:
                    _terminate_bridge_process(managed, force=True)
                except (ProcessLookupError, PermissionError):
                    managed.kill()
        except Exception as exc:
            print(f"[{self.name}] Error stopping bridge: {exc}")

    # Cancel the poll task explicitly
    poller = self._poll_task
    if poller and not poller.done():
        poller.cancel()
        try:
            await poller
        except (asyncio.CancelledError, Exception):
            pass
    self._poll_task = None

    # Close the persistent HTTP session
    http_session = self._http_session
    if http_session and not http_session.closed:
        await http_session.close()
    self._http_session = None

    self._release_platform_lock()

    self._mark_disconnected()
    self._bridge_process = None
    self._close_bridge_log()
    print(f"[{self.name}] Disconnected")
|
|
|
|
def format_message(self, content: str) -> str:
    """Convert standard markdown to WhatsApp-compatible formatting.

    WhatsApp supports: *bold*, _italic_, ~strikethrough~, ```code```,
    and monospaced `inline`. Standard markdown uses different syntax
    for bold/strikethrough, so we convert here.

    Conversions applied outside code:
      - ``**text**`` / ``__text__``  ->  ``*text*``
      - ``~~text~~``                 ->  ``~text~``
      - ``# Header`` (levels 1-6)    ->  ``*Header*``
      - ``[text](url)``              ->  ``text (url)``

    Code blocks (``` fenced) and inline code (`) are protected from
    conversion via placeholder substitution.

    Args:
        content: Markdown text; empty or ``None`` is returned unchanged.

    Returns:
        The text with WhatsApp formatting markers.
    """
    if not content:
        return content

    # --- 1. Protect fenced code blocks from formatting changes ---
    _FENCE_PH = "\x00FENCE"
    fences: list[str] = []

    def _save_fence(m: re.Match) -> str:
        fences.append(m.group(0))
        return f"{_FENCE_PH}{len(fences) - 1}\x00"

    result = re.sub(r"```[\s\S]*?```", _save_fence, content)

    # --- 2. Protect inline code ---
    _CODE_PH = "\x00CODE"
    codes: list[str] = []

    def _save_code(m: re.Match) -> str:
        codes.append(m.group(0))
        return f"{_CODE_PH}{len(codes) - 1}\x00"

    result = re.sub(r"`[^`\n]+`", _save_code, result)

    # --- 3. Convert markdown formatting to WhatsApp syntax ---
    # Bold: **text** or __text__ → *text*
    result = re.sub(r"\*\*(.+?)\*\*", r"*\1*", result)
    result = re.sub(r"__(.+?)__", r"*\1*", result)
    # Strikethrough: ~~text~~ → ~text~
    result = re.sub(r"~~(.+?)~~", r"~\1~", result)
    # _text_ is already WhatsApp italic and is left as-is.
    # NOTE(review): single-asterisk *text* is also left untouched; per the
    # syntax list in the docstring WhatsApp shows it as bold, not italic.

    # --- 4. Convert markdown headers to bold text ---
    # # Header → *Header*
    def _bold_header(m: re.Match) -> str:
        title = m.group(1).strip()
        # A heading that was already bold ("## **Title**") has become
        # "*Title*" in step 3; unwrap it so the result is not "**Title**".
        if (
            len(title) > 2
            and title[0] == "*"
            and title[-1] == "*"
            and "*" not in title[1:-1]
        ):
            title = title[1:-1].strip()
        return f"*{title}*"

    # [ \t]+ rather than \s+: the separator must not run across a newline
    # and swallow the following line into the header.
    result = re.sub(r"^#{1,6}[ \t]+(.+)$", _bold_header, result, flags=re.MULTILINE)

    # --- 5. Convert markdown links: [text](url) → text (url) ---
    result = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"\1 (\2)", result)

    # --- 6. Restore protected sections ---
    # Restore in reverse order of protection: an inline-code span captured
    # in step 2 may itself contain a fence placeholder from step 1.
    for i, code in enumerate(codes):
        result = result.replace(f"{_CODE_PH}{i}\x00", code)
    for i, fence in enumerate(fences):
        result = result.replace(f"{_FENCE_PH}{i}\x00", fence)

    return result
|
|
|
|
async def send(
    self,
    chat_id: str,
    content: str,
    reply_to: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> SendResult:
    """Send a message via the WhatsApp bridge.

    Formats markdown for WhatsApp, splits long messages into chunks of at
    most ``MAX_MESSAGE_LENGTH`` characters via ``truncate_message`` and
    sends each chunk sequentially.

    Args:
        chat_id: Destination chat ID.
        content: Message text; blank content is a successful no-op.
        reply_to: Optional message ID to quote.  It is attached to the
            first chunk only.
        metadata: Accepted for interface compatibility; not used here.

    Returns:
        ``SendResult`` carrying the message ID reported for the last chunk,
        or a failure result when the adapter is not connected, the bridge
        has exited, the bridge answers with a non-200 status, or an
        exception occurs.
    """
    if not self._running or not self._http_session:
        return SendResult(success=False, error="Not connected")
    bridge_exit = await self._check_managed_bridge_exit()
    if bridge_exit:
        return SendResult(success=False, error=bridge_exit)

    if not content or not content.strip():
        return SendResult(success=True, message_id=None)

    try:
        import aiohttp

        # Format and chunk the message
        formatted = self.format_message(content)
        chunks = self.truncate_message(formatted, self.MAX_MESSAGE_LENGTH)

        last_index = len(chunks) - 1
        last_message_id = None
        for index, chunk in enumerate(chunks):
            payload: Dict[str, Any] = {
                "chatId": chat_id,
                "message": chunk,
            }
            # Quote only on the first chunk.  Keyed on the index rather
            # than on "no message id yet", which re-sent replyTo whenever
            # the bridge response lacked a messageId.
            if reply_to and index == 0:
                payload["replyTo"] = reply_to

            async with self._http_session.post(
                f"http://127.0.0.1:{self._bridge_port}/send",
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    last_message_id = data.get("messageId")
                else:
                    error = await resp.text()
                    return SendResult(success=False, error=error)

            # Small delay between chunks to avoid rate limiting; nothing
            # follows the final chunk, so no pause is needed after it.
            if index < last_index:
                await asyncio.sleep(0.3)

        return SendResult(
            success=True,
            message_id=last_message_id,
        )
    except Exception as e:
        return SendResult(success=False, error=str(e))
|
|
|
|
async def edit_message(
    self,
    chat_id: str,
    message_id: str,
    content: str,
    *,
    finalize: bool = False,
) -> SendResult:
    """Edit a previously sent message via the bridge's ``/edit`` endpoint.

    Args:
        chat_id: Chat containing the message.
        message_id: ID of the message to replace.
        content: New message text, sent as-is.
        finalize: Accepted for interface compatibility; not used here.

    Returns:
        Success (echoing *message_id*) on HTTP 200; otherwise a failure
        result carrying the bridge's response text or the exception text.
    """
    if not (self._running and self._http_session):
        return SendResult(success=False, error="Not connected")
    bridge_exit = await self._check_managed_bridge_exit()
    if bridge_exit:
        return SendResult(success=False, error=bridge_exit)

    try:
        import aiohttp

        edit_url = f"http://127.0.0.1:{self._bridge_port}/edit"
        edit_payload = {
            "chatId": chat_id,
            "messageId": message_id,
            "message": content,
        }
        async with self._http_session.post(
            edit_url,
            json=edit_payload,
            timeout=aiohttp.ClientTimeout(total=15)
        ) as resp:
            if resp.status != 200:
                failure_text = await resp.text()
                return SendResult(success=False, error=failure_text)
            return SendResult(success=True, message_id=message_id)
    except Exception as exc:
        return SendResult(success=False, error=str(exc))
|
|
|
|
async def _send_media_to_bridge(
    self,
    chat_id: str,
    file_path: str,
    media_type: str,
    caption: Optional[str] = None,
    file_name: Optional[str] = None,
) -> SendResult:
    """Send a local media file through the bridge's ``/send-media`` endpoint.

    Args:
        chat_id: Destination chat ID.
        file_path: Path of the file on local disk; it must exist.
        media_type: Value forwarded as ``mediaType``.
        caption: Optional caption; omitted from the payload when empty.
        file_name: Optional display name; omitted from the payload when empty.

    Returns:
        Success with the bridge's ``messageId`` and raw JSON on HTTP 200;
        otherwise a failure result (not connected, bridge exited, missing
        file, non-200 response text, or exception text).
    """
    if not (self._running and self._http_session):
        return SendResult(success=False, error="Not connected")
    bridge_exit = await self._check_managed_bridge_exit()
    if bridge_exit:
        return SendResult(success=False, error=bridge_exit)

    try:
        import aiohttp

        if not os.path.exists(file_path):
            return SendResult(success=False, error=f"File not found: {file_path}")

        payload: Dict[str, Any] = {
            "chatId": chat_id,
            "filePath": file_path,
            "mediaType": media_type,
        }
        # Optional fields are only sent when they carry a value.
        for field, value in (("caption", caption), ("fileName", file_name)):
            if value:
                payload[field] = value

        media_url = f"http://127.0.0.1:{self._bridge_port}/send-media"
        async with self._http_session.post(
            media_url,
            json=payload,
            timeout=aiohttp.ClientTimeout(total=120),
        ) as resp:
            if resp.status != 200:
                failure_text = await resp.text()
                return SendResult(success=False, error=failure_text)
            data = await resp.json()
            return SendResult(
                success=True,
                message_id=data.get("messageId"),
                raw_response=data,
            )
    except Exception as exc:
        return SendResult(success=False, error=str(exc))
|
|
|
|
async def send_image(
    self,
    chat_id: str,
    image_url: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
) -> SendResult:
    """Download *image_url* into the cache and send it natively via the bridge.

    If anything in that path raises, the call is delegated to the base
    class implementation with the original arguments.
    """
    try:
        cached_path = await cache_image_from_url(image_url)
        native_result = await self._send_media_to_bridge(chat_id, cached_path, "image", caption)
        return native_result
    except Exception:
        return await super().send_image(chat_id, image_url, caption, reply_to)
|
|
|
|
async def send_image_file(
    self,
    chat_id: str,
    image_path: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
    **kwargs,
) -> SendResult:
    """Send an image that already exists on local disk via the bridge.

    ``reply_to`` and any extra keyword arguments are accepted but not
    forwarded to the bridge.
    """
    delivery = self._send_media_to_bridge(chat_id, image_path, "image", caption)
    return await delivery
|
|
|
|
async def send_video(
    self,
    chat_id: str,
    video_path: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
    **kwargs,
) -> SendResult:
    """Send a local video file via the bridge with media type ``"video"``.

    ``reply_to`` and any extra keyword arguments are accepted but not
    forwarded to the bridge.
    """
    delivery = self._send_media_to_bridge(chat_id, video_path, "video", caption)
    return await delivery
|
|
|
|
async def send_voice(
    self,
    chat_id: str,
    audio_path: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
    **kwargs,
) -> SendResult:
    """Send a local audio file via the bridge with media type ``"audio"``.

    ``reply_to`` and any extra keyword arguments are accepted but not
    forwarded to the bridge.
    """
    delivery = self._send_media_to_bridge(chat_id, audio_path, "audio", caption)
    return await delivery
|
|
|
|
async def send_document(
    self,
    chat_id: str,
    file_path: str,
    caption: Optional[str] = None,
    file_name: Optional[str] = None,
    reply_to: Optional[str] = None,
    **kwargs,
) -> SendResult:
    """Send a local file via the bridge with media type ``"document"``.

    The attachment name defaults to the base name of *file_path* when
    *file_name* is not given.  ``reply_to`` and any extra keyword arguments
    are accepted but not forwarded to the bridge.
    """
    attachment_name = file_name or os.path.basename(file_path)
    delivery = self._send_media_to_bridge(
        chat_id, file_path, "document", caption,
        attachment_name,
    )
    return await delivery
|
|
|
|
async def send_typing(self, chat_id: str, metadata=None) -> None:
    """Ask the bridge to show a typing indicator in ``chat_id``.

    Best-effort: returns without doing anything when the adapter is not
    running, has no HTTP session, or ``_check_managed_bridge_exit`` returns
    a truthy value. Any error raised while posting is swallowed.
    ``metadata`` is accepted but unused.
    """
    if not (self._running and self._http_session):
        return
    bridge_exit = await self._check_managed_bridge_exit()
    if bridge_exit:
        return

    try:
        import aiohttp

        typing_url = f"http://127.0.0.1:{self._bridge_port}/typing"
        payload = {"chatId": chat_id}
        request_timeout = aiohttp.ClientTimeout(total=5)

        # Enter the response as a context manager so it is released on
        # exit. A bare `await session.post(...)` keeps the response object
        # alive until GC, holding its TCP socket in CLOSE_WAIT. See #18451.
        async with self._http_session.post(
            typing_url, json=payload, timeout=request_timeout
        ):
            pass
    except Exception:
        pass  # Ignore typing indicator failures
|
|
|
|
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
    """Look up display metadata for a WhatsApp chat through the bridge.

    Returns:
        A dict with ``"name"`` and ``"type"`` (``"group"`` or ``"dm"``).
        When the bridge answers with HTTP 200 the dict also carries
        ``"participants"``. If the adapter is not running or has no HTTP
        session the name is ``"Unknown"``; in every other fallback case
        (bridge exit reported, non-200 status, or any error) the name is
        ``chat_id`` and the type is ``"dm"``.
    """
    if not (self._running and self._http_session):
        return {"name": "Unknown", "type": "dm"}

    default_info = {"name": chat_id, "type": "dm"}
    if await self._check_managed_bridge_exit():
        return default_info

    try:
        import aiohttp

        chat_url = f"http://127.0.0.1:{self._bridge_port}/chat/{chat_id}"
        async with self._http_session.get(
            chat_url, timeout=aiohttp.ClientTimeout(total=10)
        ) as response:
            if response.status == 200:
                info = await response.json()
                display_name = info.get("name", chat_id)
                kind = "group" if info.get("isGroup") else "dm"
                members = info.get("participants", [])
                return {
                    "name": display_name,
                    "type": kind,
                    "participants": members,
                }
    except Exception as e:
        logger.debug("Could not get WhatsApp chat info for %s: %s", chat_id, e)

    return default_info
|
|
|
|
async def _poll_messages(self) -> None:
    """Repeatedly fetch pending inbound messages from the bridge and dispatch them.

    The loop runs while ``self._running`` is true and stops early when the
    HTTP session is gone, when ``_check_managed_bridge_exit`` returns a
    truthy value (which is printed), or when the request is cancelled.
    Each payload from a 200 response is turned into an event with
    ``_build_message_event`` and, if one is produced, passed to
    ``handle_message``. Other errors are printed and followed by a
    5-second pause; every iteration ends with a 1-second pause.
    """
    import aiohttp

    poll_timeout = aiohttp.ClientTimeout(total=30)

    while self._running:
        if not self._http_session:
            break

        exit_reason = await self._check_managed_bridge_exit()
        if exit_reason:
            print(f"[{self.name}] {exit_reason}")
            break

        try:
            messages_url = f"http://127.0.0.1:{self._bridge_port}/messages"
            async with self._http_session.get(
                messages_url, timeout=poll_timeout
            ) as response:
                if response.status == 200:
                    for payload in await response.json():
                        event = await self._build_message_event(payload)
                        if event:
                            await self.handle_message(event)
        except asyncio.CancelledError:
            break
        except Exception as e:
            # Check whether the bridge exit explains the failure before
            # treating it as a transient poll error.
            exit_reason = await self._check_managed_bridge_exit()
            if exit_reason:
                print(f"[{self.name}] {exit_reason}")
                break
            print(f"[{self.name}] Poll error: {e}")
            await asyncio.sleep(5)

        await asyncio.sleep(1)  # Poll interval
|
|
|
|
async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEvent]:
    """Build a MessageEvent from bridge message data, downloading images to cache.

    Args:
        data: One message payload from the bridge. Keys read here are
            ``hasMedia``, ``mediaType``, ``isGroup``, ``chatId``,
            ``chatName``, ``senderId``, ``senderName``, ``mediaUrls``,
            ``body`` and ``messageId``.

    Returns:
        The populated ``MessageEvent``, or ``None`` when
        ``_should_process_message`` rejects the payload or when any error
        is raised while building the event (the error is printed, not
        propagated).
    """
    try:
        if not self._should_process_message(data):
            return None

        # Determine message type
        msg_type = MessageType.TEXT
        if data.get("hasMedia"):
            # `or ""` also covers an explicit null from the bridge, which
            # would otherwise raise on the `in` checks and drop the message.
            media_type = data.get("mediaType") or ""
            if "image" in media_type:
                msg_type = MessageType.PHOTO
            elif "video" in media_type:
                msg_type = MessageType.VIDEO
            elif "audio" in media_type or "ptt" in media_type:  # ptt = voice note
                msg_type = MessageType.VOICE
            else:
                msg_type = MessageType.DOCUMENT

        # Determine chat type
        is_group = data.get("isGroup", False)
        chat_type = "group" if is_group else "dm"

        # Build source
        source = self.build_source(
            chat_id=data.get("chatId", ""),
            chat_name=data.get("chatName"),
            chat_type=chat_type,
            user_id=data.get("senderId"),
            user_name=data.get("senderName"),
        )

        # Download media URLs to the local cache so agent tools
        # can access them reliably regardless of URL expiration.
        # `or []` also covers an explicit null for "mediaUrls".
        raw_urls = data.get("mediaUrls") or []
        cached_urls = []
        media_types = []
        remote_prefixes = ("http://", "https://")
        for url in raw_urls:
            if msg_type == MessageType.PHOTO and url.startswith(remote_prefixes):
                try:
                    cached_path = await cache_image_from_url(url, ext=".jpg")
                    cached_urls.append(cached_path)
                    media_types.append("image/jpeg")
                    print(f"[{self.name}] Cached user image: {cached_path}", flush=True)
                except Exception as e:
                    # Keep the original URL so the message is still delivered.
                    print(f"[{self.name}] Failed to cache image: {e}", flush=True)
                    cached_urls.append(url)
                    media_types.append("image/jpeg")
            elif msg_type == MessageType.PHOTO and os.path.isabs(url):
                # Local file path — bridge already downloaded the image
                cached_urls.append(url)
                media_types.append("image/jpeg")
                print(f"[{self.name}] Using bridge-cached image: {url}", flush=True)
            elif msg_type == MessageType.VOICE and url.startswith(remote_prefixes):
                try:
                    cached_path = await cache_audio_from_url(url, ext=".ogg")
                    cached_urls.append(cached_path)
                    media_types.append("audio/ogg")
                    print(f"[{self.name}] Cached user voice: {cached_path}", flush=True)
                except Exception as e:
                    # Keep the original URL so the message is still delivered.
                    print(f"[{self.name}] Failed to cache voice: {e}", flush=True)
                    cached_urls.append(url)
                    media_types.append("audio/ogg")
            elif msg_type == MessageType.VOICE and os.path.isabs(url):
                # Local file path — bridge already downloaded the audio
                cached_urls.append(url)
                media_types.append("audio/ogg")
                print(f"[{self.name}] Using bridge-cached audio: {url}", flush=True)
            elif msg_type == MessageType.DOCUMENT and os.path.isabs(url):
                # Local file path — bridge already downloaded the document
                cached_urls.append(url)
                ext = Path(url).suffix.lower()
                mime = SUPPORTED_DOCUMENT_TYPES.get(ext, "application/octet-stream")
                media_types.append(mime)
                print(f"[{self.name}] Using bridge-cached document: {url}", flush=True)
            elif msg_type == MessageType.VIDEO and os.path.isabs(url):
                # Local file path — bridge already downloaded the video
                cached_urls.append(url)
                media_types.append("video/mp4")
                print(f"[{self.name}] Using bridge-cached video: {url}", flush=True)
            else:
                # Anything else is passed through untouched.
                cached_urls.append(url)
                media_types.append("unknown")

        # For text-readable documents, inject file content directly into
        # the message text so the agent can read it inline.
        # Cap at 100KB to match Telegram/Discord/Slack behaviour.
        body = data.get("body", "")
        if is_group:
            body = self._clean_bot_mention_text(body, data)
        MAX_TEXT_INJECT_BYTES = 100 * 1024
        text_extensions = (
            ".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml",
            ".log", ".py", ".js", ".ts", ".html", ".css",
        )
        if msg_type == MessageType.DOCUMENT and cached_urls:
            for doc_path in cached_urls:
                doc = Path(doc_path)
                if doc.suffix.lower() not in text_extensions:
                    continue
                try:
                    file_size = doc.stat().st_size
                    if file_size > MAX_TEXT_INJECT_BYTES:
                        print(f"[{self.name}] Skipping text injection for {doc_path} ({file_size} bytes > {MAX_TEXT_INJECT_BYTES})", flush=True)
                        continue
                    # Decode as UTF-8 explicitly; without an encoding the
                    # locale default is used, which garbles non-ASCII text
                    # on hosts whose locale is not UTF-8.
                    content = doc.read_text(encoding="utf-8", errors="replace")
                    fname = doc.name
                    # Remove the doc_<hex>_ prefix for display
                    display_name = fname
                    if "_" in fname:
                        parts = fname.split("_", 2)
                        if len(parts) >= 3:
                            display_name = parts[2]
                    injection = f"[Content of {display_name}]:\n{content}"
                    # Each injection is placed ahead of the text built so far.
                    if body:
                        body = f"{injection}\n\n{body}"
                    else:
                        body = injection
                    print(f"[{self.name}] Injected text content from: {doc_path}", flush=True)
                except Exception as e:
                    print(f"[{self.name}] Failed to read document text: {e}", flush=True)

        return MessageEvent(
            text=body,
            message_type=msg_type,
            source=source,
            raw_message=data,
            message_id=data.get("messageId"),
            media_urls=cached_urls,
            media_types=media_types,
        )
    except Exception as e:
        # A malformed payload must not take down the poll loop: report it
        # and drop this one message.
        print(f"[{self.name}] Error building event: {e}")
        return None
|