""" WhatsApp platform adapter. WhatsApp integration is more complex than Telegram/Discord because: - No official bot API for personal accounts - Business API requires Meta Business verification - Most solutions use web-based automation This adapter supports multiple backends: 1. WhatsApp Business API (requires Meta verification) 2. whatsapp-web.js (via Node.js subprocess) - for personal accounts 3. Baileys (via Node.js subprocess) - alternative for personal accounts For simplicity, we'll implement a generic interface that can work with different backends via a bridge pattern. """ import asyncio import json import logging import os import platform import re import subprocess _IS_WINDOWS = platform.system() == "Windows" from pathlib import Path from typing import Dict, Optional, Any from hermes_constants import get_hermes_dir logger = logging.getLogger(__name__) def _kill_port_process(port: int) -> None: """Kill any process listening on the given TCP port.""" try: if _IS_WINDOWS: # Use netstat to find the PID bound to this port, then taskkill result = subprocess.run( ["netstat", "-ano", "-p", "TCP"], capture_output=True, text=True, timeout=5, ) for line in result.stdout.splitlines(): parts = line.split() if len(parts) >= 5 and parts[3] == "LISTENING": local_addr = parts[1] if local_addr.endswith(f":{port}"): try: subprocess.run( ["taskkill", "/PID", parts[4], "/F"], capture_output=True, timeout=5, ) except subprocess.SubprocessError: pass else: result = subprocess.run( ["fuser", f"{port}/tcp"], capture_output=True, timeout=5, ) if result.returncode == 0: subprocess.run( ["fuser", "-k", f"{port}/tcp"], capture_output=True, timeout=5, ) except Exception: pass def _terminate_bridge_process(proc, *, force: bool = False) -> None: """Terminate the bridge process using process-tree semantics where possible.""" if _IS_WINDOWS: cmd = ["taskkill", "/PID", str(proc.pid), "/T"] if force: cmd.append("/F") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=10, ) except FileNotFoundError: if force: proc.kill() else: proc.terminate() return if result.returncode != 0: details = (result.stderr or result.stdout or "").strip() raise OSError(details or f"taskkill failed for PID {proc.pid}") return import signal sig = signal.SIGTERM if not force else signal.SIGKILL os.killpg(os.getpgid(proc.pid), sig) import sys sys.path.insert(0, str(Path(__file__).resolve().parents[2])) from gateway.config import Platform, PlatformConfig from gateway.platforms.base import ( BasePlatformAdapter, MessageEvent, MessageType, SendResult, SUPPORTED_DOCUMENT_TYPES, cache_image_from_url, cache_audio_from_url, ) def check_whatsapp_requirements() -> bool: """ Check if WhatsApp dependencies are available. WhatsApp requires a Node.js bridge for most implementations. """ # Check for Node.js try: result = subprocess.run( ["node", "--version"], capture_output=True, text=True, timeout=5 ) return result.returncode == 0 except Exception: return False class WhatsAppAdapter(BasePlatformAdapter): """ WhatsApp adapter. This implementation uses a simple HTTP bridge pattern where: 1. A Node.js process runs the WhatsApp Web client 2. Messages are forwarded via HTTP/IPC to this Python adapter 3. Responses are sent back through the bridge The actual Node.js bridge implementation can vary: - whatsapp-web.js based - Baileys based - Business API based Configuration: - bridge_script: Path to the Node.js bridge script - bridge_port: Port for HTTP communication (default: 3000) - session_path: Path to store WhatsApp session data - dm_policy: "open" | "allowlist" | "disabled" — how DMs are handled (default: "open") - allow_from: List of sender IDs allowed in DMs (when dm_policy="allowlist") - group_policy: "open" | "allowlist" | "disabled" — which groups are processed (default: "open") - group_allow_from: List of group JIDs allowed (when group_policy="allowlist") """ # WhatsApp message limits — practical UX limit, not protocol max. # WhatsApp allows ~65K but long messages are unreadable on mobile. MAX_MESSAGE_LENGTH = 4096 # Default bridge location relative to the hermes-agent install _DEFAULT_BRIDGE_DIR = Path(__file__).resolve().parents[2] / "scripts" / "whatsapp-bridge" def __init__(self, config: PlatformConfig): super().__init__(config, Platform.WHATSAPP) self._bridge_process: Optional[subprocess.Popen] = None self._bridge_port: int = config.extra.get("bridge_port", 3000) self._bridge_script: Optional[str] = config.extra.get( "bridge_script", str(self._DEFAULT_BRIDGE_DIR / "bridge.js"), ) self._session_path: Path = Path(config.extra.get( "session_path", get_hermes_dir("platforms/whatsapp/session", "whatsapp/session") )) self._reply_prefix: Optional[str] = config.extra.get("reply_prefix") self._dm_policy = str(config.extra.get("dm_policy") or os.getenv("WHATSAPP_DM_POLICY", "open")).strip().lower() self._allow_from = self._coerce_allow_list(config.extra.get("allow_from") or config.extra.get("allowFrom")) self._group_policy = str(config.extra.get("group_policy") or os.getenv("WHATSAPP_GROUP_POLICY", "open")).strip().lower() self._group_allow_from = self._coerce_allow_list(config.extra.get("group_allow_from") or config.extra.get("groupAllowFrom")) self._mention_patterns = self._compile_mention_patterns() self._message_queue: asyncio.Queue = asyncio.Queue() self._bridge_log_fh = None self._bridge_log: Optional[Path] = None self._poll_task: Optional[asyncio.Task] = None self._http_session: Optional["aiohttp.ClientSession"] = None def _whatsapp_require_mention(self) -> bool: configured = self.config.extra.get("require_mention") if configured is not None: if isinstance(configured, str): return configured.lower() in ("true", "1", "yes", "on") return bool(configured) return os.getenv("WHATSAPP_REQUIRE_MENTION", "false").lower() in ("true", "1", "yes", "on") def _whatsapp_free_response_chats(self) -> set[str]: raw = self.config.extra.get("free_response_chats") if raw is None: raw = os.getenv("WHATSAPP_FREE_RESPONSE_CHATS", "") if isinstance(raw, list): return {str(part).strip() for part in raw if str(part).strip()} return {part.strip() for part in str(raw).split(",") if part.strip()} @staticmethod def _coerce_allow_list(raw) -> set[str]: """Parse allow_from / group_allow_from from config or env var.""" if raw is None: return set() if isinstance(raw, list): return {str(part).strip() for part in raw if str(part).strip()} return {part.strip() for part in str(raw).split(",") if part.strip()} def _is_dm_allowed(self, sender_id: str) -> bool: """Check whether a DM from the given sender should be processed.""" if self._dm_policy == "disabled": return False if self._dm_policy == "allowlist": return sender_id in self._allow_from # "open" — all DMs allowed return True def _is_group_allowed(self, chat_id: str) -> bool: """Check whether a group chat should be processed.""" if self._group_policy == "disabled": return False if self._group_policy == "allowlist": return chat_id in self._group_allow_from # "open" — all groups allowed return True def _compile_mention_patterns(self): patterns = self.config.extra.get("mention_patterns") if patterns is None: raw = os.getenv("WHATSAPP_MENTION_PATTERNS", "").strip() if raw: try: patterns = json.loads(raw) except Exception: patterns = [part.strip() for part in raw.splitlines() if part.strip()] if not patterns: patterns = [part.strip() for part in raw.split(",") if part.strip()] if patterns is None: return [] if isinstance(patterns, str): patterns = [patterns] if not isinstance(patterns, list): logger.warning("[%s] whatsapp mention_patterns must be a list or string; got %s", self.name, type(patterns).__name__) return [] compiled = [] for pattern in patterns: if not isinstance(pattern, str) or not pattern.strip(): continue try: compiled.append(re.compile(pattern, re.IGNORECASE)) except re.error as exc: logger.warning("[%s] Invalid WhatsApp mention pattern %r: %s", self.name, pattern, exc) if compiled: logger.info("[%s] Loaded %d WhatsApp mention pattern(s)", self.name, len(compiled)) return compiled @staticmethod def _normalize_whatsapp_id(value: Optional[str]) -> str: if not value: return "" normalized = str(value).strip() if ":" in normalized and "@" in normalized: normalized = normalized.replace(":", "@", 1) return normalized def _bot_ids_from_message(self, data: Dict[str, Any]) -> set[str]: bot_ids = set() for candidate in data.get("botIds") or []: normalized = self._normalize_whatsapp_id(candidate) if normalized: bot_ids.add(normalized) return bot_ids def _message_is_reply_to_bot(self, data: Dict[str, Any]) -> bool: quoted_participant = self._normalize_whatsapp_id(data.get("quotedParticipant")) if not quoted_participant: return False return quoted_participant in self._bot_ids_from_message(data) def _message_mentions_bot(self, data: Dict[str, Any]) -> bool: bot_ids = self._bot_ids_from_message(data) if not bot_ids: return False mentioned_ids = { nid for candidate in (data.get("mentionedIds") or []) if (nid := self._normalize_whatsapp_id(candidate)) } if mentioned_ids & bot_ids: return True body = str(data.get("body") or "") lower_body = body.lower() for bot_id in bot_ids: bare_id = bot_id.split("@", 1)[0].lower() if bare_id and (f"@{bare_id}" in lower_body or bare_id in lower_body): return True return False def _message_matches_mention_patterns(self, data: Dict[str, Any]) -> bool: if not self._mention_patterns: return False body = str(data.get("body") or "") return any(pattern.search(body) for pattern in self._mention_patterns) def _clean_bot_mention_text(self, text: str, data: Dict[str, Any]) -> str: if not text: return text bot_ids = self._bot_ids_from_message(data) cleaned = text for bot_id in bot_ids: bare_id = bot_id.split("@", 1)[0] if bare_id: cleaned = re.sub(rf"@{re.escape(bare_id)}\b[,:\-]*\s*", "", cleaned) return cleaned.strip() or text def _should_process_message(self, data: Dict[str, Any]) -> bool: is_group = data.get("isGroup", False) if is_group: chat_id = str(data.get("chatId") or "") if not self._is_group_allowed(chat_id): return False else: sender_id = str(data.get("senderId") or data.get("from") or "") if not self._is_dm_allowed(sender_id): return False # DMs that pass the policy gate are always processed return True # Group messages: check mention / free-response settings chat_id = str(data.get("chatId") or "") if chat_id in self._whatsapp_free_response_chats(): return True if not self._whatsapp_require_mention(): return True body = str(data.get("body") or "").strip() if body.startswith("/"): return True if self._message_is_reply_to_bot(data): return True if self._message_mentions_bot(data): return True return self._message_matches_mention_patterns(data) async def connect(self) -> bool: """ Start the WhatsApp bridge. This launches the Node.js bridge process and waits for it to be ready. """ if not check_whatsapp_requirements(): logger.warning("[%s] Node.js not found. WhatsApp requires Node.js.", self.name) return False bridge_path = Path(self._bridge_script) if not bridge_path.exists(): logger.warning("[%s] Bridge script not found: %s", self.name, bridge_path) return False logger.info("[%s] Bridge found at %s", self.name, bridge_path) # Acquire scoped lock to prevent duplicate sessions lock_acquired = False try: if not self._acquire_platform_lock('whatsapp-session', str(self._session_path), 'WhatsApp session'): return False lock_acquired = True except Exception as e: logger.warning("[%s] Could not acquire session lock (non-fatal): %s", self.name, e) try: # Auto-install npm dependencies if node_modules doesn't exist bridge_dir = bridge_path.parent if not (bridge_dir / "node_modules").exists(): print(f"[{self.name}] Installing WhatsApp bridge dependencies...") try: install_result = subprocess.run( ["npm", "install", "--silent"], cwd=str(bridge_dir), capture_output=True, text=True, timeout=60, ) if install_result.returncode != 0: print(f"[{self.name}] npm install failed: {install_result.stderr}") return False print(f"[{self.name}] Dependencies installed") except Exception as e: print(f"[{self.name}] Failed to install dependencies: {e}") return False # Ensure session directory exists self._session_path.mkdir(parents=True, exist_ok=True) # Check if bridge is already running and connected import aiohttp try: async with aiohttp.ClientSession() as session: async with session.get( f"http://127.0.0.1:{self._bridge_port}/health", timeout=aiohttp.ClientTimeout(total=2) ) as resp: if resp.status == 200: data = await resp.json() bridge_status = data.get("status", "unknown") if bridge_status == "connected": print(f"[{self.name}] Using existing bridge (status: {bridge_status})") self._mark_connected() self._bridge_process = None # Not managed by us self._http_session = aiohttp.ClientSession() self._poll_task = asyncio.create_task(self._poll_messages()) return True else: print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting") except Exception: pass # Bridge not running, start a new one # Kill any orphaned bridge from a previous gateway run _kill_port_process(self._bridge_port) await asyncio.sleep(1) # Start the bridge process in its own process group. # Route output to a log file so QR codes, errors, and reconnection # messages are preserved for troubleshooting. whatsapp_mode = os.getenv("WHATSAPP_MODE", "self-chat") self._bridge_log = self._session_path.parent / "bridge.log" bridge_log_fh = open(self._bridge_log, "a") self._bridge_log_fh = bridge_log_fh # Build bridge subprocess environment. # Pass WHATSAPP_REPLY_PREFIX from config.yaml so the Node bridge # can use it without the user needing to set a separate env var. bridge_env = os.environ.copy() if self._reply_prefix is not None: bridge_env["WHATSAPP_REPLY_PREFIX"] = self._reply_prefix self._bridge_process = subprocess.Popen( [ "node", str(bridge_path), "--port", str(self._bridge_port), "--session", str(self._session_path), "--mode", whatsapp_mode, ], stdout=bridge_log_fh, stderr=bridge_log_fh, preexec_fn=None if _IS_WINDOWS else os.setsid, env=bridge_env, ) # Wait for the bridge to connect to WhatsApp. # Phase 1: wait for the HTTP server to come up (up to 15s). # Phase 2: wait for WhatsApp status: connected (up to 15s more). import aiohttp http_ready = False data = {} for attempt in range(15): await asyncio.sleep(1) if self._bridge_process.poll() is not None: print(f"[{self.name}] Bridge process died (exit code {self._bridge_process.returncode})") print(f"[{self.name}] Check log: {self._bridge_log}") self._close_bridge_log() return False try: async with aiohttp.ClientSession() as session: async with session.get( f"http://127.0.0.1:{self._bridge_port}/health", timeout=aiohttp.ClientTimeout(total=2) ) as resp: if resp.status == 200: http_ready = True data = await resp.json() if data.get("status") == "connected": print(f"[{self.name}] Bridge ready (status: connected)") break except Exception: continue if not http_ready: print(f"[{self.name}] Bridge HTTP server did not start in 15s") print(f"[{self.name}] Check log: {self._bridge_log}") self._close_bridge_log() return False # Phase 2: HTTP is up but WhatsApp may still be connecting. # Give it more time to authenticate with saved credentials. if data.get("status") != "connected": print(f"[{self.name}] Bridge HTTP ready, waiting for WhatsApp connection...") for attempt in range(15): await asyncio.sleep(1) if self._bridge_process.poll() is not None: print(f"[{self.name}] Bridge process died during connection") print(f"[{self.name}] Check log: {self._bridge_log}") self._close_bridge_log() return False try: async with aiohttp.ClientSession() as session: async with session.get( f"http://127.0.0.1:{self._bridge_port}/health", timeout=aiohttp.ClientTimeout(total=2) ) as resp: if resp.status == 200: data = await resp.json() if data.get("status") == "connected": print(f"[{self.name}] Bridge ready (status: connected)") break except Exception: continue else: # Still not connected — warn but proceed (bridge may # auto-reconnect later, e.g. after a code 515 restart). print(f"[{self.name}] ⚠ WhatsApp not connected after 30s") print(f"[{self.name}] Bridge log: {self._bridge_log}") print(f"[{self.name}] If session expired, re-pair: hermes whatsapp") # Create a persistent HTTP session for all bridge communication self._http_session = aiohttp.ClientSession() # Start message polling task self._poll_task = asyncio.create_task(self._poll_messages()) self._mark_connected() print(f"[{self.name}] Bridge started on port {self._bridge_port}") return True except Exception as e: logger.error("[%s] Failed to start bridge: %s", self.name, e, exc_info=True) return False finally: if not self._running: if lock_acquired: self._release_platform_lock() self._close_bridge_log() def _close_bridge_log(self) -> None: """Close the bridge log file handle if open.""" if self._bridge_log_fh: try: self._bridge_log_fh.close() except Exception: pass self._bridge_log_fh = None async def _check_managed_bridge_exit(self) -> Optional[str]: """Return a fatal error message if the managed bridge child exited.""" if self._bridge_process is None: return None returncode = self._bridge_process.poll() if returncode is None: return None message = f"WhatsApp bridge process exited unexpectedly (code {returncode})." if not self.has_fatal_error: logger.error("[%s] %s", self.name, message) self._set_fatal_error("whatsapp_bridge_exited", message, retryable=True) self._close_bridge_log() await self._notify_fatal_error() return self.fatal_error_message or message async def disconnect(self) -> None: """Stop the WhatsApp bridge and clean up any orphaned processes.""" if self._bridge_process: try: try: _terminate_bridge_process(self._bridge_process, force=False) except (ProcessLookupError, PermissionError): self._bridge_process.terminate() await asyncio.sleep(1) if self._bridge_process.poll() is None: try: _terminate_bridge_process(self._bridge_process, force=True) except (ProcessLookupError, PermissionError): self._bridge_process.kill() except Exception as e: print(f"[{self.name}] Error stopping bridge: {e}") else: # Bridge was not started by us, don't kill it print(f"[{self.name}] Disconnecting (external bridge left running)") # Cancel the poll task explicitly if self._poll_task and not self._poll_task.done(): self._poll_task.cancel() try: await self._poll_task except (asyncio.CancelledError, Exception): pass self._poll_task = None # Close the persistent HTTP session if self._http_session and not self._http_session.closed: await self._http_session.close() self._http_session = None self._release_platform_lock() self._mark_disconnected() self._bridge_process = None self._close_bridge_log() print(f"[{self.name}] Disconnected") def format_message(self, content: str) -> str: """Convert standard markdown to WhatsApp-compatible formatting. WhatsApp supports: *bold*, _italic_, ~strikethrough~, ```code```, and monospaced `inline`. Standard markdown uses different syntax for bold/italic/strikethrough, so we convert here. Code blocks (``` fenced) and inline code (`) are protected from conversion via placeholder substitution. """ if not content: return content # --- 1. Protect fenced code blocks from formatting changes --- _FENCE_PH = "\x00FENCE" fences: list[str] = [] def _save_fence(m: re.Match) -> str: fences.append(m.group(0)) return f"{_FENCE_PH}{len(fences) - 1}\x00" result = re.sub(r"```[\s\S]*?```", _save_fence, content) # --- 2. Protect inline code --- _CODE_PH = "\x00CODE" codes: list[str] = [] def _save_code(m: re.Match) -> str: codes.append(m.group(0)) return f"{_CODE_PH}{len(codes) - 1}\x00" result = re.sub(r"`[^`\n]+`", _save_code, result) # --- 3. Convert markdown formatting to WhatsApp syntax --- # Bold: **text** or __text__ → *text* result = re.sub(r"\*\*(.+?)\*\*", r"*\1*", result) result = re.sub(r"__(.+?)__", r"*\1*", result) # Strikethrough: ~~text~~ → ~text~ result = re.sub(r"~~(.+?)~~", r"~\1~", result) # Italic: *text* is already WhatsApp italic — leave as-is # _text_ is already WhatsApp italic — leave as-is # --- 4. Convert markdown headers to bold text --- # # Header → *Header* result = re.sub(r"^#{1,6}\s+(.+)$", r"*\1*", result, flags=re.MULTILINE) # --- 5. Convert markdown links: [text](url) → text (url) --- result = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"\1 (\2)", result) # --- 6. Restore protected sections --- for i, fence in enumerate(fences): result = result.replace(f"{_FENCE_PH}{i}\x00", fence) for i, code in enumerate(codes): result = result.replace(f"{_CODE_PH}{i}\x00", code) return result async def send( self, chat_id: str, content: str, reply_to: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> SendResult: """Send a message via the WhatsApp bridge. Formats markdown for WhatsApp, splits long messages into chunks that preserve code block boundaries, and sends each chunk sequentially. """ if not self._running or not self._http_session: return SendResult(success=False, error="Not connected") bridge_exit = await self._check_managed_bridge_exit() if bridge_exit: return SendResult(success=False, error=bridge_exit) if not content or not content.strip(): return SendResult(success=True, message_id=None) try: import aiohttp # Format and chunk the message formatted = self.format_message(content) chunks = self.truncate_message(formatted, self.MAX_MESSAGE_LENGTH) last_message_id = None for chunk in chunks: payload: Dict[str, Any] = { "chatId": chat_id, "message": chunk, } if reply_to and last_message_id is None: # Only reply-to on the first chunk payload["replyTo"] = reply_to async with self._http_session.post( f"http://127.0.0.1:{self._bridge_port}/send", json=payload, timeout=aiohttp.ClientTimeout(total=30) ) as resp: if resp.status == 200: data = await resp.json() last_message_id = data.get("messageId") else: error = await resp.text() return SendResult(success=False, error=error) # Small delay between chunks to avoid rate limiting if len(chunks) > 1: await asyncio.sleep(0.3) return SendResult( success=True, message_id=last_message_id, ) except Exception as e: return SendResult(success=False, error=str(e)) async def edit_message( self, chat_id: str, message_id: str, content: str, *, finalize: bool = False, ) -> SendResult: """Edit a previously sent message via the WhatsApp bridge.""" if not self._running or not self._http_session: return SendResult(success=False, error="Not connected") bridge_exit = await self._check_managed_bridge_exit() if bridge_exit: return SendResult(success=False, error=bridge_exit) try: import aiohttp async with self._http_session.post( f"http://127.0.0.1:{self._bridge_port}/edit", json={ "chatId": chat_id, "messageId": message_id, "message": content, }, timeout=aiohttp.ClientTimeout(total=15) ) as resp: if resp.status == 200: return SendResult(success=True, message_id=message_id) else: error = await resp.text() return SendResult(success=False, error=error) except Exception as e: return SendResult(success=False, error=str(e)) async def _send_media_to_bridge( self, chat_id: str, file_path: str, media_type: str, caption: Optional[str] = None, file_name: Optional[str] = None, ) -> SendResult: """Send any media file via bridge /send-media endpoint.""" if not self._running or not self._http_session: return SendResult(success=False, error="Not connected") bridge_exit = await self._check_managed_bridge_exit() if bridge_exit: return SendResult(success=False, error=bridge_exit) try: import aiohttp if not os.path.exists(file_path): return SendResult(success=False, error=f"File not found: {file_path}") payload: Dict[str, Any] = { "chatId": chat_id, "filePath": file_path, "mediaType": media_type, } if caption: payload["caption"] = caption if file_name: payload["fileName"] = file_name async with self._http_session.post( f"http://127.0.0.1:{self._bridge_port}/send-media", json=payload, timeout=aiohttp.ClientTimeout(total=120), ) as resp: if resp.status == 200: data = await resp.json() return SendResult( success=True, message_id=data.get("messageId"), raw_response=data, ) else: error = await resp.text() return SendResult(success=False, error=error) except Exception as e: return SendResult(success=False, error=str(e)) async def send_image( self, chat_id: str, image_url: str, caption: Optional[str] = None, reply_to: Optional[str] = None, ) -> SendResult: """Download image URL to cache, send natively via bridge.""" try: local_path = await cache_image_from_url(image_url) return await self._send_media_to_bridge(chat_id, local_path, "image", caption) except Exception: return await super().send_image(chat_id, image_url, caption, reply_to) async def send_image_file( self, chat_id: str, image_path: str, caption: Optional[str] = None, reply_to: Optional[str] = None, **kwargs, ) -> SendResult: """Send a local image file natively via bridge.""" return await self._send_media_to_bridge(chat_id, image_path, "image", caption) async def send_video( self, chat_id: str, video_path: str, caption: Optional[str] = None, reply_to: Optional[str] = None, **kwargs, ) -> SendResult: """Send a video natively via bridge — plays inline in WhatsApp.""" return await self._send_media_to_bridge(chat_id, video_path, "video", caption) async def send_voice( self, chat_id: str, audio_path: str, caption: Optional[str] = None, reply_to: Optional[str] = None, **kwargs, ) -> SendResult: """Send an audio file as a WhatsApp voice message via bridge.""" return await self._send_media_to_bridge(chat_id, audio_path, "audio", caption) async def send_document( self, chat_id: str, file_path: str, caption: Optional[str] = None, file_name: Optional[str] = None, reply_to: Optional[str] = None, **kwargs, ) -> SendResult: """Send a document/file as a downloadable attachment via bridge.""" return await self._send_media_to_bridge( chat_id, file_path, "document", caption, file_name or os.path.basename(file_path), ) async def send_typing(self, chat_id: str, metadata=None) -> None: """Send typing indicator via bridge.""" if not self._running or not self._http_session: return if await self._check_managed_bridge_exit(): return try: import aiohttp await self._http_session.post( f"http://127.0.0.1:{self._bridge_port}/typing", json={"chatId": chat_id}, timeout=aiohttp.ClientTimeout(total=5) ) except Exception: pass # Ignore typing indicator failures async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: """Get information about a WhatsApp chat.""" if not self._running or not self._http_session: return {"name": "Unknown", "type": "dm"} if await self._check_managed_bridge_exit(): return {"name": chat_id, "type": "dm"} try: import aiohttp async with self._http_session.get( f"http://127.0.0.1:{self._bridge_port}/chat/{chat_id}", timeout=aiohttp.ClientTimeout(total=10) ) as resp: if resp.status == 200: data = await resp.json() return { "name": data.get("name", chat_id), "type": "group" if data.get("isGroup") else "dm", "participants": data.get("participants", []), } except Exception as e: logger.debug("Could not get WhatsApp chat info for %s: %s", chat_id, e) return {"name": chat_id, "type": "dm"} async def _poll_messages(self) -> None: """Poll the bridge for incoming messages.""" import aiohttp while self._running: if not self._http_session: break bridge_exit = await self._check_managed_bridge_exit() if bridge_exit: print(f"[{self.name}] {bridge_exit}") break try: async with self._http_session.get( f"http://127.0.0.1:{self._bridge_port}/messages", timeout=aiohttp.ClientTimeout(total=30) ) as resp: if resp.status == 200: messages = await resp.json() for msg_data in messages: event = await self._build_message_event(msg_data) if event: await self.handle_message(event) except asyncio.CancelledError: break except Exception as e: bridge_exit = await self._check_managed_bridge_exit() if bridge_exit: print(f"[{self.name}] {bridge_exit}") break print(f"[{self.name}] Poll error: {e}") await asyncio.sleep(5) await asyncio.sleep(1) # Poll interval async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEvent]: """Build a MessageEvent from bridge message data, downloading images to cache.""" try: if not self._should_process_message(data): return None # Determine message type msg_type = MessageType.TEXT if data.get("hasMedia"): media_type = data.get("mediaType", "") if "image" in media_type: msg_type = MessageType.PHOTO elif "video" in media_type: msg_type = MessageType.VIDEO elif "audio" in media_type or "ptt" in media_type: # ptt = voice note msg_type = MessageType.VOICE else: msg_type = MessageType.DOCUMENT # Determine chat type is_group = data.get("isGroup", False) chat_type = "group" if is_group else "dm" # Build source source = self.build_source( chat_id=data.get("chatId", ""), chat_name=data.get("chatName"), chat_type=chat_type, user_id=data.get("senderId"), user_name=data.get("senderName"), ) # Download media URLs to the local cache so agent tools # can access them reliably regardless of URL expiration. raw_urls = data.get("mediaUrls", []) cached_urls = [] media_types = [] for url in raw_urls: if msg_type == MessageType.PHOTO and url.startswith(("http://", "https://")): try: cached_path = await cache_image_from_url(url, ext=".jpg") cached_urls.append(cached_path) media_types.append("image/jpeg") print(f"[{self.name}] Cached user image: {cached_path}", flush=True) except Exception as e: print(f"[{self.name}] Failed to cache image: {e}", flush=True) cached_urls.append(url) media_types.append("image/jpeg") elif msg_type == MessageType.PHOTO and os.path.isabs(url): # Local file path — bridge already downloaded the image cached_urls.append(url) media_types.append("image/jpeg") print(f"[{self.name}] Using bridge-cached image: {url}", flush=True) elif msg_type == MessageType.VOICE and url.startswith(("http://", "https://")): try: cached_path = await cache_audio_from_url(url, ext=".ogg") cached_urls.append(cached_path) media_types.append("audio/ogg") print(f"[{self.name}] Cached user voice: {cached_path}", flush=True) except Exception as e: print(f"[{self.name}] Failed to cache voice: {e}", flush=True) cached_urls.append(url) media_types.append("audio/ogg") elif msg_type == MessageType.VOICE and os.path.isabs(url): # Local file path — bridge already downloaded the audio cached_urls.append(url) media_types.append("audio/ogg") print(f"[{self.name}] Using bridge-cached audio: {url}", flush=True) elif msg_type == MessageType.DOCUMENT and os.path.isabs(url): # Local file path — bridge already downloaded the document cached_urls.append(url) ext = Path(url).suffix.lower() mime = SUPPORTED_DOCUMENT_TYPES.get(ext, "application/octet-stream") media_types.append(mime) print(f"[{self.name}] Using bridge-cached document: {url}", flush=True) elif msg_type == MessageType.VIDEO and os.path.isabs(url): cached_urls.append(url) media_types.append("video/mp4") print(f"[{self.name}] Using bridge-cached video: {url}", flush=True) else: cached_urls.append(url) media_types.append("unknown") # For text-readable documents, inject file content directly into # the message text so the agent can read it inline. # Cap at 100KB to match Telegram/Discord/Slack behaviour. body = data.get("body", "") if data.get("isGroup"): body = self._clean_bot_mention_text(body, data) MAX_TEXT_INJECT_BYTES = 100 * 1024 if msg_type == MessageType.DOCUMENT and cached_urls: for doc_path in cached_urls: ext = Path(doc_path).suffix.lower() if ext in (".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml", ".log", ".py", ".js", ".ts", ".html", ".css"): try: file_size = Path(doc_path).stat().st_size if file_size > MAX_TEXT_INJECT_BYTES: print(f"[{self.name}] Skipping text injection for {doc_path} ({file_size} bytes > {MAX_TEXT_INJECT_BYTES})", flush=True) continue content = Path(doc_path).read_text(errors="replace") fname = Path(doc_path).name # Remove the doc__ prefix for display display_name = fname if "_" in fname: parts = fname.split("_", 2) if len(parts) >= 3: display_name = parts[2] injection = f"[Content of {display_name}]:\n{content}" if body: body = f"{injection}\n\n{body}" else: body = injection print(f"[{self.name}] Injected text content from: {doc_path}", flush=True) except Exception as e: print(f"[{self.name}] Failed to read document text: {e}", flush=True) return MessageEvent( text=body, message_type=msg_type, source=source, raw_message=data, message_id=data.get("messageId"), media_urls=cached_urls, media_types=media_types, ) except Exception as e: print(f"[{self.name}] Error building event: {e}") return None