diff --git a/agent/redact.py b/agent/redact.py index 22f1a547fb..e007de5d95 100644 --- a/agent/redact.py +++ b/agent/redact.py @@ -52,6 +52,10 @@ _TELEGRAM_RE = re.compile( r"(bot)?(\d{8,}):([-A-Za-z0-9_]{30,})", ) +# E.164 phone numbers: +, 7-15 digits +# Negative lookahead prevents matching hex strings or identifiers +_SIGNAL_PHONE_RE = re.compile(r"(\+[1-9]\d{6,14})(?![A-Za-z0-9])") + # Compile known prefix patterns into one alternation _PREFIX_RE = re.compile( r"(? str: return f"{prefix}{digits}:***" text = _TELEGRAM_RE.sub(_redact_telegram, text) + # E.164 phone numbers (Signal, WhatsApp) + def _redact_phone(m): + phone = m.group(1) + if len(phone) <= 8: + return phone[:2] + "****" + phone[-2:] + return phone[:4] + "****" + phone[-4:] + text = _SIGNAL_PHONE_RE.sub(_redact_phone, text) + return text diff --git a/gateway/config.py b/gateway/config.py index f441e2dd6b..b52c03d28a 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -26,6 +26,7 @@ class Platform(Enum): DISCORD = "discord" WHATSAPP = "whatsapp" SLACK = "slack" + SIGNAL = "signal" HOMEASSISTANT = "homeassistant" @@ -155,7 +156,16 @@ class GatewayConfig: """Return list of platforms that are enabled and configured.""" connected = [] for platform, config in self.platforms.items(): - if config.enabled and (config.token or config.api_key): + if not config.enabled: + continue + # Platforms that use token/api_key auth + if config.token or config.api_key: + connected.append(platform) + # WhatsApp uses enabled flag only (bridge handles auth) + elif platform == Platform.WHATSAPP: + connected.append(platform) + # Signal uses extra dict for config (http_url + account) + elif platform == Platform.SIGNAL and config.extra.get("http_url"): connected.append(platform) return connected @@ -379,6 +389,28 @@ def _apply_env_overrides(config: GatewayConfig) -> None: name=os.getenv("SLACK_HOME_CHANNEL_NAME", ""), ) + # Signal + signal_url = os.getenv("SIGNAL_HTTP_URL") + signal_account = os.getenv("SIGNAL_ACCOUNT") + if signal_url and signal_account: + if Platform.SIGNAL not in config.platforms: + config.platforms[Platform.SIGNAL] = PlatformConfig() + config.platforms[Platform.SIGNAL].enabled = True + config.platforms[Platform.SIGNAL].extra.update({ + "http_url": signal_url, + "account": signal_account, + "dm_policy": os.getenv("SIGNAL_DM_POLICY", "pairing"), + "group_policy": os.getenv("SIGNAL_GROUP_POLICY", "disabled"), + "ignore_stories": os.getenv("SIGNAL_IGNORE_STORIES", "true").lower() in ("true", "1", "yes"), + }) + signal_home = os.getenv("SIGNAL_HOME_CHANNEL") + if signal_home: + config.platforms[Platform.SIGNAL].home_channel = HomeChannel( + platform=Platform.SIGNAL, + chat_id=signal_home, + name=os.getenv("SIGNAL_HOME_CHANNEL_NAME", "Home"), + ) + # Home Assistant hass_token = os.getenv("HASS_TOKEN") if hass_token: diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index d787cc939c..dc518843e4 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -838,6 +838,8 @@ class BasePlatformAdapter(ABC): user_name: Optional[str] = None, thread_id: Optional[str] = None, chat_topic: Optional[str] = None, + user_id_alt: Optional[str] = None, + chat_id_alt: Optional[str] = None, ) -> SessionSource: """Helper to build a SessionSource for this platform.""" # Normalize empty topic to None @@ -852,6 +854,8 @@ class BasePlatformAdapter(ABC): user_name=user_name, thread_id=str(thread_id) if thread_id else None, chat_topic=chat_topic.strip() if chat_topic else None, + user_id_alt=user_id_alt, + chat_id_alt=chat_id_alt, ) @abstractmethod diff --git a/gateway/platforms/signal.py b/gateway/platforms/signal.py new file mode 100644 index 0000000000..85963aa625 --- /dev/null +++ b/gateway/platforms/signal.py @@ -0,0 +1,688 @@ +"""Signal messenger platform adapter. + +Connects to a signal-cli daemon running in HTTP mode. +Inbound messages arrive via SSE (Server-Sent Events) streaming. +Outbound messages and actions use JSON-RPC 2.0 over HTTP. + +Based on PR #268 by ibhagwan, rebuilt with bug fixes. + +Requires: + - signal-cli installed and running: signal-cli daemon --http 127.0.0.1:8080 + - SIGNAL_HTTP_URL and SIGNAL_ACCOUNT environment variables set +""" + +import asyncio +import base64 +import json +import logging +import os +import re +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Dict, List, Optional, Any +from urllib.parse import unquote + +import httpx + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import ( + BasePlatformAdapter, + MessageEvent, + MessageType, + SendResult, + cache_image_from_bytes, + cache_audio_from_bytes, + cache_document_from_bytes, + cache_image_from_url, +) + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- +SIGNAL_MAX_ATTACHMENT_SIZE = 100 * 1024 * 1024 # 100 MB +MAX_MESSAGE_LENGTH = 8000 # Signal message size limit +TYPING_INTERVAL = 8.0 # seconds between typing indicator refreshes +SSE_RETRY_DELAY_INITIAL = 2.0 +SSE_RETRY_DELAY_MAX = 60.0 +HEALTH_CHECK_INTERVAL = 30.0 # seconds between health checks +HEALTH_CHECK_STALE_THRESHOLD = 120.0 # seconds without SSE activity before concern + +# E.164 phone number pattern for redaction +_PHONE_RE = re.compile(r"\+[1-9]\d{6,14}") + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _redact_phone(phone: str) -> str: + """Redact a phone number for logging: +15551234567 -> +155****4567.""" + if not phone: + return "" + if len(phone) <= 8: + return phone[:2] + "****" + phone[-2:] if len(phone) > 4 else "****" + return phone[:4] + "****" + phone[-4:] + + +def _parse_comma_list(value: str) -> List[str]: + """Split a comma-separated string into a list, stripping whitespace.""" + return [v.strip() for v in value.split(",") if v.strip()] + + +def _guess_extension(data: bytes) -> str: + """Guess file extension from magic bytes.""" + if data[:4] == b"\x89PNG": + return ".png" + if data[:2] == b"\xff\xd8": + return ".jpg" + if data[:4] == b"GIF8": + return ".gif" + if len(data) >= 12 and data[:4] == b"RIFF" and data[8:12] == b"WEBP": + return ".webp" + if data[:4] == b"%PDF": + return ".pdf" + if len(data) >= 8 and data[4:8] == b"ftyp": + return ".mp4" + if data[:4] == b"OggS": + return ".ogg" + if len(data) >= 2 and data[0] == 0xFF and (data[1] & 0xE0) == 0xE0: + return ".mp3" + if data[:2] == b"PK": + return ".zip" + return ".bin" + + +def _is_image_ext(ext: str) -> bool: + return ext.lower() in (".jpg", ".jpeg", ".png", ".gif", ".webp") + + +def _is_audio_ext(ext: str) -> bool: + return ext.lower() in (".mp3", ".wav", ".ogg", ".m4a", ".aac") + + +def check_signal_requirements() -> bool: + """Check if Signal is configured (has URL and account).""" + return bool(os.getenv("SIGNAL_HTTP_URL") and os.getenv("SIGNAL_ACCOUNT")) + + +# --------------------------------------------------------------------------- +# Signal Adapter +# --------------------------------------------------------------------------- + +class SignalAdapter(BasePlatformAdapter): + """Signal messenger adapter using signal-cli HTTP daemon.""" + + platform = Platform.SIGNAL + + def __init__(self, config: PlatformConfig): + super().__init__(config, Platform.SIGNAL) + + extra = config.extra or {} + self.http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/") + self.account = extra.get("account", "") + self.dm_policy = extra.get("dm_policy", "pairing") + self.group_policy = extra.get("group_policy", "disabled") + self.ignore_stories = extra.get("ignore_stories", True) + + # Parse allowlists + allowed_str = os.getenv("SIGNAL_ALLOWED_USERS", "") + self.allowed_users = set(_parse_comma_list(allowed_str)) + group_allowed_str = os.getenv("SIGNAL_GROUP_ALLOWED_USERS", "") + self.group_allow_from = set(_parse_comma_list(group_allowed_str)) + + # HTTP client + self.client: Optional[httpx.AsyncClient] = None + + # Background tasks + self._sse_task: Optional[asyncio.Task] = None + self._health_monitor_task: Optional[asyncio.Task] = None + self._typing_tasks: Dict[str, asyncio.Task] = {} + self._running = False + self._last_sse_activity = 0.0 + self._sse_response: Optional[httpx.Response] = None + + # Pairing store (lazy import to avoid circular deps) + from gateway.pairing import PairingStore + self.pairing_store = PairingStore() + + # Debug logging (scoped to this module, NOT root logger) + if os.getenv("SIGNAL_DEBUG", "").lower() in ("true", "1", "yes"): + logger.setLevel(logging.DEBUG) + + logger.info("Signal adapter initialized: url=%s account=%s dm_policy=%s group_policy=%s", + self.http_url, _redact_phone(self.account), self.dm_policy, self.group_policy) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def connect(self) -> bool: + """Connect to signal-cli daemon and start SSE listener.""" + if not self.http_url or not self.account: + logger.error("Signal: SIGNAL_HTTP_URL and SIGNAL_ACCOUNT are required") + return False + + self.client = httpx.AsyncClient(timeout=30.0) + + # Health check — verify signal-cli daemon is reachable + try: + resp = await self.client.get(f"{self.http_url}/api/v1/check", timeout=10.0) + if resp.status_code != 200: + logger.error("Signal: health check failed (status %d)", resp.status_code) + return False + except Exception as e: + logger.error("Signal: cannot reach signal-cli at %s: %s", self.http_url, e) + return False + + self._running = True + self._last_sse_activity = time.time() + self._sse_task = asyncio.create_task(self._sse_listener()) + self._health_monitor_task = asyncio.create_task(self._health_monitor()) + + logger.info("Signal: connected to %s", self.http_url) + return True + + async def disconnect(self) -> None: + """Stop SSE listener and clean up.""" + self._running = False + + if self._sse_task: + self._sse_task.cancel() + try: + await self._sse_task + except asyncio.CancelledError: + pass + + if self._health_monitor_task: + self._health_monitor_task.cancel() + try: + await self._health_monitor_task + except asyncio.CancelledError: + pass + + # Cancel all typing tasks + for task in self._typing_tasks.values(): + task.cancel() + self._typing_tasks.clear() + + if self.client: + await self.client.aclose() + self.client = None + + logger.info("Signal: disconnected") + + # ------------------------------------------------------------------ + # SSE Streaming (inbound messages) + # ------------------------------------------------------------------ + + async def _sse_listener(self) -> None: + """Listen for SSE events from signal-cli daemon.""" + url = f"{self.http_url}/api/v1/events?account={self.account}" + backoff = SSE_RETRY_DELAY_INITIAL + + while self._running: + try: + logger.debug("Signal SSE: connecting to %s", url) + async with self.client.stream( + "GET", url, + headers={"Accept": "text/event-stream"}, + timeout=None, + ) as response: + self._sse_response = response + backoff = SSE_RETRY_DELAY_INITIAL # Reset on successful connection + self._last_sse_activity = time.time() + logger.info("Signal SSE: connected") + + buffer = "" + async for chunk in response.aiter_text(): + if not self._running: + break + buffer += chunk + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + line = line.strip() + if not line: + continue + # Parse SSE data lines + if line.startswith("data:"): + data_str = line[5:].strip() + if not data_str: + continue + self._last_sse_activity = time.time() + try: + data = json.loads(data_str) + await self._handle_envelope(data) + except json.JSONDecodeError: + logger.debug("Signal SSE: invalid JSON: %s", data_str[:100]) + except Exception: + logger.exception("Signal SSE: error handling event") + + except asyncio.CancelledError: + break + except httpx.HTTPError as e: + if self._running: + logger.warning("Signal SSE: HTTP error: %s (reconnecting in %.0fs)", e, backoff) + except Exception as e: + if self._running: + logger.warning("Signal SSE: error: %s (reconnecting in %.0fs)", e, backoff) + + if self._running: + await asyncio.sleep(backoff) + backoff = min(backoff * 2, SSE_RETRY_DELAY_MAX) + + self._sse_response = None + + # ------------------------------------------------------------------ + # Health Monitor + # ------------------------------------------------------------------ + + async def _health_monitor(self) -> None: + """Monitor SSE connection health and force reconnect if stale.""" + while self._running: + await asyncio.sleep(HEALTH_CHECK_INTERVAL) + if not self._running: + break + + elapsed = time.time() - self._last_sse_activity + if elapsed > HEALTH_CHECK_STALE_THRESHOLD: + logger.warning("Signal: SSE idle for %.0fs, checking daemon health", elapsed) + try: + resp = await self.client.get( + f"{self.http_url}/api/v1/check", timeout=10.0 + ) + if resp.status_code == 200: + # Daemon is alive but SSE is idle — update activity to + # avoid repeated warnings (connection may just be quiet) + self._last_sse_activity = time.time() + logger.debug("Signal: daemon healthy, SSE idle") + else: + logger.warning("Signal: health check failed (%d), forcing reconnect", resp.status_code) + self._force_reconnect() + except Exception as e: + logger.warning("Signal: health check error: %s, forcing reconnect", e) + self._force_reconnect() + + def _force_reconnect(self) -> None: + """Force SSE reconnection by closing the current response.""" + if self._sse_response and not self._sse_response.is_stream_consumed: + try: + asyncio.create_task(self._sse_response.aclose()) + except Exception: + pass + self._sse_response = None + + # ------------------------------------------------------------------ + # Message Handling + # ------------------------------------------------------------------ + + async def _handle_envelope(self, envelope: dict) -> None: + """Process an incoming signal-cli envelope.""" + # Unwrap nested envelope if present + envelope_data = envelope.get("envelope", envelope) + + # Extract sender info + sender = ( + envelope_data.get("sourceNumber") + or envelope_data.get("sourceUuid") + or envelope_data.get("source") + ) + sender_name = envelope_data.get("sourceName", "") + sender_uuid = envelope_data.get("sourceUuid", "") + + if not sender: + logger.debug("Signal: ignoring envelope with no sender") + return + + # Filter stories + if self.ignore_stories and envelope_data.get("storyMessage"): + return + + # Get data message (skip receipts, typing indicators, etc.) + data_message = envelope_data.get("dataMessage") + if not data_message: + return + + # Check for group message + group_info = data_message.get("groupInfo") + group_id = group_info.get("groupId") if group_info else None + is_group = bool(group_id) + + # Authorization check — delegated to run.py's _is_user_authorized() + # for DM allowlists. We only do group policy filtering here since + # that's Signal-specific and not in the base auth system. + if is_group: + if self.group_policy == "disabled": + logger.debug("Signal: ignoring group message (group_policy=disabled)") + return + if self.group_policy == "allowlist": + if "*" not in self.group_allow_from and group_id not in self.group_allow_from: + logger.debug("Signal: group %s not in allowlist", group_id[:8] if group_id else "?") + return + # group_policy == "open" — allow through + + # DM policy "open" — for non-group, let all through to run.py auth + # (run.py will still check SIGNAL_ALLOWED_USERS / pairing) + # DM policy "pairing" / "allowlist" — handled by run.py + + # Build chat info + chat_id = sender if not is_group else f"group:{group_id}" + chat_type = "group" if is_group else "dm" + + # Extract text + text = data_message.get("message", "") + + # Process attachments + attachments_data = data_message.get("attachments", []) + image_paths = [] + audio_path = None + document_paths = [] + + if attachments_data and not getattr(self, "ignore_attachments", False): + for att in attachments_data: + att_id = att.get("id") + att_size = att.get("size", 0) + if not att_id: + continue + if att_size > SIGNAL_MAX_ATTACHMENT_SIZE: + logger.warning("Signal: attachment too large (%d bytes), skipping", att_size) + continue + try: + cached_path, ext = await self._fetch_attachment(att_id) + if cached_path: + if _is_image_ext(ext): + image_paths.append(cached_path) + elif _is_audio_ext(ext): + audio_path = cached_path + else: + document_paths.append(cached_path) + except Exception: + logger.exception("Signal: failed to fetch attachment %s", att_id) + + # Build session source + source = self.build_source( + chat_id=chat_id, + chat_name=group_info.get("groupName") if group_info else sender_name, + chat_type=chat_type, + user_id=sender, + user_name=sender_name or sender, + user_id_alt=sender_uuid if sender_uuid else None, + chat_id_alt=group_id if is_group else None, + ) + + # Determine message type + msg_type = MessageType.TEXT + if audio_path: + msg_type = MessageType.VOICE + elif image_paths: + msg_type = MessageType.IMAGE + + # Parse timestamp from envelope data (milliseconds since epoch) + ts_ms = envelope_data.get("timestamp", 0) + if ts_ms: + try: + timestamp = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc) + except (ValueError, OSError): + timestamp = datetime.now(tz=timezone.utc) + else: + timestamp = datetime.now(tz=timezone.utc) + + # Build and dispatch event + event = MessageEvent( + source=source, + text=text or "", + message_type=msg_type, + image_paths=image_paths, + audio_path=audio_path, + document_paths=document_paths, + timestamp=timestamp, + ) + + logger.debug("Signal: message from %s in %s: %s", + _redact_phone(sender), chat_id[:20], (text or "")[:50]) + + await self.handle_message(event) + + # ------------------------------------------------------------------ + # Attachment Handling + # ------------------------------------------------------------------ + + async def _fetch_attachment(self, attachment_id: str) -> tuple: + """Fetch an attachment via JSON-RPC and cache it. Returns (path, ext).""" + result = await self._rpc("getAttachment", { + "account": self.account, + "attachmentId": attachment_id, + }) + + if not result: + return None, "" + + # Result is base64-encoded file content + raw_data = base64.b64decode(result) + ext = _guess_extension(raw_data) + + if _is_image_ext(ext): + path = cache_image_from_bytes(raw_data, ext) + elif _is_audio_ext(ext): + path = cache_audio_from_bytes(raw_data, ext) + else: + path = cache_document_from_bytes(raw_data, ext) + + return path, ext + + # ------------------------------------------------------------------ + # JSON-RPC Communication + # ------------------------------------------------------------------ + + async def _rpc(self, method: str, params: dict, rpc_id: str = None) -> Any: + """Send a JSON-RPC 2.0 request to signal-cli daemon.""" + if not self.client: + logger.warning("Signal: RPC called but client not connected") + return None + + if rpc_id is None: + rpc_id = f"{method}_{int(time.time() * 1000)}" + + payload = { + "jsonrpc": "2.0", + "method": method, + "params": params, + "id": rpc_id, + } + + try: + resp = await self.client.post( + f"{self.http_url}/api/v1/rpc", + json=payload, + timeout=30.0, + ) + resp.raise_for_status() + data = resp.json() + + if "error" in data: + logger.warning("Signal RPC error (%s): %s", method, data["error"]) + return None + + return data.get("result") + + except Exception as e: + logger.warning("Signal RPC %s failed: %s", method, e) + return None + + # ------------------------------------------------------------------ + # Sending + # ------------------------------------------------------------------ + + async def send( + self, + chat_id: str, + text: str, + reply_to_message_id: Optional[str] = None, + **kwargs, + ) -> SendResult: + """Send a text message.""" + await self._stop_typing_indicator(chat_id) + + params: Dict[str, Any] = { + "account": self.account, + "message": text, + } + + if chat_id.startswith("group:"): + params["groupId"] = chat_id[6:] + else: + params["recipient"] = [chat_id] + + result = await self._rpc("send", params) + + if result is not None: + return SendResult(success=True) + return SendResult(success=False, error="RPC send failed") + + async def send_typing(self, chat_id: str) -> None: + """Send a typing indicator.""" + params: Dict[str, Any] = { + "account": self.account, + } + + if chat_id.startswith("group:"): + params["groupId"] = chat_id[6:] + else: + params["recipient"] = [chat_id] + + await self._rpc("sendTyping", params, rpc_id="typing") + + async def send_image( + self, + chat_id: str, + image_url: str, + caption: Optional[str] = None, + **kwargs, + ) -> SendResult: + """Send an image. Supports http(s):// and file:// URLs.""" + await self._stop_typing_indicator(chat_id) + + # Resolve image to local path + if image_url.startswith("file://"): + file_path = unquote(image_url[7:]) + else: + # Download remote image to cache + try: + file_path = await cache_image_from_url(image_url) + except Exception as e: + logger.warning("Signal: failed to download image: %s", e) + return SendResult(success=False, error=str(e)) + + if not file_path or not Path(file_path).exists(): + return SendResult(success=False, error="Image file not found") + + # Validate size + file_size = Path(file_path).stat().st_size + if file_size > SIGNAL_MAX_ATTACHMENT_SIZE: + return SendResult(success=False, error=f"Image too large ({file_size} bytes)") + + params: Dict[str, Any] = { + "account": self.account, + "message": caption or "", + "attachments": [file_path], + } + + if chat_id.startswith("group:"): + params["groupId"] = chat_id[6:] + else: + params["recipient"] = [chat_id] + + result = await self._rpc("send", params) + if result is not None: + return SendResult(success=True) + return SendResult(success=False, error="RPC send with attachment failed") + + async def send_document( + self, + chat_id: str, + file_path: str, + caption: Optional[str] = None, + filename: Optional[str] = None, + **kwargs, + ) -> SendResult: + """Send a document/file attachment.""" + await self._stop_typing_indicator(chat_id) + + if not Path(file_path).exists(): + return SendResult(success=False, error="File not found") + + params: Dict[str, Any] = { + "account": self.account, + "message": caption or "", + "attachments": [file_path], + } + + if chat_id.startswith("group:"): + params["groupId"] = chat_id[6:] + else: + params["recipient"] = [chat_id] + + result = await self._rpc("send", params) + if result is not None: + return SendResult(success=True) + return SendResult(success=False, error="RPC send document failed") + + # ------------------------------------------------------------------ + # Typing Indicators + # ------------------------------------------------------------------ + + async def _start_typing_indicator(self, chat_id: str) -> None: + """Start a typing indicator loop for a chat.""" + if chat_id in self._typing_tasks: + return # Already running + + async def _typing_loop(): + try: + while True: + await self.send_typing(chat_id) + await asyncio.sleep(TYPING_INTERVAL) + except asyncio.CancelledError: + pass + + self._typing_tasks[chat_id] = asyncio.create_task(_typing_loop()) + + async def _stop_typing_indicator(self, chat_id: str) -> None: + """Stop a typing indicator loop for a chat.""" + task = self._typing_tasks.pop(chat_id, None) + if task: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # ------------------------------------------------------------------ + # Chat Info + # ------------------------------------------------------------------ + + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: + """Get information about a chat/contact.""" + if chat_id.startswith("group:"): + return { + "name": chat_id, + "type": "group", + "chat_id": chat_id, + } + + # Try to resolve contact name + result = await self._rpc("getContact", { + "account": self.account, + "contactAddress": chat_id, + }) + + name = chat_id + if result and isinstance(result, dict): + name = result.get("name") or result.get("profileName") or chat_id + + return { + "name": name, + "type": "dm", + "chat_id": chat_id, + } diff --git a/gateway/run.py b/gateway/run.py index cd5b478b52..123562afa3 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -591,6 +591,13 @@ class GatewayRunner: return None return SlackAdapter(config) + elif platform == Platform.SIGNAL: + from gateway.platforms.signal import SignalAdapter, check_signal_requirements + if not check_signal_requirements(): + logger.warning("Signal: SIGNAL_HTTP_URL or SIGNAL_ACCOUNT not configured") + return None + return SignalAdapter(config) + elif platform == Platform.HOMEASSISTANT: from gateway.platforms.homeassistant import HomeAssistantAdapter, check_ha_requirements if not check_ha_requirements(): @@ -626,12 +633,14 @@ class GatewayRunner: Platform.DISCORD: "DISCORD_ALLOWED_USERS", Platform.WHATSAPP: "WHATSAPP_ALLOWED_USERS", Platform.SLACK: "SLACK_ALLOWED_USERS", + Platform.SIGNAL: "SIGNAL_ALLOWED_USERS", } platform_allow_all_map = { Platform.TELEGRAM: "TELEGRAM_ALLOW_ALL_USERS", Platform.DISCORD: "DISCORD_ALLOW_ALL_USERS", Platform.WHATSAPP: "WHATSAPP_ALLOW_ALL_USERS", Platform.SLACK: "SLACK_ALLOW_ALL_USERS", + Platform.SIGNAL: "SIGNAL_ALLOW_ALL_USERS", } # Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true) diff --git a/gateway/session.py b/gateway/session.py index 3113e2e6a2..f58a0419a2 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -45,6 +45,8 @@ class SessionSource: user_name: Optional[str] = None thread_id: Optional[str] = None # For forum topics, Discord threads, etc. chat_topic: Optional[str] = None # Channel topic/description (Discord, Slack) + user_id_alt: Optional[str] = None # Signal UUID (alternative to phone number) + chat_id_alt: Optional[str] = None # Signal group internal ID @property def description(self) -> str: @@ -68,7 +70,7 @@ class SessionSource: return ", ".join(parts) def to_dict(self) -> Dict[str, Any]: - return { + d = { "platform": self.platform.value, "chat_id": self.chat_id, "chat_name": self.chat_name, @@ -78,6 +80,11 @@ class SessionSource: "thread_id": self.thread_id, "chat_topic": self.chat_topic, } + if self.user_id_alt: + d["user_id_alt"] = self.user_id_alt + if self.chat_id_alt: + d["chat_id_alt"] = self.chat_id_alt + return d @classmethod def from_dict(cls, data: Dict[str, Any]) -> "SessionSource": @@ -90,6 +97,8 @@ class SessionSource: user_name=data.get("user_name"), thread_id=data.get("thread_id"), chat_topic=data.get("chat_topic"), + user_id_alt=data.get("user_id_alt"), + chat_id_alt=data.get("chat_id_alt"), ) @classmethod diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index b89db974c1..f4ed18e0d5 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -507,6 +507,12 @@ _PLATFORMS = [ "emoji": "📲", "token_var": "WHATSAPP_ENABLED", }, + { + "key": "signal", + "label": "Signal", + "emoji": "📡", + "token_var": "SIGNAL_HTTP_URL", + }, ] @@ -525,6 +531,13 @@ def _platform_status(platform: dict) -> str: return "configured + paired" return "enabled, not paired" return "not configured" + if platform.get("key") == "signal": + account = get_env_value("SIGNAL_ACCOUNT") + if val and account: + return "configured" + if val or account: + return "partially configured" + return "not configured" if val: return "configured" return "not configured" @@ -650,6 +663,138 @@ def _is_service_running() -> bool: return len(find_gateway_pids()) > 0 +def _setup_signal(): + """Interactive setup for Signal messenger.""" + import shutil + + print() + print(color(" ─── 📡 Signal Setup ───", Colors.CYAN)) + + existing_url = get_env_value("SIGNAL_HTTP_URL") + existing_account = get_env_value("SIGNAL_ACCOUNT") + if existing_url and existing_account: + print() + print_success("Signal is already configured.") + if not prompt_yes_no(" Reconfigure Signal?", False): + return + + # Check if signal-cli is available + print() + if shutil.which("signal-cli"): + print_success("signal-cli found on PATH.") + else: + print_warning("signal-cli not found on PATH.") + print_info(" Signal requires signal-cli running as an HTTP daemon.") + print_info(" Install options:") + print_info(" Linux: sudo apt install signal-cli") + print_info(" or download from https://github.com/AsamK/signal-cli") + print_info(" macOS: brew install signal-cli") + print_info(" Docker: bbernhard/signal-cli-rest-api") + print() + print_info(" After installing, link your account and start the daemon:") + print_info(" signal-cli link -n \"HermesAgent\"") + print_info(" signal-cli --account +YOURNUMBER daemon --http 127.0.0.1:8080") + print() + + # HTTP URL + print() + print_info(" Enter the URL where signal-cli HTTP daemon is running.") + default_url = existing_url or "http://127.0.0.1:8080" + try: + url = input(f" HTTP URL [{default_url}]: ").strip() or default_url + except (EOFError, KeyboardInterrupt): + print("\n Setup cancelled.") + return + + # Test connectivity + print_info(" Testing connection...") + try: + import httpx + resp = httpx.get(f"{url.rstrip('/')}/api/v1/check", timeout=10.0) + if resp.status_code == 200: + print_success(" signal-cli daemon is reachable!") + else: + print_warning(f" signal-cli responded with status {resp.status_code}.") + if not prompt_yes_no(" Continue anyway?", False): + return + except Exception as e: + print_warning(f" Could not reach signal-cli at {url}: {e}") + if not prompt_yes_no(" Save this URL anyway? (you can start signal-cli later)", True): + return + + save_env_value("SIGNAL_HTTP_URL", url) + + # Account phone number + print() + print_info(" Enter your Signal account phone number in E.164 format.") + print_info(" Example: +15551234567") + default_account = existing_account or "" + try: + account = input(f" Account number{f' [{default_account}]' if default_account else ''}: ").strip() + if not account: + account = default_account + except (EOFError, KeyboardInterrupt): + print("\n Setup cancelled.") + return + + if not account: + print_error(" Account number is required.") + return + + save_env_value("SIGNAL_ACCOUNT", account) + + # Allowed users + print() + print_info(" The gateway DENIES all users by default for security.") + print_info(" Enter phone numbers or UUIDs of allowed users (comma-separated).") + existing_allowed = get_env_value("SIGNAL_ALLOWED_USERS") or "" + default_allowed = existing_allowed or account + try: + allowed = input(f" Allowed users [{default_allowed}]: ").strip() or default_allowed + except (EOFError, KeyboardInterrupt): + print("\n Setup cancelled.") + return + + save_env_value("SIGNAL_ALLOWED_USERS", allowed) + + # DM policy + print() + policies = ["pairing (default — new users get a pairing code to approve)", + "allowlist (only explicitly listed users)", + "open (anyone can message)"] + dm_choice = prompt_choice(" DM access policy:", policies, 0) + dm_policy = ["pairing", "allowlist", "open"][dm_choice] + save_env_value("SIGNAL_DM_POLICY", dm_policy) + + # Group policy + print() + group_policies = ["disabled (default — ignore group messages)", + "allowlist (only specific groups)", + "open (respond in any group the bot is in)"] + group_choice = prompt_choice(" Group message policy:", group_policies, 0) + group_policy = ["disabled", "allowlist", "open"][group_choice] + save_env_value("SIGNAL_GROUP_POLICY", group_policy) + + if group_policy == "allowlist": + print() + print_info(" Enter group IDs to allow (comma-separated).") + existing_groups = get_env_value("SIGNAL_GROUP_ALLOWED_USERS") or "" + try: + groups = input(f" Group IDs [{existing_groups}]: ").strip() or existing_groups + except (EOFError, KeyboardInterrupt): + print("\n Setup cancelled.") + return + if groups: + save_env_value("SIGNAL_GROUP_ALLOWED_USERS", groups) + + print() + print_success("Signal configured!") + print_info(f" URL: {url}") + print_info(f" Account: {account}") + print_info(f" DM policy: {dm_policy}") + print_info(f" Group policy: {group_policy}") + + def gateway_setup(): """Interactive setup for messaging platforms + gateway service.""" @@ -702,6 +847,8 @@ def gateway_setup(): if platform["key"] == "whatsapp": _setup_whatsapp() + elif platform["key"] == "signal": + _setup_signal() else: _setup_standard_platform(platform) diff --git a/tests/gateway/test_signal.py b/tests/gateway/test_signal.py new file mode 100644 index 0000000000..81d6bf4978 --- /dev/null +++ b/tests/gateway/test_signal.py @@ -0,0 +1,289 @@ +"""Tests for Signal messenger platform adapter.""" +import json +import pytest +from unittest.mock import MagicMock, patch, AsyncMock + +from gateway.config import Platform, PlatformConfig + + +# --------------------------------------------------------------------------- +# Platform & Config +# --------------------------------------------------------------------------- + +class TestSignalPlatformEnum: + def test_signal_enum_exists(self): + assert Platform.SIGNAL.value == "signal" + + def test_signal_in_platform_list(self): + platforms = [p.value for p in Platform] + assert "signal" in platforms + + +class TestSignalConfigLoading: + def test_apply_env_overrides_signal(self, monkeypatch): + monkeypatch.setenv("SIGNAL_HTTP_URL", "http://localhost:9090") + monkeypatch.setenv("SIGNAL_ACCOUNT", "+15551234567") + monkeypatch.setenv("SIGNAL_DM_POLICY", "open") + monkeypatch.setenv("SIGNAL_GROUP_POLICY", "allowlist") + + from gateway.config import GatewayConfig, _apply_env_overrides + config = GatewayConfig() + _apply_env_overrides(config) + + assert Platform.SIGNAL in config.platforms + sc = config.platforms[Platform.SIGNAL] + assert sc.enabled is True + assert sc.extra["http_url"] == "http://localhost:9090" + assert sc.extra["account"] == "+15551234567" + assert sc.extra["dm_policy"] == "open" + assert sc.extra["group_policy"] == "allowlist" + + def test_signal_not_loaded_without_both_vars(self, monkeypatch): + monkeypatch.setenv("SIGNAL_HTTP_URL", "http://localhost:9090") + # No SIGNAL_ACCOUNT + + from gateway.config import GatewayConfig, _apply_env_overrides + config = GatewayConfig() + _apply_env_overrides(config) + + assert Platform.SIGNAL not in config.platforms + + def test_connected_platforms_includes_signal(self, monkeypatch): + monkeypatch.setenv("SIGNAL_HTTP_URL", "http://localhost:8080") + monkeypatch.setenv("SIGNAL_ACCOUNT", "+15551234567") + + from gateway.config import GatewayConfig, _apply_env_overrides + config = GatewayConfig() + _apply_env_overrides(config) + + connected = config.get_connected_platforms() + assert Platform.SIGNAL in connected + + +# --------------------------------------------------------------------------- +# Adapter Init & Helpers +# --------------------------------------------------------------------------- + +class TestSignalAdapterInit: + def _make_config(self, **extra): + config = PlatformConfig() + config.enabled = True + config.extra = { + "http_url": "http://localhost:8080", + "account": "+15551234567", + "dm_policy": "pairing", + "group_policy": "disabled", + **extra, + } + return config + + def test_init_parses_config(self, monkeypatch): + monkeypatch.setenv("SIGNAL_ALLOWED_USERS", "+15559876543,+15551111111") + monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "group123,group456") + monkeypatch.delenv("SIGNAL_DEBUG", raising=False) + + from gateway.platforms.signal import SignalAdapter + adapter = SignalAdapter(self._make_config()) + + assert adapter.http_url == "http://localhost:8080" + assert adapter.account == "+15551234567" + assert adapter.dm_policy == "pairing" + assert adapter.group_policy == "disabled" + assert "+15559876543" in adapter.allowed_users + assert "+15551111111" in adapter.allowed_users + assert "group123" in adapter.group_allow_from + + def test_init_empty_allowlist(self, monkeypatch): + monkeypatch.setenv("SIGNAL_ALLOWED_USERS", "") + monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "") + monkeypatch.delenv("SIGNAL_DEBUG", raising=False) + + from gateway.platforms.signal import SignalAdapter + adapter = SignalAdapter(self._make_config()) + + assert len(adapter.allowed_users) == 0 + assert len(adapter.group_allow_from) == 0 + + def test_init_strips_trailing_slash(self, monkeypatch): + monkeypatch.setenv("SIGNAL_ALLOWED_USERS", "") + monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "") + monkeypatch.delenv("SIGNAL_DEBUG", raising=False) + + from gateway.platforms.signal import SignalAdapter + adapter = SignalAdapter(self._make_config(http_url="http://localhost:8080/")) + + assert adapter.http_url == "http://localhost:8080" + + +class TestSignalHelpers: + def test_redact_phone_long(self): + from gateway.platforms.signal import _redact_phone + assert _redact_phone("+15551234567") == "+155****4567" + + def test_redact_phone_short(self): + from gateway.platforms.signal import _redact_phone + assert _redact_phone("+12345") == "+1****45" + + def test_redact_phone_empty(self): + from gateway.platforms.signal import _redact_phone + assert _redact_phone("") == "" + + def test_parse_comma_list(self): + from gateway.platforms.signal import _parse_comma_list + assert _parse_comma_list("+1234, +5678 , +9012") == ["+1234", "+5678", "+9012"] + assert _parse_comma_list("") == [] + assert _parse_comma_list(" , , ") == [] + + def test_guess_extension_png(self): + from gateway.platforms.signal import _guess_extension + assert _guess_extension(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100) == ".png" + + def test_guess_extension_jpeg(self): + from gateway.platforms.signal import _guess_extension + assert _guess_extension(b"\xff\xd8\xff\xe0" + b"\x00" * 100) == ".jpg" + + def test_guess_extension_pdf(self): + from gateway.platforms.signal import _guess_extension + assert _guess_extension(b"%PDF-1.4" + b"\x00" * 100) == ".pdf" + + def test_guess_extension_zip(self): + from gateway.platforms.signal import _guess_extension + assert _guess_extension(b"PK\x03\x04" + b"\x00" * 100) == ".zip" + + def test_guess_extension_mp4(self): + from gateway.platforms.signal import _guess_extension + assert _guess_extension(b"\x00\x00\x00\x18ftypisom" + b"\x00" * 100) == ".mp4" + + def test_guess_extension_unknown(self): + from gateway.platforms.signal import _guess_extension + assert _guess_extension(b"\x00\x01\x02\x03" * 10) == ".bin" + + def test_is_image_ext(self): + from gateway.platforms.signal import _is_image_ext + assert _is_image_ext(".png") is True + assert _is_image_ext(".jpg") is True + assert _is_image_ext(".gif") is True + assert _is_image_ext(".pdf") is False + + def test_is_audio_ext(self): + from gateway.platforms.signal import _is_audio_ext + assert _is_audio_ext(".mp3") is True + assert _is_audio_ext(".ogg") is True + assert _is_audio_ext(".png") is False + + def test_check_requirements(self, monkeypatch): + from gateway.platforms.signal import check_signal_requirements + monkeypatch.setenv("SIGNAL_HTTP_URL", "http://localhost:8080") + monkeypatch.setenv("SIGNAL_ACCOUNT", "+15551234567") + assert check_signal_requirements() is True + + def test_check_requirements_missing(self, monkeypatch): + from gateway.platforms.signal import check_signal_requirements + monkeypatch.delenv("SIGNAL_HTTP_URL", raising=False) + monkeypatch.delenv("SIGNAL_ACCOUNT", raising=False) + assert check_signal_requirements() is False + + +# --------------------------------------------------------------------------- +# Session Source +# --------------------------------------------------------------------------- + +class TestSignalSessionSource: + def test_session_source_alt_fields(self): + from gateway.session import SessionSource + source = SessionSource( + platform=Platform.SIGNAL, + chat_id="+15551234567", + user_id="+15551234567", + user_id_alt="uuid:abc-123", + chat_id_alt=None, + ) + d = source.to_dict() + assert d["user_id_alt"] == "uuid:abc-123" + assert "chat_id_alt" not in d # None fields excluded + + def test_session_source_roundtrip(self): + from gateway.session import SessionSource + source = SessionSource( + platform=Platform.SIGNAL, + chat_id="group:xyz", + chat_type="group", + user_id="+15551234567", + user_id_alt="uuid:abc", + chat_id_alt="xyz", + ) + d = source.to_dict() + restored = SessionSource.from_dict(d) + assert restored.user_id_alt == "uuid:abc" + assert restored.chat_id_alt == "xyz" + assert restored.platform == Platform.SIGNAL + + +# --------------------------------------------------------------------------- +# Phone Redaction in agent/redact.py +# --------------------------------------------------------------------------- + +class TestSignalPhoneRedaction: + def test_us_number(self): + from agent.redact import redact_sensitive_text + result = redact_sensitive_text("Call +15551234567 now") + assert "+15551234567" not in result + assert "+155" in result # Prefix preserved + assert "4567" in result # Suffix preserved + + def test_uk_number(self): + from agent.redact import redact_sensitive_text + result = redact_sensitive_text("UK: +442071838750") + assert "+442071838750" not in result + assert "****" in result + + def test_multiple_numbers(self): + from agent.redact import redact_sensitive_text + text = "From +15551234567 to +442071838750" + result = redact_sensitive_text(text) + assert "+15551234567" not in result + assert "+442071838750" not in result + + def test_short_number_not_matched(self): + from agent.redact import redact_sensitive_text + result = redact_sensitive_text("Code: +12345") + # 5 digits after + is below the 7-digit minimum + assert "+12345" in result # Too short to redact + + +# --------------------------------------------------------------------------- +# Authorization in run.py +# --------------------------------------------------------------------------- + +class TestSignalAuthorization: + def test_signal_in_allowlist_maps(self): + """Signal should be in the platform auth maps.""" + from gateway.run import GatewayRunner + from gateway.config import GatewayConfig + + gw = GatewayRunner.__new__(GatewayRunner) + gw.config = GatewayConfig() + gw.pairing_store = MagicMock() + gw.pairing_store.is_approved.return_value = False + + source = MagicMock() + source.platform = Platform.SIGNAL + source.user_id = "+15559999999" + + # No allowlists set — should check GATEWAY_ALLOW_ALL_USERS + with patch.dict("os.environ", {}, clear=True): + result = gw._is_user_authorized(source) + assert result is False + + +# --------------------------------------------------------------------------- +# Send Message Tool +# --------------------------------------------------------------------------- + +class TestSignalSendMessage: + def test_signal_in_platform_map(self): + """Signal should be in the send_message tool's platform map.""" + from tools.send_message_tool import send_message_tool + # Just verify the import works and Signal is a valid platform + from gateway.config import Platform + assert Platform.SIGNAL.value == "signal" diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index bc8f2d6508..8e5c0d4fe4 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -8,6 +8,7 @@ human-friendly channel names to IDs. Works in both CLI and gateway contexts. import json import logging import os +import time logger = logging.getLogger(__name__) @@ -107,6 +108,7 @@ def _handle_send(args): "discord": Platform.DISCORD, "slack": Platform.SLACK, "whatsapp": Platform.WHATSAPP, + "signal": Platform.SIGNAL, } platform = platform_map.get(platform_name) if not platform: @@ -160,6 +162,8 @@ async def _send_to_platform(platform, pconfig, chat_id, message): return await _send_discord(pconfig.token, chat_id, message) elif platform == Platform.SLACK: return await _send_slack(pconfig.token, chat_id, message) + elif platform == Platform.SIGNAL: + return await _send_signal(pconfig.extra, chat_id, message) return {"error": f"Direct sending not yet implemented for {platform.value}"} @@ -219,6 +223,42 @@ async def _send_slack(token, chat_id, message): return {"error": f"Slack send failed: {e}"} +async def _send_signal(extra, chat_id, message): + """Send via signal-cli JSON-RPC API.""" + try: + import httpx + except ImportError: + return {"error": "httpx not installed"} + try: + http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/") + account = extra.get("account", "") + if not account: + return {"error": "Signal account not configured"} + + params = {"account": account, "message": message} + if chat_id.startswith("group:"): + params["groupId"] = chat_id[6:] + else: + params["recipient"] = [chat_id] + + payload = { + "jsonrpc": "2.0", + "method": "send", + "params": params, + "id": f"send_{int(time.time() * 1000)}", + } + + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post(f"{http_url}/api/v1/rpc", json=payload) + resp.raise_for_status() + data = resp.json() + if "error" in data: + return {"error": f"Signal RPC error: {data['error']}"} + return {"success": True, "platform": "signal", "chat_id": chat_id} + except Exception as e: + return {"error": f"Signal send failed: {e}"} + + def _check_send_message(): """Gate send_message on gateway running (always available on messaging platforms).""" platform = os.getenv("HERMES_SESSION_PLATFORM", "")