diff --git a/gateway/config.py b/gateway/config.py
index fc348e70d6..e43af65aa2 100644
--- a/gateway/config.py
+++ b/gateway/config.py
@@ -40,6 +40,8 @@ class Platform(Enum):
WHATSAPP = "whatsapp"
SLACK = "slack"
SIGNAL = "signal"
+ MATTERMOST = "mattermost"
+ MATRIX = "matrix"
HOMEASSISTANT = "homeassistant"
EMAIL = "email"
SMS = "sms"
@@ -442,6 +444,8 @@ def load_gateway_config() -> GatewayConfig:
Platform.TELEGRAM: "TELEGRAM_BOT_TOKEN",
Platform.DISCORD: "DISCORD_BOT_TOKEN",
Platform.SLACK: "SLACK_BOT_TOKEN",
+ Platform.MATTERMOST: "MATTERMOST_TOKEN",
+ Platform.MATRIX: "MATRIX_ACCESS_TOKEN",
}
for platform, pconfig in config.platforms.items():
if not pconfig.enabled:
@@ -535,6 +539,53 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
name=os.getenv("SIGNAL_HOME_CHANNEL_NAME", "Home"),
)
+ # Mattermost
+ mattermost_token = os.getenv("MATTERMOST_TOKEN")
+ if mattermost_token:
+ mattermost_url = os.getenv("MATTERMOST_URL", "")
+ if not mattermost_url:
+ logger.warning("MATTERMOST_TOKEN set but MATTERMOST_URL is missing")
+ if Platform.MATTERMOST not in config.platforms:
+ config.platforms[Platform.MATTERMOST] = PlatformConfig()
+ config.platforms[Platform.MATTERMOST].enabled = True
+ config.platforms[Platform.MATTERMOST].token = mattermost_token
+ config.platforms[Platform.MATTERMOST].extra["url"] = mattermost_url
+ mattermost_home = os.getenv("MATTERMOST_HOME_CHANNEL")
+ if mattermost_home:
+ config.platforms[Platform.MATTERMOST].home_channel = HomeChannel(
+ platform=Platform.MATTERMOST,
+ chat_id=mattermost_home,
+ name=os.getenv("MATTERMOST_HOME_CHANNEL_NAME", "Home"),
+ )
+
+ # Matrix
+ matrix_token = os.getenv("MATRIX_ACCESS_TOKEN")
+ matrix_homeserver = os.getenv("MATRIX_HOMESERVER", "")
+ if matrix_token or os.getenv("MATRIX_PASSWORD"):
+ if not matrix_homeserver:
+ logger.warning("MATRIX_ACCESS_TOKEN/MATRIX_PASSWORD set but MATRIX_HOMESERVER is missing")
+ if Platform.MATRIX not in config.platforms:
+ config.platforms[Platform.MATRIX] = PlatformConfig()
+ config.platforms[Platform.MATRIX].enabled = True
+ if matrix_token:
+ config.platforms[Platform.MATRIX].token = matrix_token
+ config.platforms[Platform.MATRIX].extra["homeserver"] = matrix_homeserver
+ matrix_user = os.getenv("MATRIX_USER_ID", "")
+ if matrix_user:
+ config.platforms[Platform.MATRIX].extra["user_id"] = matrix_user
+ matrix_password = os.getenv("MATRIX_PASSWORD", "")
+ if matrix_password:
+ config.platforms[Platform.MATRIX].extra["password"] = matrix_password
+ matrix_e2ee = os.getenv("MATRIX_ENCRYPTION", "").lower() in ("true", "1", "yes")
+ config.platforms[Platform.MATRIX].extra["encryption"] = matrix_e2ee
+ matrix_home = os.getenv("MATRIX_HOME_ROOM")
+ if matrix_home:
+ config.platforms[Platform.MATRIX].home_channel = HomeChannel(
+ platform=Platform.MATRIX,
+ chat_id=matrix_home,
+ name=os.getenv("MATRIX_HOME_ROOM_NAME", "Home"),
+ )
+
# Home Assistant
hass_token = os.getenv("HASS_TOKEN")
if hass_token:
diff --git a/gateway/platforms/matrix.py b/gateway/platforms/matrix.py
new file mode 100644
index 0000000000..6a4b0f8220
--- /dev/null
+++ b/gateway/platforms/matrix.py
@@ -0,0 +1,841 @@
+"""Matrix gateway adapter.
+
+Connects to any Matrix homeserver (self-hosted or matrix.org) via the
+matrix-nio Python SDK. Supports optional end-to-end encryption (E2EE)
+when installed with ``pip install "matrix-nio[e2e]"``.
+
+Environment variables:
+ MATRIX_HOMESERVER Homeserver URL (e.g. https://matrix.example.org)
+ MATRIX_ACCESS_TOKEN Access token (preferred auth method)
+ MATRIX_USER_ID Full user ID (@bot:server) — required for password login
+ MATRIX_PASSWORD Password (alternative to access token)
+ MATRIX_ENCRYPTION Set "true" to enable E2EE
+ MATRIX_ALLOWED_USERS Comma-separated Matrix user IDs (@user:server)
+ MATRIX_HOME_ROOM Room ID for cron/notification delivery
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+import mimetypes
+import os
+import re
+import time
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Set
+
+from gateway.config import Platform, PlatformConfig
+from gateway.platforms.base import (
+ BasePlatformAdapter,
+ MessageEvent,
+ MessageType,
+ SendResult,
+)
+
+logger = logging.getLogger(__name__)
+
+# Matrix message size limit (4000 chars practical, spec has no hard limit
+# but clients render poorly above this).
+MAX_MESSAGE_LENGTH = 4000
+
+# Store directory for E2EE keys and sync state.
+_STORE_DIR = Path.home() / ".hermes" / "matrix" / "store"
+
+# Grace period: ignore messages older than this many seconds before startup.
+_STARTUP_GRACE_SECONDS = 5
+
+
+def check_matrix_requirements() -> bool:
+ """Return True if the Matrix adapter can be used."""
+ token = os.getenv("MATRIX_ACCESS_TOKEN", "")
+ password = os.getenv("MATRIX_PASSWORD", "")
+ homeserver = os.getenv("MATRIX_HOMESERVER", "")
+
+ if not token and not password:
+ logger.debug("Matrix: neither MATRIX_ACCESS_TOKEN nor MATRIX_PASSWORD set")
+ return False
+ if not homeserver:
+ logger.warning("Matrix: MATRIX_HOMESERVER not set")
+ return False
+ try:
+ import nio # noqa: F401
+ return True
+ except ImportError:
+ logger.warning(
+ "Matrix: matrix-nio not installed. "
+ "Run: pip install 'matrix-nio[e2e]'"
+ )
+ return False
+
+
+class MatrixAdapter(BasePlatformAdapter):
+ """Gateway adapter for Matrix (any homeserver)."""
+
+ def __init__(self, config: PlatformConfig):
+ super().__init__(config, Platform.MATRIX)
+
+ self._homeserver: str = (
+ config.extra.get("homeserver", "")
+ or os.getenv("MATRIX_HOMESERVER", "")
+ ).rstrip("/")
+ self._access_token: str = config.token or os.getenv("MATRIX_ACCESS_TOKEN", "")
+ self._user_id: str = (
+ config.extra.get("user_id", "")
+ or os.getenv("MATRIX_USER_ID", "")
+ )
+ self._password: str = (
+ config.extra.get("password", "")
+ or os.getenv("MATRIX_PASSWORD", "")
+ )
+ self._encryption: bool = config.extra.get(
+ "encryption",
+ os.getenv("MATRIX_ENCRYPTION", "").lower() in ("true", "1", "yes"),
+ )
+
+ self._client: Any = None # nio.AsyncClient
+ self._sync_task: Optional[asyncio.Task] = None
+ self._closing = False
+ self._startup_ts: float = 0.0
+
+ # Cache: room_id → bool (is DM)
+ self._dm_rooms: Dict[str, bool] = {}
+ # Set of room IDs we've joined
+ self._joined_rooms: Set[str] = set()
+
+ # ------------------------------------------------------------------
+ # Required overrides
+ # ------------------------------------------------------------------
+
+ async def connect(self) -> bool:
+ """Connect to the Matrix homeserver and start syncing."""
+ import nio
+
+ if not self._homeserver:
+ logger.error("Matrix: homeserver URL not configured")
+ return False
+
+ # Determine store path and ensure it exists.
+ store_path = str(_STORE_DIR)
+ _STORE_DIR.mkdir(parents=True, exist_ok=True)
+
+ # Create the client.
+ if self._encryption:
+ try:
+ client = nio.AsyncClient(
+ self._homeserver,
+ self._user_id or "",
+ store_path=store_path,
+ )
+ logger.info("Matrix: E2EE enabled (store: %s)", store_path)
+ except Exception as exc:
+ logger.warning(
+ "Matrix: failed to create E2EE client (%s), "
+ "falling back to plain client. Install: "
+ "pip install 'matrix-nio[e2e]'",
+ exc,
+ )
+ client = nio.AsyncClient(self._homeserver, self._user_id or "")
+ else:
+ client = nio.AsyncClient(self._homeserver, self._user_id or "")
+
+ self._client = client
+
+ # Authenticate.
+ if self._access_token:
+ client.access_token = self._access_token
+ # Resolve user_id if not set.
+ if not self._user_id:
+ resp = await client.whoami()
+ if isinstance(resp, nio.WhoamiResponse):
+ self._user_id = resp.user_id
+ client.user_id = resp.user_id
+ logger.info("Matrix: authenticated as %s", self._user_id)
+ else:
+ logger.error(
+ "Matrix: whoami failed — check MATRIX_ACCESS_TOKEN and MATRIX_HOMESERVER"
+ )
+ await client.close()
+ return False
+ else:
+ client.user_id = self._user_id
+ logger.info("Matrix: using access token for %s", self._user_id)
+ elif self._password and self._user_id:
+ resp = await client.login(
+ self._password,
+ device_name="Hermes Agent",
+ )
+ if isinstance(resp, nio.LoginResponse):
+ logger.info("Matrix: logged in as %s", self._user_id)
+ else:
+ logger.error("Matrix: login failed — %s", getattr(resp, "message", resp))
+ await client.close()
+ return False
+ else:
+ logger.error("Matrix: need MATRIX_ACCESS_TOKEN or MATRIX_USER_ID + MATRIX_PASSWORD")
+ await client.close()
+ return False
+
+ # If E2EE is enabled, load the crypto store.
+ if self._encryption and hasattr(client, "olm"):
+ try:
+ if client.should_upload_keys:
+ await client.keys_upload()
+ logger.info("Matrix: E2EE crypto initialized")
+ except Exception as exc:
+ logger.warning("Matrix: crypto init issue: %s", exc)
+
+ # Register event callbacks.
+ client.add_event_callback(self._on_room_message, nio.RoomMessageText)
+ client.add_event_callback(self._on_room_message_media, nio.RoomMessageMedia)
+ client.add_event_callback(self._on_room_message_media, nio.RoomMessageImage)
+ client.add_event_callback(self._on_room_message_media, nio.RoomMessageAudio)
+ client.add_event_callback(self._on_room_message_media, nio.RoomMessageVideo)
+ client.add_event_callback(self._on_room_message_media, nio.RoomMessageFile)
+ client.add_event_callback(self._on_invite, nio.InviteMemberEvent)
+
+ # If E2EE: handle encrypted events.
+ if self._encryption and hasattr(client, "olm"):
+ client.add_event_callback(
+ self._on_room_message, nio.MegolmEvent
+ )
+
+ # Initial sync to catch up, then start background sync.
+ self._startup_ts = time.time()
+ self._closing = False
+
+ # Do an initial sync to populate room state.
+ resp = await client.sync(timeout=10000, full_state=True)
+ if isinstance(resp, nio.SyncResponse):
+ self._joined_rooms = set(resp.rooms.join.keys())
+ logger.info(
+ "Matrix: initial sync complete, joined %d rooms",
+ len(self._joined_rooms),
+ )
+ # Build DM room cache from m.direct account data.
+ await self._refresh_dm_cache()
+ else:
+ logger.warning("Matrix: initial sync returned %s", type(resp).__name__)
+
+ # Start the sync loop.
+ self._sync_task = asyncio.create_task(self._sync_loop())
+ return True
+
+ async def disconnect(self) -> None:
+ """Disconnect from Matrix."""
+ self._closing = True
+
+ if self._sync_task and not self._sync_task.done():
+ self._sync_task.cancel()
+ try:
+ await self._sync_task
+ except (asyncio.CancelledError, Exception):
+ pass
+
+ if self._client:
+ await self._client.close()
+ self._client = None
+
+ logger.info("Matrix: disconnected")
+
+ async def send(
+ self,
+ chat_id: str,
+ content: str,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Send a message to a Matrix room."""
+ import nio
+
+ if not content:
+ return SendResult(success=True)
+
+ formatted = self.format_message(content)
+ chunks = self.truncate_message(formatted, MAX_MESSAGE_LENGTH)
+
+ last_event_id = None
+ for chunk in chunks:
+ msg_content: Dict[str, Any] = {
+ "msgtype": "m.text",
+ "body": chunk,
+ }
+
+ # Convert markdown to HTML for rich rendering.
+ html = self._markdown_to_html(chunk)
+ if html and html != chunk:
+ msg_content["format"] = "org.matrix.custom.html"
+ msg_content["formatted_body"] = html
+
+ # Reply-to support.
+ if reply_to:
+ msg_content["m.relates_to"] = {
+ "m.in_reply_to": {"event_id": reply_to}
+ }
+
+ # Thread support: if metadata has thread_id, send as threaded reply.
+ thread_id = (metadata or {}).get("thread_id")
+ if thread_id:
+ relates_to = msg_content.get("m.relates_to", {})
+ relates_to["rel_type"] = "m.thread"
+ relates_to["event_id"] = thread_id
+ relates_to["is_falling_back"] = True
+ if reply_to and "m.in_reply_to" not in relates_to:
+ relates_to["m.in_reply_to"] = {"event_id": reply_to}
+ msg_content["m.relates_to"] = relates_to
+
+ resp = await self._client.room_send(
+ chat_id,
+ "m.room.message",
+ msg_content,
+ )
+ if isinstance(resp, nio.RoomSendResponse):
+ last_event_id = resp.event_id
+ else:
+ err = getattr(resp, "message", str(resp))
+ logger.error("Matrix: failed to send to %s: %s", chat_id, err)
+ return SendResult(success=False, error=err)
+
+ return SendResult(success=True, message_id=last_event_id)
+
+ async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
+ """Return room name and type (dm/group)."""
+ name = chat_id
+ chat_type = "group"
+
+ if self._client:
+ room = self._client.rooms.get(chat_id)
+ if room:
+ name = room.display_name or room.canonical_alias or chat_id
+ # Use DM cache.
+ if self._dm_rooms.get(chat_id, False):
+ chat_type = "dm"
+ elif room.member_count == 2:
+ chat_type = "dm"
+
+ return {"name": name, "type": chat_type}
+
+ # ------------------------------------------------------------------
+ # Optional overrides
+ # ------------------------------------------------------------------
+
+ async def send_typing(
+ self, chat_id: str, metadata: Optional[Dict[str, Any]] = None
+ ) -> None:
+ """Send a typing indicator."""
+ if self._client:
+ try:
+ await self._client.room_typing(chat_id, typing_state=True, timeout=30000)
+ except Exception:
+ pass
+
+ async def edit_message(
+ self, chat_id: str, message_id: str, content: str
+ ) -> SendResult:
+ """Edit an existing message (via m.replace)."""
+ import nio
+
+ formatted = self.format_message(content)
+ msg_content: Dict[str, Any] = {
+ "msgtype": "m.text",
+ "body": f"* {formatted}",
+ "m.new_content": {
+ "msgtype": "m.text",
+ "body": formatted,
+ },
+ "m.relates_to": {
+ "rel_type": "m.replace",
+ "event_id": message_id,
+ },
+ }
+
+ html = self._markdown_to_html(formatted)
+ if html and html != formatted:
+ msg_content["m.new_content"]["format"] = "org.matrix.custom.html"
+ msg_content["m.new_content"]["formatted_body"] = html
+ msg_content["format"] = "org.matrix.custom.html"
+ msg_content["formatted_body"] = f"* {html}"
+
+ resp = await self._client.room_send(chat_id, "m.room.message", msg_content)
+ if isinstance(resp, nio.RoomSendResponse):
+ return SendResult(success=True, message_id=resp.event_id)
+ return SendResult(success=False, error=getattr(resp, "message", str(resp)))
+
+ async def send_image(
+ self,
+ chat_id: str,
+ image_url: str,
+ caption: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Download an image URL and upload it to Matrix."""
+ try:
+ # Try aiohttp first (always available), fall back to httpx
+ try:
+ import aiohttp as _aiohttp
+ async with _aiohttp.ClientSession() as http:
+ async with http.get(image_url, timeout=_aiohttp.ClientTimeout(total=30)) as resp:
+ resp.raise_for_status()
+ data = await resp.read()
+ ct = resp.content_type or "image/png"
+ fname = image_url.rsplit("/", 1)[-1].split("?")[0] or "image.png"
+ except ImportError:
+ import httpx
+ async with httpx.AsyncClient() as http:
+ resp = await http.get(image_url, follow_redirects=True, timeout=30)
+ resp.raise_for_status()
+ data = resp.content
+ ct = resp.headers.get("content-type", "image/png")
+ fname = image_url.rsplit("/", 1)[-1].split("?")[0] or "image.png"
+ except Exception as exc:
+ logger.warning("Matrix: failed to download image %s: %s", image_url, exc)
+ return await self.send(chat_id, f"{caption or ''}\n{image_url}".strip(), reply_to)
+
+ return await self._upload_and_send(chat_id, data, fname, ct, "m.image", caption, reply_to, metadata)
+
+ async def send_image_file(
+ self,
+ chat_id: str,
+ image_path: str,
+ caption: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Upload a local image file to Matrix."""
+ return await self._send_local_file(chat_id, image_path, "m.image", caption, reply_to, metadata=metadata)
+
+ async def send_document(
+ self,
+ chat_id: str,
+ file_path: str,
+ caption: Optional[str] = None,
+ file_name: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Upload a local file as a document."""
+ return await self._send_local_file(chat_id, file_path, "m.file", caption, reply_to, file_name, metadata)
+
+ async def send_voice(
+ self,
+ chat_id: str,
+ audio_path: str,
+ caption: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Upload an audio file as a voice message."""
+ return await self._send_local_file(chat_id, audio_path, "m.audio", caption, reply_to, metadata=metadata)
+
+ async def send_video(
+ self,
+ chat_id: str,
+ video_path: str,
+ caption: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Upload a video file."""
+ return await self._send_local_file(chat_id, video_path, "m.video", caption, reply_to, metadata=metadata)
+
+ def format_message(self, content: str) -> str:
+ """Pass-through — Matrix supports standard Markdown natively."""
+ # Strip image markdown; media is uploaded separately.
+ content = re.sub(r"!\[([^\]]*)\]\(([^)]+)\)", r"\2", content)
+ return content
+
+ # ------------------------------------------------------------------
+ # File helpers
+ # ------------------------------------------------------------------
+
+ async def _upload_and_send(
+ self,
+ room_id: str,
+ data: bytes,
+ filename: str,
+ content_type: str,
+ msgtype: str,
+ caption: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Upload bytes to Matrix and send as a media message."""
+ import nio
+
+ # Upload to homeserver.
+ resp = await self._client.upload(
+ data,
+ content_type=content_type,
+ filename=filename,
+ )
+ if not isinstance(resp, nio.UploadResponse):
+ err = getattr(resp, "message", str(resp))
+ logger.error("Matrix: upload failed: %s", err)
+ return SendResult(success=False, error=err)
+
+ mxc_url = resp.content_uri
+
+ # Build media message content.
+ msg_content: Dict[str, Any] = {
+ "msgtype": msgtype,
+ "body": caption or filename,
+ "url": mxc_url,
+ "info": {
+ "mimetype": content_type,
+ "size": len(data),
+ },
+ }
+
+ if reply_to:
+ msg_content["m.relates_to"] = {
+ "m.in_reply_to": {"event_id": reply_to}
+ }
+
+ thread_id = (metadata or {}).get("thread_id")
+ if thread_id:
+ relates_to = msg_content.get("m.relates_to", {})
+ relates_to["rel_type"] = "m.thread"
+ relates_to["event_id"] = thread_id
+ relates_to["is_falling_back"] = True
+ msg_content["m.relates_to"] = relates_to
+
+ resp2 = await self._client.room_send(room_id, "m.room.message", msg_content)
+ if isinstance(resp2, nio.RoomSendResponse):
+ return SendResult(success=True, message_id=resp2.event_id)
+ return SendResult(success=False, error=getattr(resp2, "message", str(resp2)))
+
+ async def _send_local_file(
+ self,
+ room_id: str,
+ file_path: str,
+ msgtype: str,
+ caption: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ file_name: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Read a local file and upload it."""
+ p = Path(file_path)
+ if not p.exists():
+ return await self.send(
+ room_id, f"{caption or ''}\n(file not found: {file_path})", reply_to
+ )
+
+ fname = file_name or p.name
+ ct = mimetypes.guess_type(fname)[0] or "application/octet-stream"
+ data = p.read_bytes()
+
+ return await self._upload_and_send(room_id, data, fname, ct, msgtype, caption, reply_to, metadata)
+
+ # ------------------------------------------------------------------
+ # Sync loop
+ # ------------------------------------------------------------------
+
+ async def _sync_loop(self) -> None:
+ """Continuously sync with the homeserver."""
+ while not self._closing:
+ try:
+ await self._client.sync(timeout=30000)
+ except asyncio.CancelledError:
+ return
+ except Exception as exc:
+ if self._closing:
+ return
+ logger.warning("Matrix: sync error: %s — retrying in 5s", exc)
+ await asyncio.sleep(5)
+
+ # ------------------------------------------------------------------
+ # Event callbacks
+ # ------------------------------------------------------------------
+
+ async def _on_room_message(self, room: Any, event: Any) -> None:
+ """Handle incoming text messages (and decrypted megolm events)."""
+ import nio
+
+ # Ignore own messages.
+ if event.sender == self._user_id:
+ return
+
+ # Startup grace: ignore old messages from initial sync.
+ event_ts = getattr(event, "server_timestamp", 0) / 1000.0
+ if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS:
+ return
+
+ # Handle decrypted MegolmEvents — extract the inner event.
+ if isinstance(event, nio.MegolmEvent):
+ # Failed to decrypt.
+ logger.warning(
+ "Matrix: could not decrypt event %s in %s",
+ event.event_id, room.room_id,
+ )
+ return
+
+ # Skip edits (m.replace relation).
+ source_content = getattr(event, "source", {}).get("content", {})
+ relates_to = source_content.get("m.relates_to", {})
+ if relates_to.get("rel_type") == "m.replace":
+ return
+
+ body = getattr(event, "body", "") or ""
+ if not body:
+ return
+
+ # Determine chat type.
+ is_dm = self._dm_rooms.get(room.room_id, False)
+ if not is_dm and room.member_count == 2:
+ is_dm = True
+ chat_type = "dm" if is_dm else "group"
+
+ # Thread support.
+ thread_id = None
+ if relates_to.get("rel_type") == "m.thread":
+ thread_id = relates_to.get("event_id")
+
+ # Reply-to detection.
+ reply_to = None
+ in_reply_to = relates_to.get("m.in_reply_to", {})
+ if in_reply_to:
+ reply_to = in_reply_to.get("event_id")
+
+ # Strip reply fallback from body (Matrix prepends "> ..." lines).
+ if reply_to and body.startswith("> "):
+ lines = body.split("\n")
+ stripped = []
+ past_fallback = False
+ for line in lines:
+ if not past_fallback:
+ if line.startswith("> ") or line == ">":
+ continue
+ if line == "":
+ past_fallback = True
+ continue
+ past_fallback = True
+ stripped.append(line)
+ body = "\n".join(stripped) if stripped else body
+
+ # Message type.
+ msg_type = MessageType.TEXT
+ if body.startswith("!") or body.startswith("/"):
+ msg_type = MessageType.COMMAND
+
+ source = self.build_source(
+ chat_id=room.room_id,
+ chat_type=chat_type,
+ user_id=event.sender,
+ user_name=self._get_display_name(room, event.sender),
+ thread_id=thread_id,
+ )
+
+ msg_event = MessageEvent(
+ text=body,
+ message_type=msg_type,
+ source=source,
+ raw_message=getattr(event, "source", {}),
+ message_id=event.event_id,
+ reply_to=reply_to,
+ )
+
+ await self.handle_message(msg_event)
+
+ async def _on_room_message_media(self, room: Any, event: Any) -> None:
+ """Handle incoming media messages (images, audio, video, files)."""
+ import nio
+
+ # Ignore own messages.
+ if event.sender == self._user_id:
+ return
+
+ # Startup grace.
+ event_ts = getattr(event, "server_timestamp", 0) / 1000.0
+ if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS:
+ return
+
+ body = getattr(event, "body", "") or ""
+ url = getattr(event, "url", "")
+
+ # Convert mxc:// to HTTP URL for downstream processing.
+ http_url = ""
+ if url and url.startswith("mxc://"):
+ http_url = self._mxc_to_http(url)
+
+ # Determine message type from event class.
+ media_type = "document"
+ msg_type = MessageType.DOCUMENT
+ if isinstance(event, nio.RoomMessageImage):
+ msg_type = MessageType.PHOTO
+ media_type = "image"
+ elif isinstance(event, nio.RoomMessageAudio):
+ msg_type = MessageType.AUDIO
+ media_type = "audio"
+ elif isinstance(event, nio.RoomMessageVideo):
+ msg_type = MessageType.VIDEO
+ media_type = "video"
+
+ is_dm = self._dm_rooms.get(room.room_id, False)
+ if not is_dm and room.member_count == 2:
+ is_dm = True
+ chat_type = "dm" if is_dm else "group"
+
+ # Thread/reply detection.
+ source_content = getattr(event, "source", {}).get("content", {})
+ relates_to = source_content.get("m.relates_to", {})
+ thread_id = None
+ if relates_to.get("rel_type") == "m.thread":
+ thread_id = relates_to.get("event_id")
+
+ source = self.build_source(
+ chat_id=room.room_id,
+ chat_type=chat_type,
+ user_id=event.sender,
+ user_name=self._get_display_name(room, event.sender),
+ thread_id=thread_id,
+ )
+
+ msg_event = MessageEvent(
+ text=body,
+ message_type=msg_type,
+ source=source,
+ raw_message=getattr(event, "source", {}),
+ message_id=event.event_id,
+ media_urls=[http_url] if http_url else None,
+ media_types=[media_type] if http_url else None,
+ )
+
+ await self.handle_message(msg_event)
+
+ async def _on_invite(self, room: Any, event: Any) -> None:
+ """Auto-join rooms when invited."""
+ import nio
+
+ if not isinstance(event, nio.InviteMemberEvent):
+ return
+
+ # Only process invites directed at us.
+ if event.state_key != self._user_id:
+ return
+
+ if event.membership != "invite":
+ return
+
+ logger.info(
+ "Matrix: invited to %s by %s — joining",
+ room.room_id, event.sender,
+ )
+ try:
+ resp = await self._client.join(room.room_id)
+ if isinstance(resp, nio.JoinResponse):
+ self._joined_rooms.add(room.room_id)
+ logger.info("Matrix: joined %s", room.room_id)
+ # Refresh DM cache since new room may be a DM.
+ await self._refresh_dm_cache()
+ else:
+ logger.warning(
+ "Matrix: failed to join %s: %s",
+ room.room_id, getattr(resp, "message", resp),
+ )
+ except Exception as exc:
+ logger.warning("Matrix: error joining %s: %s", room.room_id, exc)
+
+ # ------------------------------------------------------------------
+ # Helpers
+ # ------------------------------------------------------------------
+
+ async def _refresh_dm_cache(self) -> None:
+ """Refresh the DM room cache from m.direct account data.
+
+ Tries the account_data API first, then falls back to parsing
+ the sync response's account_data for robustness.
+ """
+ if not self._client:
+ return
+
+ dm_data: Optional[Dict] = None
+
+ # Primary: try the dedicated account data endpoint.
+ try:
+ resp = await self._client.get_account_data("m.direct")
+ if hasattr(resp, "content"):
+ dm_data = resp.content
+ elif isinstance(resp, dict):
+ dm_data = resp
+ except Exception as exc:
+ logger.debug("Matrix: get_account_data('m.direct') failed: %s — trying sync fallback", exc)
+
+ # Fallback: parse from the client's account_data store (populated by sync).
+ if dm_data is None:
+ try:
+ # matrix-nio stores account data events on the client object
+ ad = getattr(self._client, "account_data", None)
+ if ad and isinstance(ad, dict) and "m.direct" in ad:
+ event = ad["m.direct"]
+ if hasattr(event, "content"):
+ dm_data = event.content
+ elif isinstance(event, dict):
+ dm_data = event
+ except Exception:
+ pass
+
+ if dm_data is None:
+ return
+
+ dm_room_ids: Set[str] = set()
+ for user_id, rooms in dm_data.items():
+ if isinstance(rooms, list):
+ dm_room_ids.update(rooms)
+
+ self._dm_rooms = {
+ rid: (rid in dm_room_ids)
+ for rid in self._joined_rooms
+ }
+
+ def _get_display_name(self, room: Any, user_id: str) -> str:
+ """Get a user's display name in a room, falling back to user_id."""
+ if room and hasattr(room, "users"):
+ user = room.users.get(user_id)
+ if user and getattr(user, "display_name", None):
+ return user.display_name
+ # Strip the @...:server format to just the localpart.
+ if user_id.startswith("@") and ":" in user_id:
+ return user_id[1:].split(":")[0]
+ return user_id
+
+ def _mxc_to_http(self, mxc_url: str) -> str:
+ """Convert mxc://server/media_id to an HTTP download URL."""
+ # mxc://matrix.org/abc123 → https://matrix.org/_matrix/client/v1/media/download/matrix.org/abc123
+ # Uses the authenticated client endpoint (spec v1.11+) instead of the
+ # deprecated /_matrix/media/v3/download/ path.
+ if not mxc_url.startswith("mxc://"):
+ return mxc_url
+ parts = mxc_url[6:] # strip mxc://
+ # Use our homeserver for download (federation handles the rest).
+ return f"{self._homeserver}/_matrix/client/v1/media/download/{parts}"
+
+ def _markdown_to_html(self, text: str) -> str:
+ """Convert Markdown to Matrix-compatible HTML.
+
+ Uses a simple conversion for common patterns. For full fidelity
+ a markdown-it style library could be used, but this covers the
+ common cases without an extra dependency.
+ """
+ try:
+ import markdown
+ html = markdown.markdown(
+ text,
+ extensions=["fenced_code", "tables", "nl2br"],
+ )
+ # Strip wrapping
tags for single-paragraph messages.
+ if html.count("
") == 1:
+ html = html.replace("
", "").replace("
", "")
+ return html
+ except ImportError:
+ pass
+
+ # Minimal fallback: just handle bold, italic, code.
+ html = text
+ html = re.sub(r"\*\*(.+?)\*\*", r"\1", html)
+ html = re.sub(r"\*(.+?)\*", r"\1", html)
+ html = re.sub(r"`([^`]+)`", r"\1", html)
+ html = re.sub(r"\n", r"
", html)
+ return html
diff --git a/gateway/platforms/mattermost.py b/gateway/platforms/mattermost.py
new file mode 100644
index 0000000000..9279b74e6b
--- /dev/null
+++ b/gateway/platforms/mattermost.py
@@ -0,0 +1,663 @@
+"""Mattermost gateway adapter.
+
+Connects to a self-hosted (or cloud) Mattermost instance via its REST API
+(v4) and WebSocket for real-time events. No external Mattermost library
+required — uses aiohttp which is already a Hermes dependency.
+
+Environment variables:
+ MATTERMOST_URL Server URL (e.g. https://mm.example.com)
+ MATTERMOST_TOKEN Bot token or personal-access token
+ MATTERMOST_ALLOWED_USERS Comma-separated user IDs
+ MATTERMOST_HOME_CHANNEL Channel ID for cron/notification delivery
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+import os
+import re
+import time
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple
+
+from gateway.config import Platform, PlatformConfig
+from gateway.platforms.base import (
+ BasePlatformAdapter,
+ MessageEvent,
+ MessageType,
+ SendResult,
+)
+
+logger = logging.getLogger(__name__)
+
+# Mattermost post size limit (server default is 16383, but 4000 is the
+# practical limit for readable messages — matching OpenClaw's choice).
+MAX_POST_LENGTH = 4000
+
+# Channel type codes returned by the Mattermost API.
+_CHANNEL_TYPE_MAP = {
+ "D": "dm",
+ "G": "group",
+ "P": "group", # private channel → treat as group
+ "O": "channel",
+}
+
+# Reconnect parameters (exponential backoff).
+_RECONNECT_BASE_DELAY = 2.0
+_RECONNECT_MAX_DELAY = 60.0
+_RECONNECT_JITTER = 0.2
+
+
+def check_mattermost_requirements() -> bool:
+ """Return True if the Mattermost adapter can be used."""
+ token = os.getenv("MATTERMOST_TOKEN", "")
+ url = os.getenv("MATTERMOST_URL", "")
+ if not token:
+ logger.debug("Mattermost: MATTERMOST_TOKEN not set")
+ return False
+ if not url:
+ logger.warning("Mattermost: MATTERMOST_URL not set")
+ return False
+ try:
+ import aiohttp # noqa: F401
+ return True
+ except ImportError:
+ logger.warning("Mattermost: aiohttp not installed")
+ return False
+
+
+class MattermostAdapter(BasePlatformAdapter):
+ """Gateway adapter for Mattermost (self-hosted or cloud)."""
+
+ def __init__(self, config: PlatformConfig):
+ super().__init__(config, Platform.MATTERMOST)
+
+ self._base_url: str = (
+ config.extra.get("url", "")
+ or os.getenv("MATTERMOST_URL", "")
+ ).rstrip("/")
+ self._token: str = config.token or os.getenv("MATTERMOST_TOKEN", "")
+
+ self._bot_user_id: str = ""
+ self._bot_username: str = ""
+
+ # aiohttp session + websocket handle
+ self._session: Any = None # aiohttp.ClientSession
+ self._ws: Any = None # aiohttp.ClientWebSocketResponse
+ self._ws_task: Optional[asyncio.Task] = None
+ self._reconnect_task: Optional[asyncio.Task] = None
+ self._closing = False
+
+ # Reply mode: "thread" to nest replies, "off" for flat messages.
+ self._reply_mode: str = (
+ config.extra.get("reply_mode", "")
+ or os.getenv("MATTERMOST_REPLY_MODE", "off")
+ ).lower()
+
+ # Dedup cache: post_id → timestamp (prevent reprocessing)
+ self._seen_posts: Dict[str, float] = {}
+ self._SEEN_MAX = 2000
+ self._SEEN_TTL = 300 # 5 minutes
+
+ # ------------------------------------------------------------------
+ # HTTP helpers
+ # ------------------------------------------------------------------
+
+ def _headers(self) -> Dict[str, str]:
+ return {
+ "Authorization": f"Bearer {self._token}",
+ "Content-Type": "application/json",
+ }
+
+ async def _api_get(self, path: str) -> Dict[str, Any]:
+ """GET /api/v4/{path}."""
+ import aiohttp
+ url = f"{self._base_url}/api/v4/{path.lstrip('/')}"
+ try:
+ async with self._session.get(url, headers=self._headers()) as resp:
+ if resp.status >= 400:
+ body = await resp.text()
+ logger.error("MM API GET %s → %s: %s", path, resp.status, body[:200])
+ return {}
+ return await resp.json()
+ except aiohttp.ClientError as exc:
+ logger.error("MM API GET %s network error: %s", path, exc)
+ return {}
+
+ async def _api_post(
+ self, path: str, payload: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """POST /api/v4/{path} with JSON body."""
+ import aiohttp
+ url = f"{self._base_url}/api/v4/{path.lstrip('/')}"
+ try:
+ async with self._session.post(
+ url, headers=self._headers(), json=payload
+ ) as resp:
+ if resp.status >= 400:
+ body = await resp.text()
+ logger.error("MM API POST %s → %s: %s", path, resp.status, body[:200])
+ return {}
+ return await resp.json()
+ except aiohttp.ClientError as exc:
+ logger.error("MM API POST %s network error: %s", path, exc)
+ return {}
+
+ async def _api_put(
+ self, path: str, payload: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """PUT /api/v4/{path} with JSON body."""
+ import aiohttp
+ url = f"{self._base_url}/api/v4/{path.lstrip('/')}"
+ try:
+ async with self._session.put(
+ url, headers=self._headers(), json=payload
+ ) as resp:
+ if resp.status >= 400:
+ body = await resp.text()
+ logger.error("MM API PUT %s → %s: %s", path, resp.status, body[:200])
+ return {}
+ return await resp.json()
+ except aiohttp.ClientError as exc:
+ logger.error("MM API PUT %s network error: %s", path, exc)
+ return {}
+
+ async def _upload_file(
+ self, channel_id: str, file_data: bytes, filename: str, content_type: str = "application/octet-stream"
+ ) -> Optional[str]:
+ """Upload a file and return its file ID, or None on failure."""
+ import aiohttp
+
+ url = f"{self._base_url}/api/v4/files"
+ form = aiohttp.FormData()
+ form.add_field("channel_id", channel_id)
+ form.add_field(
+ "files",
+ file_data,
+ filename=filename,
+ content_type=content_type,
+ )
+ headers = {"Authorization": f"Bearer {self._token}"}
+ async with self._session.post(url, headers=headers, data=form) as resp:
+ if resp.status >= 400:
+ body = await resp.text()
+ logger.error("MM file upload → %s: %s", resp.status, body[:200])
+ return None
+ data = await resp.json()
+ infos = data.get("file_infos", [])
+ return infos[0]["id"] if infos else None
+
+ # ------------------------------------------------------------------
+ # Required overrides
+ # ------------------------------------------------------------------
+
+ async def connect(self) -> bool:
+ """Connect to Mattermost and start the WebSocket listener."""
+ import aiohttp
+
+ if not self._base_url or not self._token:
+ logger.error("Mattermost: URL or token not configured")
+ return False
+
+ self._session = aiohttp.ClientSession()
+ self._closing = False
+
+ # Verify credentials and fetch bot identity.
+ me = await self._api_get("users/me")
+ if not me or "id" not in me:
+ logger.error("Mattermost: failed to authenticate — check MATTERMOST_TOKEN and MATTERMOST_URL")
+ await self._session.close()
+ return False
+
+ self._bot_user_id = me["id"]
+ self._bot_username = me.get("username", "")
+ logger.info(
+ "Mattermost: authenticated as @%s (%s) on %s",
+ self._bot_username,
+ self._bot_user_id,
+ self._base_url,
+ )
+
+ # Start WebSocket in background.
+ self._ws_task = asyncio.create_task(self._ws_loop())
+ return True
+
+ async def disconnect(self) -> None:
+ """Disconnect from Mattermost."""
+ self._closing = True
+
+ if self._ws_task and not self._ws_task.done():
+ self._ws_task.cancel()
+ try:
+ await self._ws_task
+ except (asyncio.CancelledError, Exception):
+ pass
+
+ if self._reconnect_task and not self._reconnect_task.done():
+ self._reconnect_task.cancel()
+
+ if self._ws:
+ await self._ws.close()
+ self._ws = None
+
+ if self._session and not self._session.closed:
+ await self._session.close()
+
+ logger.info("Mattermost: disconnected")
+
+ async def send(
+ self,
+ chat_id: str,
+ content: str,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Send a message (or multiple chunks) to a channel."""
+ if not content:
+ return SendResult(success=True)
+
+ formatted = self.format_message(content)
+ chunks = self.truncate_message(formatted, MAX_POST_LENGTH)
+
+ last_id = None
+ for chunk in chunks:
+ payload: Dict[str, Any] = {
+ "channel_id": chat_id,
+ "message": chunk,
+ }
+ # Thread support: reply_to is the root post ID.
+ if reply_to and self._reply_mode == "thread":
+ payload["root_id"] = reply_to
+
+ data = await self._api_post("posts", payload)
+ if not data or "id" not in data:
+ return SendResult(success=False, error="Failed to create post")
+ last_id = data["id"]
+
+ return SendResult(success=True, message_id=last_id)
+
+ async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
+ """Return channel name and type."""
+ data = await self._api_get(f"channels/{chat_id}")
+ if not data:
+ return {"name": chat_id, "type": "channel"}
+
+ ch_type = _CHANNEL_TYPE_MAP.get(data.get("type", "O"), "channel")
+ display_name = data.get("display_name") or data.get("name") or chat_id
+ return {"name": display_name, "type": ch_type}
+
+ # ------------------------------------------------------------------
+ # Optional overrides
+ # ------------------------------------------------------------------
+
+ async def send_typing(
+ self, chat_id: str, metadata: Optional[Dict[str, Any]] = None
+ ) -> None:
+ """Send a typing indicator."""
+ await self._api_post(
+ f"users/{self._bot_user_id}/typing",
+ {"channel_id": chat_id},
+ )
+
+ async def edit_message(
+ self, chat_id: str, message_id: str, content: str
+ ) -> SendResult:
+ """Edit an existing post."""
+ formatted = self.format_message(content)
+ data = await self._api_put(
+ f"posts/{message_id}/patch",
+ {"message": formatted},
+ )
+ if not data or "id" not in data:
+ return SendResult(success=False, error="Failed to edit post")
+ return SendResult(success=True, message_id=data["id"])
+
+ async def send_image(
+ self,
+ chat_id: str,
+ image_url: str,
+ caption: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Download an image and upload it as a file attachment."""
+ return await self._send_url_as_file(
+ chat_id, image_url, caption, reply_to, "image"
+ )
+
+ async def send_image_file(
+ self,
+ chat_id: str,
+ image_path: str,
+ caption: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Upload a local image file."""
+ return await self._send_local_file(
+ chat_id, image_path, caption, reply_to
+ )
+
+ async def send_document(
+ self,
+ chat_id: str,
+ file_path: str,
+ caption: Optional[str] = None,
+ file_name: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Upload a local file as a document."""
+ return await self._send_local_file(
+ chat_id, file_path, caption, reply_to, file_name
+ )
+
+ async def send_voice(
+ self,
+ chat_id: str,
+ audio_path: str,
+ caption: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Upload an audio file."""
+ return await self._send_local_file(
+ chat_id, audio_path, caption, reply_to
+ )
+
+ async def send_video(
+ self,
+ chat_id: str,
+ video_path: str,
+ caption: Optional[str] = None,
+ reply_to: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None,
+ ) -> SendResult:
+ """Upload a video file."""
+ return await self._send_local_file(
+ chat_id, video_path, caption, reply_to
+ )
+
+ def format_message(self, content: str) -> str:
+ """Mattermost uses standard Markdown — mostly pass through.
+
+ Strip image markdown into plain links (files are uploaded separately).
+ """
+ # Convert  to just the URL — Mattermost renders
+ # image URLs as inline previews automatically.
+ content = re.sub(r"!\[([^\]]*)\]\(([^)]+)\)", r"\2", content)
+ return content
+
+ # ------------------------------------------------------------------
+ # File helpers
+ # ------------------------------------------------------------------
+
+ async def _send_url_as_file(
+ self,
+ chat_id: str,
+ url: str,
+ caption: Optional[str],
+ reply_to: Optional[str],
+ kind: str = "file",
+ ) -> SendResult:
+ """Download a URL and upload it as a file attachment."""
+ import aiohttp
+ try:
+ async with self._session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
+ if resp.status >= 400:
+ # Fall back to sending the URL as text.
+ return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
+ file_data = await resp.read()
+ ct = resp.content_type or "application/octet-stream"
+ # Derive filename from URL.
+ fname = url.rsplit("/", 1)[-1].split("?")[0] or f"{kind}.png"
+ except Exception as exc:
+ logger.warning("Mattermost: failed to download %s: %s", url, exc)
+ return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
+
+ file_id = await self._upload_file(chat_id, file_data, fname, ct)
+ if not file_id:
+ return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
+
+ payload: Dict[str, Any] = {
+ "channel_id": chat_id,
+ "message": caption or "",
+ "file_ids": [file_id],
+ }
+ if reply_to and self._reply_mode == "thread":
+ payload["root_id"] = reply_to
+
+ data = await self._api_post("posts", payload)
+ if not data or "id" not in data:
+ return SendResult(success=False, error="Failed to post with file")
+ return SendResult(success=True, message_id=data["id"])
+
+ async def _send_local_file(
+ self,
+ chat_id: str,
+ file_path: str,
+ caption: Optional[str],
+ reply_to: Optional[str],
+ file_name: Optional[str] = None,
+ ) -> SendResult:
+ """Upload a local file and attach it to a post."""
+ import mimetypes
+
+ p = Path(file_path)
+ if not p.exists():
+ return await self.send(
+ chat_id, f"{caption or ''}\n(file not found: {file_path})", reply_to
+ )
+
+ fname = file_name or p.name
+ ct = mimetypes.guess_type(fname)[0] or "application/octet-stream"
+ file_data = p.read_bytes()
+
+ file_id = await self._upload_file(chat_id, file_data, fname, ct)
+ if not file_id:
+ return SendResult(success=False, error="File upload failed")
+
+ payload: Dict[str, Any] = {
+ "channel_id": chat_id,
+ "message": caption or "",
+ "file_ids": [file_id],
+ }
+ if reply_to and self._reply_mode == "thread":
+ payload["root_id"] = reply_to
+
+ data = await self._api_post("posts", payload)
+ if not data or "id" not in data:
+ return SendResult(success=False, error="Failed to post with file")
+ return SendResult(success=True, message_id=data["id"])
+
+ # ------------------------------------------------------------------
+ # WebSocket
+ # ------------------------------------------------------------------
+
+ async def _ws_loop(self) -> None:
+ """Connect to the WebSocket and listen for events, reconnecting on failure."""
+ delay = _RECONNECT_BASE_DELAY
+ while not self._closing:
+ try:
+ await self._ws_connect_and_listen()
+ # Clean disconnect — reset delay.
+ delay = _RECONNECT_BASE_DELAY
+ except asyncio.CancelledError:
+ return
+ except Exception as exc:
+ if self._closing:
+ return
+ logger.warning("Mattermost WS error: %s — reconnecting in %.0fs", exc, delay)
+
+ if self._closing:
+ return
+
+ # Exponential backoff with jitter.
+ import random
+ jitter = delay * _RECONNECT_JITTER * random.random()
+ await asyncio.sleep(delay + jitter)
+ delay = min(delay * 2, _RECONNECT_MAX_DELAY)
+
+ async def _ws_connect_and_listen(self) -> None:
+ """Single WebSocket session: connect, authenticate, process events."""
+ # Build WS URL: https:// → wss://, http:// → ws://
+ ws_url = re.sub(r"^http", "ws", self._base_url) + "/api/v4/websocket"
+ logger.info("Mattermost: connecting to %s", ws_url)
+
+ self._ws = await self._session.ws_connect(ws_url, heartbeat=30.0)
+
+ # Authenticate via the WebSocket.
+ auth_msg = {
+ "seq": 1,
+ "action": "authentication_challenge",
+ "data": {"token": self._token},
+ }
+ await self._ws.send_json(auth_msg)
+ logger.info("Mattermost: WebSocket connected and authenticated")
+
+ async for raw_msg in self._ws:
+ if self._closing:
+ return
+
+ if raw_msg.type in (
+ raw_msg.type.TEXT,
+ raw_msg.type.BINARY,
+ ):
+ try:
+ event = json.loads(raw_msg.data)
+ except (json.JSONDecodeError, TypeError):
+ continue
+ await self._handle_ws_event(event)
+ elif raw_msg.type in (
+ raw_msg.type.ERROR,
+ raw_msg.type.CLOSE,
+ raw_msg.type.CLOSING,
+ raw_msg.type.CLOSED,
+ ):
+ logger.info("Mattermost: WebSocket closed (%s)", raw_msg.type)
+ break
+
+ async def _handle_ws_event(self, event: Dict[str, Any]) -> None:
+ """Process a single WebSocket event."""
+ event_type = event.get("event")
+ if event_type != "posted":
+ return
+
+ data = event.get("data", {})
+ raw_post_str = data.get("post")
+ if not raw_post_str:
+ return
+
+ try:
+ post = json.loads(raw_post_str)
+ except (json.JSONDecodeError, TypeError):
+ return
+
+ # Ignore own messages.
+ if post.get("user_id") == self._bot_user_id:
+ return
+
+ # Ignore system posts.
+ if post.get("type"):
+ return
+
+ post_id = post.get("id", "")
+
+ # Dedup.
+ self._prune_seen()
+ if post_id in self._seen_posts:
+ return
+ self._seen_posts[post_id] = time.time()
+
+ # Build message event.
+ channel_id = post.get("channel_id", "")
+ channel_type_raw = data.get("channel_type", "O")
+ chat_type = _CHANNEL_TYPE_MAP.get(channel_type_raw, "channel")
+
+ # For DMs, user_id is sufficient. For channels, check for @mention.
+ message_text = post.get("message", "")
+
+ # Resolve sender info.
+ sender_id = post.get("user_id", "")
+ sender_name = data.get("sender_name", "").lstrip("@") or sender_id
+
+ # Thread support: if the post is in a thread, use root_id.
+ thread_id = post.get("root_id") or None
+
+ # Determine message type.
+ file_ids = post.get("file_ids") or []
+ msg_type = MessageType.TEXT
+ if message_text.startswith("/"):
+ msg_type = MessageType.COMMAND
+
+ # Download file attachments immediately (URLs require auth headers
+ # that downstream tools won't have).
+ media_urls: List[str] = []
+ media_types: List[str] = []
+ for fid in file_ids:
+ try:
+ file_info = await self._api_get(f"files/{fid}/info")
+ fname = file_info.get("name", f"file_{fid}")
+ ext = Path(fname).suffix or ""
+ mime = file_info.get("mime_type", "application/octet-stream")
+
+ import aiohttp
+ dl_url = f"{self._base_url}/api/v4/files/{fid}"
+ async with self._session.get(
+ dl_url,
+ headers={"Authorization": f"Bearer {self._token}"},
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status < 400:
+ file_data = await resp.read()
+ from gateway.platforms.base import cache_image_from_bytes, cache_document_from_bytes
+ if mime.startswith("image/"):
+ local_path = cache_image_from_bytes(file_data, ext or ".png")
+ media_urls.append(local_path)
+ media_types.append("image")
+ elif mime.startswith("audio/"):
+ from gateway.platforms.base import cache_audio_from_bytes
+ local_path = cache_audio_from_bytes(file_data, ext or ".ogg")
+ media_urls.append(local_path)
+ media_types.append("audio")
+ else:
+ local_path = cache_document_from_bytes(file_data, fname)
+ media_urls.append(local_path)
+ media_types.append("document")
+ else:
+ logger.warning("Mattermost: failed to download file %s: HTTP %s", fid, resp.status)
+ except Exception as exc:
+ logger.warning("Mattermost: error downloading file %s: %s", fid, exc)
+
+ source = self.build_source(
+ chat_id=channel_id,
+ chat_type=chat_type,
+ user_id=sender_id,
+ user_name=sender_name,
+ thread_id=thread_id,
+ )
+
+ msg_event = MessageEvent(
+ text=message_text,
+ message_type=msg_type,
+ source=source,
+ raw_message=post,
+ message_id=post_id,
+ media_urls=media_urls if media_urls else None,
+ media_types=media_types if media_types else None,
+ )
+
+ await self.handle_message(msg_event)
+
+ def _prune_seen(self) -> None:
+ """Remove expired entries from the dedup cache."""
+ if len(self._seen_posts) < self._SEEN_MAX:
+ return
+ now = time.time()
+ self._seen_posts = {
+ pid: ts
+ for pid, ts in self._seen_posts.items()
+ if now - ts < self._SEEN_TTL
+ }
diff --git a/gateway/run.py b/gateway/run.py
index 4d8910c225..b5fc8ee80d 100644
--- a/gateway/run.py
+++ b/gateway/run.py
@@ -1147,6 +1147,20 @@ class GatewayRunner:
return None
return DingTalkAdapter(config)
+ elif platform == Platform.MATTERMOST:
+ from gateway.platforms.mattermost import MattermostAdapter, check_mattermost_requirements
+ if not check_mattermost_requirements():
+ logger.warning("Mattermost: MATTERMOST_TOKEN or MATTERMOST_URL not set, or aiohttp missing")
+ return None
+ return MattermostAdapter(config)
+
+ elif platform == Platform.MATRIX:
+ from gateway.platforms.matrix import MatrixAdapter, check_matrix_requirements
+ if not check_matrix_requirements():
+ logger.warning("Matrix: matrix-nio not installed or credentials not set. Run: pip install 'matrix-nio[e2e]'")
+ return None
+ return MatrixAdapter(config)
+
return None
def _is_user_authorized(self, source: SessionSource) -> bool:
@@ -1178,6 +1192,8 @@ class GatewayRunner:
Platform.SIGNAL: "SIGNAL_ALLOWED_USERS",
Platform.EMAIL: "EMAIL_ALLOWED_USERS",
Platform.SMS: "SMS_ALLOWED_USERS",
+ Platform.MATTERMOST: "MATTERMOST_ALLOWED_USERS",
+ Platform.MATRIX: "MATRIX_ALLOWED_USERS",
}
platform_allow_all_map = {
Platform.TELEGRAM: "TELEGRAM_ALLOW_ALL_USERS",
@@ -1187,6 +1203,8 @@ class GatewayRunner:
Platform.SIGNAL: "SIGNAL_ALLOW_ALL_USERS",
Platform.EMAIL: "EMAIL_ALLOW_ALL_USERS",
Platform.SMS: "SMS_ALLOW_ALL_USERS",
+ Platform.MATTERMOST: "MATTERMOST_ALLOW_ALL_USERS",
+ Platform.MATRIX: "MATRIX_ALLOW_ALL_USERS",
}
# Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true)
diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py
index 2de2ac12cf..d1106ae67f 100644
--- a/hermes_cli/gateway.py
+++ b/hermes_cli/gateway.py
@@ -1001,6 +1001,64 @@ _PLATFORMS = [
"help": "Paste your member ID from step 7 above."},
],
},
+ {
+ "key": "matrix",
+ "label": "Matrix",
+ "emoji": "🔐",
+ "token_var": "MATRIX_ACCESS_TOKEN",
+ "setup_instructions": [
+ "1. Works with any Matrix homeserver (self-hosted Synapse/Conduit/Dendrite or matrix.org)",
+ "2. Create a bot user on your homeserver, or use your own account",
+ "3. Get an access token: Element → Settings → Help & About → Access Token",
+ " Or via API: curl -X POST https://your-server/_matrix/client/v3/login \\",
+ " -d '{\"type\":\"m.login.password\",\"user\":\"@bot:server\",\"password\":\"...\"}'",
+ "4. Alternatively, provide user ID + password and Hermes will log in directly",
+ "5. For E2EE: set MATRIX_ENCRYPTION=true (requires pip install 'matrix-nio[e2e]')",
+ "6. To find your user ID: it's @username:your-server (shown in Element profile)",
+ ],
+ "vars": [
+ {"name": "MATRIX_HOMESERVER", "prompt": "Homeserver URL (e.g. https://matrix.example.org)", "password": False,
+ "help": "Your Matrix homeserver URL. Works with any self-hosted instance."},
+ {"name": "MATRIX_ACCESS_TOKEN", "prompt": "Access token (leave empty to use password login instead)", "password": True,
+ "help": "Paste your access token, or leave empty and provide user ID + password below."},
+ {"name": "MATRIX_USER_ID", "prompt": "User ID (@bot:server — required for password login)", "password": False,
+ "help": "Full Matrix user ID, e.g. @hermes:matrix.example.org"},
+ {"name": "MATRIX_ALLOWED_USERS", "prompt": "Allowed user IDs (comma-separated, e.g. @you:server)", "password": False,
+ "is_allowlist": True,
+ "help": "Matrix user IDs who can interact with the bot."},
+ {"name": "MATRIX_HOME_ROOM", "prompt": "Home room ID (for cron/notification delivery, or empty to set later with /set-home)", "password": False,
+ "help": "Room ID (e.g. !abc123:server) for delivering cron results and notifications."},
+ ],
+ },
+ {
+ "key": "mattermost",
+ "label": "Mattermost",
+ "emoji": "💬",
+ "token_var": "MATTERMOST_TOKEN",
+ "setup_instructions": [
+ "1. In Mattermost: Integrations → Bot Accounts → Add Bot Account",
+ " (System Console → Integrations → Bot Accounts must be enabled)",
+ "2. Give it a username (e.g. hermes) and copy the bot token",
+ "3. Works with any self-hosted Mattermost instance — enter your server URL",
+ "4. To find your user ID: click your avatar (top-left) → Profile",
+ " Your user ID is displayed there — click it to copy.",
+ " ⚠ This is NOT your username — it's a 26-character alphanumeric ID.",
+ "5. To get a channel ID: click the channel name → View Info → copy the ID",
+ ],
+ "vars": [
+ {"name": "MATTERMOST_URL", "prompt": "Server URL (e.g. https://mm.example.com)", "password": False,
+ "help": "Your Mattermost server URL. Works with any self-hosted instance."},
+ {"name": "MATTERMOST_TOKEN", "prompt": "Bot token", "password": True,
+ "help": "Paste the bot token from step 2 above."},
+ {"name": "MATTERMOST_ALLOWED_USERS", "prompt": "Allowed user IDs (comma-separated)", "password": False,
+ "is_allowlist": True,
+ "help": "Your Mattermost user ID from step 4 above."},
+ {"name": "MATTERMOST_HOME_CHANNEL", "prompt": "Home channel ID (for cron/notification delivery, or empty to set later with /set-home)", "password": False,
+ "help": "Channel ID where Hermes delivers cron results and notifications."},
+ {"name": "MATTERMOST_REPLY_MODE", "prompt": "Reply mode — 'off' for flat messages, 'thread' for threaded replies (default: off)", "password": False,
+ "help": "off = flat channel messages, thread = replies nest under your message."},
+ ],
+ },
{
"key": "whatsapp",
"label": "WhatsApp",
@@ -1100,6 +1158,16 @@ def _platform_status(platform: dict) -> str:
if any([val, pwd, imap, smtp]):
return "partially configured"
return "not configured"
+ if platform.get("key") == "matrix":
+ homeserver = get_env_value("MATRIX_HOMESERVER")
+ password = get_env_value("MATRIX_PASSWORD")
+ if (val or password) and homeserver:
+ e2ee = get_env_value("MATRIX_ENCRYPTION")
+ suffix = " + E2EE" if e2ee and e2ee.lower() in ("true", "1", "yes") else ""
+ return f"configured{suffix}"
+ if val or password or homeserver:
+ return "partially configured"
+ return "not configured"
if val:
return "configured"
return "not configured"
diff --git a/hermes_cli/setup.py b/hermes_cli/setup.py
index b635377f53..18b4485bb4 100644
--- a/hermes_cli/setup.py
+++ b/hermes_cli/setup.py
@@ -2518,6 +2518,119 @@ def setup_gateway(config: dict):
" Set SLACK_ALLOW_ALL_USERS=true or GATEWAY_ALLOW_ALL_USERS=true only if you intentionally want open workspace access."
)
+ # ── Matrix ──
+ existing_matrix = get_env_value("MATRIX_ACCESS_TOKEN") or get_env_value("MATRIX_PASSWORD")
+ if existing_matrix:
+ print_info("Matrix: already configured")
+ if prompt_yes_no("Reconfigure Matrix?", False):
+ existing_matrix = None
+
+ if not existing_matrix and prompt_yes_no("Set up Matrix?", False):
+ print_info("Works with any Matrix homeserver (Synapse, Conduit, Dendrite, or matrix.org).")
+ print_info(" 1. Create a bot user on your homeserver, or use your own account")
+ print_info(" 2. Get an access token from Element, or provide user ID + password")
+ print()
+ homeserver = prompt("Homeserver URL (e.g. https://matrix.example.org)")
+ if homeserver:
+ save_env_value("MATRIX_HOMESERVER", homeserver.rstrip("/"))
+
+ print()
+ print_info("Auth: provide an access token (recommended), or user ID + password.")
+ token = prompt("Access token (leave empty for password login)", password=True)
+ if token:
+ save_env_value("MATRIX_ACCESS_TOKEN", token)
+ user_id = prompt("User ID (@bot:server — optional, will be auto-detected)")
+ if user_id:
+ save_env_value("MATRIX_USER_ID", user_id)
+ print_success("Matrix access token saved")
+ else:
+ user_id = prompt("User ID (@bot:server)")
+ if user_id:
+ save_env_value("MATRIX_USER_ID", user_id)
+ password = prompt("Password", password=True)
+ if password:
+ save_env_value("MATRIX_PASSWORD", password)
+ print_success("Matrix credentials saved")
+
+ if token or get_env_value("MATRIX_PASSWORD"):
+ # E2EE
+ print()
+ if prompt_yes_no("Enable end-to-end encryption (E2EE)?", False):
+ save_env_value("MATRIX_ENCRYPTION", "true")
+ print_success("E2EE enabled")
+ print_info(" Requires: pip install 'matrix-nio[e2e]'")
+
+ # Allowed users
+ print()
+ print_info("🔒 Security: Restrict who can use your bot")
+ print_info(" Matrix user IDs look like @username:server")
+ print()
+ allowed_users = prompt(
+ "Allowed user IDs (comma-separated, leave empty for open access)"
+ )
+ if allowed_users:
+ save_env_value("MATRIX_ALLOWED_USERS", allowed_users.replace(" ", ""))
+ print_success("Matrix allowlist configured")
+ else:
+ print_info(
+ "⚠️ No allowlist set - anyone who can message the bot can use it!"
+ )
+
+ # Home room
+ print()
+ print_info("📬 Home Room: where Hermes delivers cron job results and notifications.")
+ print_info(" Room IDs look like !abc123:server (shown in Element room settings)")
+ print_info(" You can also set this later by typing /set-home in a Matrix room.")
+ home_room = prompt("Home room ID (leave empty to set later with /set-home)")
+ if home_room:
+ save_env_value("MATRIX_HOME_ROOM", home_room)
+
+ # ── Mattermost ──
+ existing_mattermost = get_env_value("MATTERMOST_TOKEN")
+ if existing_mattermost:
+ print_info("Mattermost: already configured")
+ if prompt_yes_no("Reconfigure Mattermost?", False):
+ existing_mattermost = None
+
+ if not existing_mattermost and prompt_yes_no("Set up Mattermost?", False):
+ print_info("Works with any self-hosted Mattermost instance.")
+ print_info(" 1. In Mattermost: Integrations → Bot Accounts → Add Bot Account")
+ print_info(" 2. Copy the bot token")
+ print()
+ mm_url = prompt("Mattermost server URL (e.g. https://mm.example.com)")
+ if mm_url:
+ save_env_value("MATTERMOST_URL", mm_url.rstrip("/"))
+ token = prompt("Bot token", password=True)
+ if token:
+ save_env_value("MATTERMOST_TOKEN", token)
+ print_success("Mattermost token saved")
+
+ # Allowed users
+ print()
+ print_info("🔒 Security: Restrict who can use your bot")
+ print_info(" To find your user ID: click your avatar → Profile")
+ print_info(" or use the API: GET /api/v4/users/me")
+ print()
+ allowed_users = prompt(
+ "Allowed user IDs (comma-separated, leave empty for open access)"
+ )
+ if allowed_users:
+ save_env_value("MATTERMOST_ALLOWED_USERS", allowed_users.replace(" ", ""))
+ print_success("Mattermost allowlist configured")
+ else:
+ print_info(
+ "⚠️ No allowlist set - anyone who can message the bot can use it!"
+ )
+
+ # Home channel
+ print()
+ print_info("📬 Home Channel: where Hermes delivers cron job results and notifications.")
+ print_info(" To get a channel ID: click channel name → View Info → copy the ID")
+ print_info(" You can also set this later by typing /set-home in a Mattermost channel.")
+ home_channel = prompt("Home channel ID (leave empty to set later with /set-home)")
+ if home_channel:
+ save_env_value("MATTERMOST_HOME_CHANNEL", home_channel)
+
# ── WhatsApp ──
existing_whatsapp = get_env_value("WHATSAPP_ENABLED")
if not existing_whatsapp and prompt_yes_no("Set up WhatsApp?", False):
@@ -2535,6 +2648,9 @@ def setup_gateway(config: dict):
get_env_value("TELEGRAM_BOT_TOKEN")
or get_env_value("DISCORD_BOT_TOKEN")
or get_env_value("SLACK_BOT_TOKEN")
+ or get_env_value("MATTERMOST_TOKEN")
+ or get_env_value("MATRIX_ACCESS_TOKEN")
+ or get_env_value("MATRIX_PASSWORD")
or get_env_value("WHATSAPP_ENABLED")
)
if any_messaging:
diff --git a/pyproject.toml b/pyproject.toml
index 9343a4c6ba..861fca137e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -46,6 +46,7 @@ dev = ["pytest", "pytest-asyncio", "pytest-xdist", "mcp>=1.2.0"]
messaging = ["python-telegram-bot>=20.0", "discord.py[voice]>=2.0", "aiohttp>=3.9.0", "slack-bolt>=1.18.0", "slack-sdk>=3.27.0"]
cron = ["croniter"]
slack = ["slack-bolt>=1.18.0", "slack-sdk>=3.27.0"]
+matrix = ["matrix-nio[e2e]>=0.24.0"]
cli = ["simple-term-menu"]
tts-premium = ["elevenlabs"]
voice = ["sounddevice>=0.4.6", "numpy>=1.24.0"]