From 8053d48c8df8d931d6ec21bb563d7dfa6434b3c5 Mon Sep 17 00:00:00 2001 From: alt-glitch Date: Sat, 11 Apr 2026 06:51:43 +0530 Subject: [PATCH] refactor(matrix): rewrite adapter from matrix-nio to mautrix-python Translate all nio SDK calls to mautrix equivalents while preserving the adapter structure, business logic, and all features (E2EE, reactions, threading, mention gating, text batching, media caching, voice MSC3245). Key changes: - nio.AsyncClient -> mautrix.client.Client + HTTPAPI + MemoryStateStore - Manual E2EE key management -> OlmMachine with auto key lifecycle - isinstance(resp, nio.XxxResponse) -> mautrix returns values directly - add_event_callback per type -> single ROOM_MESSAGE handler with msgtype dispatch - Room state (member_count, display_name) via async state store lookups - Upload/download return ContentURI/bytes directly (no wrapper objects) --- gateway/platforms/matrix.py | 1407 ++++++++++++++--------------------- gateway/run.py | 2 +- hermes_cli/gateway.py | 2 +- hermes_cli/setup.py | 4 +- 4 files changed, 578 insertions(+), 837 deletions(-) diff --git a/gateway/platforms/matrix.py b/gateway/platforms/matrix.py index 053a5e619..6c1041cf2 100644 --- a/gateway/platforms/matrix.py +++ b/gateway/platforms/matrix.py @@ -1,8 +1,8 @@ """Matrix gateway adapter. Connects to any Matrix homeserver (self-hosted or matrix.org) via the -matrix-nio Python SDK. Supports optional end-to-end encryption (E2EE) -when installed with ``pip install "matrix-nio[e2e]"``. +mautrix Python SDK. Supports optional end-to-end encryption (E2EE) +when installed with ``pip install "mautrix[encryption]"``. Environment variables: MATRIX_HOMESERVER Homeserver URL (e.g. https://matrix.example.org) @@ -24,7 +24,6 @@ Environment variables: from __future__ import annotations import asyncio -import io import json import logging import mimetypes @@ -59,26 +58,22 @@ _STORE_DIR = _get_hermes_dir("platforms/matrix/store", "matrix/store") # Grace period: ignore messages older than this many seconds before startup. _STARTUP_GRACE_SECONDS = 5 -# E2EE key export file for persistence across restarts. -_KEY_EXPORT_FILE = _STORE_DIR / "exported_keys.txt" -_KEY_EXPORT_PASSPHRASE = "hermes-matrix-e2ee-keys" - # Pending undecrypted events: cap and TTL for retry buffer. _MAX_PENDING_EVENTS = 100 _PENDING_EVENT_TTL = 300 # seconds — stop retrying after 5 min _E2EE_INSTALL_HINT = ( - "Install with: pip install 'matrix-nio[e2e]' " + "Install with: pip install 'mautrix[encryption]' " "(requires libolm C library)" ) def _check_e2ee_deps() -> bool: - """Return True if matrix-nio E2EE dependencies (python-olm) are available.""" + """Return True if mautrix E2EE dependencies (python-olm) are available.""" try: - from nio.crypto import ENCRYPTION_ENABLED - return bool(ENCRYPTION_ENABLED) + from mautrix.crypto import OlmMachine # noqa: F401 + return True except (ImportError, AttributeError): return False @@ -96,11 +91,11 @@ def check_matrix_requirements() -> bool: logger.warning("Matrix: MATRIX_HOMESERVER not set") return False try: - import nio # noqa: F401 + import mautrix # noqa: F401 except ImportError: logger.warning( - "Matrix: matrix-nio not installed. " - "Run: pip install 'matrix-nio[e2e]'" + "Matrix: mautrix not installed. " + "Run: pip install 'mautrix[encryption]'" ) return False @@ -152,7 +147,7 @@ class MatrixAdapter(BasePlatformAdapter): or os.getenv("MATRIX_DEVICE_ID", "") ) - self._client: Any = None # nio.AsyncClient + self._client: Any = None # mautrix.client.Client self._sync_task: Optional[asyncio.Task] = None self._closing = False self._startup_ts: float = 0.0 @@ -167,7 +162,7 @@ class MatrixAdapter(BasePlatformAdapter): self._processed_events_set: set = set() # Buffer for undecrypted events pending key receipt. - # Each entry: (room, event, timestamp) + # Each entry: (room_id, event, timestamp) self._pending_megolm: list = [] # Thread participation tracking (for require_mention bypass) @@ -208,21 +203,86 @@ class MatrixAdapter(BasePlatformAdapter): async def connect(self) -> bool: """Connect to the Matrix homeserver and start syncing.""" - import nio + from mautrix.api import HTTPAPI + from mautrix.client import Client + from mautrix.client.state_store import MemoryStateStore, MemorySyncStore + from mautrix.types import EventType, UserID if not self._homeserver: logger.error("Matrix: homeserver URL not configured") return False - # Determine store path and ensure it exists. - store_path = str(_STORE_DIR) + # Ensure store dir exists for E2EE key persistence. _STORE_DIR.mkdir(parents=True, exist_ok=True) + # Create the HTTP API layer. + api = HTTPAPI( + base_url=self._homeserver, + token=self._access_token or "", + ) + # Create the client. - # When a stable device_id is configured, pass it to the constructor - # so matrix-nio binds to it from the start (important for E2EE - # crypto-store persistence across restarts). - ctor_device_id = self._device_id or None + state_store = MemoryStateStore() + sync_store = MemorySyncStore() + client = Client( + mxid=UserID(self._user_id) if self._user_id else UserID(""), + device_id=self._device_id or None, + api=api, + state_store=state_store, + sync_store=sync_store, + ) + + self._client = client + + # Authenticate. + if self._access_token: + api.token = self._access_token + + # Validate the token and learn user_id / device_id. + try: + resp = await client.whoami() + resolved_user_id = getattr(resp, "user_id", "") or self._user_id + resolved_device_id = getattr(resp, "device_id", "") + if resolved_user_id: + self._user_id = str(resolved_user_id) + client.mxid = UserID(self._user_id) + + # Prefer user-configured device_id for stable E2EE identity. + effective_device_id = self._device_id or resolved_device_id + if effective_device_id: + client.device_id = effective_device_id + + logger.info( + "Matrix: using access token for %s%s", + self._user_id or "(unknown user)", + f" (device {effective_device_id})" if effective_device_id else "", + ) + except Exception as exc: + logger.error( + "Matrix: whoami failed — check MATRIX_ACCESS_TOKEN and MATRIX_HOMESERVER: %s", + exc, + ) + return False + elif self._password and self._user_id: + try: + resp = await client.login( + identifier=self._user_id, + password=self._password, + device_name="Hermes Agent", + device_id=self._device_id or None, + ) + # login() stores the token automatically. + if resp and hasattr(resp, "device_id"): + client.device_id = resp.device_id + logger.info("Matrix: logged in as %s", self._user_id) + except Exception as exc: + logger.error("Matrix: login failed — %s", exc) + return False + else: + logger.error("Matrix: need MATRIX_ACCESS_TOKEN or MATRIX_USER_ID + MATRIX_PASSWORD") + return False + + # Set up E2EE if requested. if self._encryption: if not _check_e2ee_deps(): logger.error( @@ -232,16 +292,24 @@ class MatrixAdapter(BasePlatformAdapter): ) return False try: - client = nio.AsyncClient( - self._homeserver, - self._user_id or "", - device_id=ctor_device_id, - store_path=store_path, - ) + from mautrix.crypto import OlmMachine + from mautrix.crypto.store import MemoryCryptoStore + + crypto_store = MemoryCryptoStore() + olm = OlmMachine(client, crypto_store, state_store) + + # Set trust policy: accept unverified devices so senders + # share Megolm session keys with us automatically. + from mautrix.types import TrustState + olm.share_keys_min_trust = TrustState.UNVERIFIED + olm.send_keys_min_trust = TrustState.UNVERIFIED + + await olm.load() + client.crypto = olm logger.info( "Matrix: E2EE enabled (store: %s%s)", - store_path, - f", device_id={self._device_id}" if self._device_id else "", + str(_STORE_DIR), + f", device_id={client.device_id}" if client.device_id else "", ) except Exception as exc: logger.error( @@ -249,158 +317,43 @@ class MatrixAdapter(BasePlatformAdapter): exc, _E2EE_INSTALL_HINT, ) return False - else: - client = nio.AsyncClient( - self._homeserver, - self._user_id or "", - device_id=ctor_device_id, - ) - self._client = client + # Register event handlers. + from mautrix.client import InternalEventType as IntEvt - # Authenticate. - if self._access_token: - client.access_token = self._access_token + client.add_event_handler(EventType.ROOM_MESSAGE, self._on_room_message) + client.add_event_handler(EventType.REACTION, self._on_reaction) + client.add_event_handler(IntEvt.INVITE, self._on_invite) - # With access-token auth, always resolve whoami so we validate the - # token and learn the device_id. The device_id matters for E2EE: - # without it, matrix-nio can send plain messages but may fail to - # decrypt inbound encrypted events or encrypt outbound room sends. - resp = await client.whoami() - if isinstance(resp, nio.WhoamiResponse): - resolved_user_id = getattr(resp, "user_id", "") or self._user_id - resolved_device_id = getattr(resp, "device_id", "") - if resolved_user_id: - self._user_id = resolved_user_id - - # Prefer the user-configured device_id (MATRIX_DEVICE_ID) so - # the bot reuses a stable identity across restarts. Fall back - # to whatever whoami returned. - effective_device_id = self._device_id or resolved_device_id - - # restore_login() is the matrix-nio path that binds the access - # token to a specific device and loads the crypto store. - if effective_device_id and hasattr(client, "restore_login"): - client.restore_login( - self._user_id or resolved_user_id, - effective_device_id, - self._access_token, - ) - else: - if self._user_id: - client.user_id = self._user_id - if effective_device_id: - client.device_id = effective_device_id - client.access_token = self._access_token - if self._encryption: - logger.warning( - "Matrix: access-token login did not restore E2EE state; " - "encrypted rooms may fail until a device_id is available. " - "Set MATRIX_DEVICE_ID to a stable value." - ) - - logger.info( - "Matrix: using access token for %s%s", - self._user_id or "(unknown user)", - f" (device {effective_device_id})" if effective_device_id else "", - ) - else: - logger.error( - "Matrix: whoami failed — check MATRIX_ACCESS_TOKEN and MATRIX_HOMESERVER" - ) - await client.close() - return False - elif self._password and self._user_id: - resp = await client.login( - self._password, - device_name="Hermes Agent", - ) - if isinstance(resp, nio.LoginResponse): - logger.info("Matrix: logged in as %s", self._user_id) - else: - logger.error("Matrix: login failed — %s", getattr(resp, "message", resp)) - await client.close() - return False - else: - logger.error("Matrix: need MATRIX_ACCESS_TOKEN or MATRIX_USER_ID + MATRIX_PASSWORD") - await client.close() - return False - - # If E2EE is enabled, load the crypto store. - if self._encryption and getattr(client, "olm", None): - try: - if client.should_upload_keys: - await client.keys_upload() - logger.info("Matrix: E2EE crypto initialized") - except Exception as exc: - logger.warning("Matrix: crypto init issue: %s", exc) - - # Import previously exported Megolm keys (survives restarts). - if _KEY_EXPORT_FILE.exists(): - try: - await client.import_keys( - str(_KEY_EXPORT_FILE), _KEY_EXPORT_PASSPHRASE, - ) - logger.info("Matrix: imported Megolm keys from backup") - except Exception as exc: - logger.debug("Matrix: could not import keys: %s", exc) - elif self._encryption: - # E2EE was requested but the crypto store failed to load — - # this means encrypted rooms will silently not work. Hard-fail. - logger.error( - "Matrix: E2EE requested but crypto store is not loaded — " - "cannot decrypt or encrypt messages. %s", - _E2EE_INSTALL_HINT, - ) - await client.close() - return False - - # Register event callbacks. - client.add_event_callback(self._on_room_message, nio.RoomMessageText) - client.add_event_callback(self._on_room_message_media, nio.RoomMessageImage) - client.add_event_callback(self._on_room_message_media, nio.RoomMessageAudio) - client.add_event_callback(self._on_room_message_media, nio.RoomMessageVideo) - client.add_event_callback(self._on_room_message_media, nio.RoomMessageFile) - for encrypted_media_cls in ( - getattr(nio, "RoomEncryptedImage", None), - getattr(nio, "RoomEncryptedAudio", None), - getattr(nio, "RoomEncryptedVideo", None), - getattr(nio, "RoomEncryptedFile", None), - ): - if encrypted_media_cls is not None: - client.add_event_callback(self._on_room_message_media, encrypted_media_cls) - client.add_event_callback(self._on_invite, nio.InviteMemberEvent) - - # Reaction events (m.reaction). - if hasattr(nio, "ReactionEvent"): - client.add_event_callback(self._on_reaction, nio.ReactionEvent) - else: - # Older matrix-nio versions: use UnknownEvent fallback. - client.add_event_callback(self._on_unknown_event, nio.UnknownEvent) - - # If E2EE: handle encrypted events. - if self._encryption and hasattr(client, "olm"): - client.add_event_callback( - self._on_room_message, nio.MegolmEvent - ) + if self._encryption and getattr(client, "crypto", None): + client.add_event_handler(EventType.ROOM_ENCRYPTED, self._on_encrypted_event) # Initial sync to catch up, then start background sync. self._startup_ts = time.time() self._closing = False - # Do an initial sync to populate room state. - resp = await client.sync(timeout=10000, full_state=True) - if isinstance(resp, nio.SyncResponse): - self._joined_rooms = set(resp.rooms.join.keys()) - logger.info( - "Matrix: initial sync complete, joined %d rooms", - len(self._joined_rooms), - ) - # Build DM room cache from m.direct account data. - await self._refresh_dm_cache() - await self._run_e2ee_maintenance() - else: - logger.warning("Matrix: initial sync returned %s", type(resp).__name__) + try: + sync_data = await client.sync(timeout=10000, full_state=True) + if isinstance(sync_data, dict): + rooms_join = sync_data.get("rooms", {}).get("join", {}) + self._joined_rooms = set(rooms_join.keys()) + logger.info( + "Matrix: initial sync complete, joined %d rooms", + len(self._joined_rooms), + ) + # Build DM room cache from m.direct account data. + await self._refresh_dm_cache() + else: + logger.warning("Matrix: initial sync returned unexpected type %s", type(sync_data).__name__) + except Exception as exc: + logger.warning("Matrix: initial sync error: %s", exc) + + # Share keys after initial sync if E2EE is enabled. + if self._encryption and getattr(client, "crypto", None): + try: + await client.crypto.share_keys() + except Exception as exc: + logger.warning("Matrix: initial key share failed: %s", exc) # Start the sync loop. self._sync_task = asyncio.create_task(self._sync_loop()) @@ -418,20 +371,11 @@ class MatrixAdapter(BasePlatformAdapter): except (asyncio.CancelledError, Exception): pass - # Export Megolm keys before closing so the next restart can decrypt - # events that used sessions from this run. - if self._client and self._encryption and getattr(self._client, "olm", None): - try: - _STORE_DIR.mkdir(parents=True, exist_ok=True) - await self._client.export_keys( - str(_KEY_EXPORT_FILE), _KEY_EXPORT_PASSPHRASE, - ) - logger.info("Matrix: exported Megolm keys for next restart") - except Exception as exc: - logger.debug("Matrix: could not export keys on disconnect: %s", exc) - if self._client: - await self._client.close() + try: + await self._client.api.session.close() + except Exception: + pass self._client = None logger.info("Matrix: disconnected") @@ -444,7 +388,7 @@ class MatrixAdapter(BasePlatformAdapter): metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: """Send a message to a Matrix room.""" - import nio + from mautrix.types import EventType, RoomID if not content: return SendResult(success=True) @@ -482,52 +426,38 @@ class MatrixAdapter(BasePlatformAdapter): relates_to["m.in_reply_to"] = {"event_id": reply_to} msg_content["m.relates_to"] = relates_to - async def _room_send_once(*, ignore_unverified_devices: bool = False): - return await asyncio.wait_for( - self._client.room_send( - chat_id, - "m.room.message", + try: + event_id = await asyncio.wait_for( + self._client.send_message_event( + RoomID(chat_id), + EventType.ROOM_MESSAGE, msg_content, - ignore_unverified_devices=ignore_unverified_devices, ), timeout=45, ) - - try: - resp = await _room_send_once(ignore_unverified_devices=False) - except Exception as exc: - retryable = isinstance(exc, asyncio.TimeoutError) - olm_unverified = getattr(nio, "OlmUnverifiedDeviceError", None) - send_retry = getattr(nio, "SendRetryError", None) - if isinstance(olm_unverified, type) and isinstance(exc, olm_unverified): - retryable = True - if isinstance(send_retry, type) and isinstance(exc, send_retry): - retryable = True - - if not retryable: - logger.error("Matrix: failed to send to %s: %s", chat_id, exc) - return SendResult(success=False, error=str(exc)) - - logger.warning( - "Matrix: initial encrypted send to %s failed (%s); " - "retrying after E2EE maintenance with ignored unverified devices", - chat_id, - exc, - ) - await self._run_e2ee_maintenance() - try: - resp = await _room_send_once(ignore_unverified_devices=True) - except Exception as retry_exc: - logger.error("Matrix: failed to send to %s after retry: %s", chat_id, retry_exc) - return SendResult(success=False, error=str(retry_exc)) - - if isinstance(resp, nio.RoomSendResponse): - last_event_id = resp.event_id + last_event_id = str(event_id) logger.info("Matrix: sent event %s to %s", last_event_id, chat_id) - else: - err = getattr(resp, "message", str(resp)) - logger.error("Matrix: failed to send to %s: %s", chat_id, err) - return SendResult(success=False, error=err) + except Exception as exc: + # On E2EE errors, retry after sharing keys. + if self._encryption and getattr(self._client, "crypto", None): + try: + await self._client.crypto.share_keys() + event_id = await asyncio.wait_for( + self._client.send_message_event( + RoomID(chat_id), + EventType.ROOM_MESSAGE, + msg_content, + ), + timeout=45, + ) + last_event_id = str(event_id) + logger.info("Matrix: sent event %s to %s (after key share)", last_event_id, chat_id) + continue + except Exception as retry_exc: + logger.error("Matrix: failed to send to %s after retry: %s", chat_id, retry_exc) + return SendResult(success=False, error=str(retry_exc)) + logger.error("Matrix: failed to send to %s: %s", chat_id, exc) + return SendResult(success=False, error=str(exc)) return SendResult(success=True, message_id=last_event_id) @@ -537,14 +467,32 @@ class MatrixAdapter(BasePlatformAdapter): chat_type = "group" if self._client: - room = self._client.rooms.get(chat_id) - if room: - name = room.display_name or room.canonical_alias or chat_id - # Use DM cache. - if self._dm_rooms.get(chat_id, False): - chat_type = "dm" - elif room.member_count == 2: - chat_type = "dm" + # Try state store for member count. + state_store = getattr(self._client, "state_store", None) + if state_store: + try: + members = await state_store.get_members( + chat_id, + ) + if members and len(members) == 2: + chat_type = "dm" + except Exception: + pass + + # Use DM cache. + if self._dm_rooms.get(chat_id, False): + chat_type = "dm" + + # Try to get room name from state. + try: + from mautrix.types import EventType as ET, RoomID + name_evt = await self._client.get_state_event( + RoomID(chat_id), ET.ROOM_NAME, + ) + if name_evt and hasattr(name_evt, "name") and name_evt.name: + name = name_evt.name + except Exception: + pass return {"name": name, "type": chat_type} @@ -558,7 +506,8 @@ class MatrixAdapter(BasePlatformAdapter): """Send a typing indicator.""" if self._client: try: - await self._client.room_typing(chat_id, typing_state=True, timeout=30000) + from mautrix.types import RoomID + await self._client.set_typing(RoomID(chat_id), timeout=30000) except Exception: pass @@ -566,7 +515,7 @@ class MatrixAdapter(BasePlatformAdapter): self, chat_id: str, message_id: str, content: str ) -> SendResult: """Edit an existing message (via m.replace).""" - import nio + from mautrix.types import EventType, RoomID formatted = self.format_message(content) msg_content: Dict[str, Any] = { @@ -589,10 +538,13 @@ class MatrixAdapter(BasePlatformAdapter): msg_content["format"] = "org.matrix.custom.html" msg_content["formatted_body"] = f"* {html}" - resp = await self._client.room_send(chat_id, "m.room.message", msg_content) - if isinstance(resp, nio.RoomSendResponse): - return SendResult(success=True, message_id=resp.event_id) - return SendResult(success=False, error=getattr(resp, "message", str(resp))) + try: + event_id = await self._client.send_message_event( + RoomID(chat_id), EventType.ROOM_MESSAGE, msg_content, + ) + return SendResult(success=True, message_id=str(event_id)) + except Exception as exc: + return SendResult(success=False, error=str(exc)) async def send_image( self, @@ -665,7 +617,7 @@ class MatrixAdapter(BasePlatformAdapter): ) -> SendResult: """Upload an audio file as a voice message (MSC3245 native voice).""" return await self._send_local_file( - chat_id, audio_path, "m.audio", caption, reply_to, + chat_id, audio_path, "m.audio", caption, reply_to, metadata=metadata, is_voice=True ) @@ -703,29 +655,24 @@ class MatrixAdapter(BasePlatformAdapter): is_voice: bool = False, ) -> SendResult: """Upload bytes to Matrix and send as a media message.""" - import nio + from mautrix.types import EventType, RoomID # Upload to homeserver. - # nio expects a DataProvider (callable) or file-like object, not raw bytes. - # nio.upload() returns a tuple (UploadResponse|UploadError, Optional[Dict]) - resp, maybe_encryption_info = await self._client.upload( - io.BytesIO(data), - content_type=content_type, - filename=filename, - filesize=len(data), - ) - if not isinstance(resp, nio.UploadResponse): - err = getattr(resp, "message", str(resp)) - logger.error("Matrix: upload failed: %s", err) - return SendResult(success=False, error=err) - - mxc_url = resp.content_uri + try: + mxc_url = await self._client.upload_media( + data, + mime_type=content_type, + filename=filename, + ) + except Exception as exc: + logger.error("Matrix: upload failed: %s", exc) + return SendResult(success=False, error=str(exc)) # Build media message content. msg_content: Dict[str, Any] = { "msgtype": msgtype, "body": caption or filename, - "url": mxc_url, + "url": str(mxc_url), "info": { "mimetype": content_type, "size": len(data), @@ -749,10 +696,13 @@ class MatrixAdapter(BasePlatformAdapter): relates_to["is_falling_back"] = True msg_content["m.relates_to"] = relates_to - resp2 = await self._client.room_send(room_id, "m.room.message", msg_content) - if isinstance(resp2, nio.RoomSendResponse): - return SendResult(success=True, message_id=resp2.event_id) - return SendResult(success=False, error=getattr(resp2, "message", str(resp2))) + try: + event_id = await self._client.send_message_event( + RoomID(room_id), EventType.ROOM_MESSAGE, msg_content, + ) + return SendResult(success=True, message_id=str(event_id)) + except Exception as exc: + return SendResult(success=False, error=str(exc)) async def _send_local_file( self, @@ -784,37 +734,32 @@ class MatrixAdapter(BasePlatformAdapter): async def _sync_loop(self) -> None: """Continuously sync with the homeserver.""" - import nio - while not self._closing: try: - resp = await self._client.sync(timeout=30000) - if isinstance(resp, nio.SyncError): - if self._closing: - return - err_msg = str(getattr(resp, "message", resp)).lower() - if "m_unknown_token" in err_msg or "m_forbidden" in err_msg or "401" in err_msg: - logger.error( - "Matrix: permanent auth error from sync: %s — stopping sync", - getattr(resp, "message", resp), - ) - return - logger.warning( - "Matrix: sync returned %s: %s — retrying in 5s", - type(resp).__name__, - getattr(resp, "message", resp), - ) - await asyncio.sleep(5) - continue + sync_data = await self._client.sync(timeout=30000) + if isinstance(sync_data, dict): + # Update joined rooms from sync response. + rooms_join = sync_data.get("rooms", {}).get("join", {}) + if rooms_join: + self._joined_rooms.update(rooms_join.keys()) + + # Share keys periodically if E2EE is enabled. + if self._encryption and getattr(self._client, "crypto", None): + try: + await self._client.crypto.share_keys() + except Exception as exc: + logger.warning("Matrix: E2EE key share failed: %s", exc) + + # Retry any buffered undecrypted events. + if self._pending_megolm: + await self._retry_pending_decryptions() - await self._run_e2ee_maintenance() except asyncio.CancelledError: return except Exception as exc: if self._closing: return - # Detect permanent auth/permission failures that will never - # succeed on retry — stop syncing instead of looping forever. + # Detect permanent auth/permission failures. err_str = str(exc).lower() if "401" in err_str or "403" in err_str or "unauthorized" in err_str or "forbidden" in err_str: logger.error("Matrix: permanent auth error: %s — stopping sync", exc) @@ -822,98 +767,19 @@ class MatrixAdapter(BasePlatformAdapter): logger.warning("Matrix: sync error: %s — retrying in 5s", exc) await asyncio.sleep(5) - async def _run_e2ee_maintenance(self) -> None: - """Run matrix-nio E2EE housekeeping between syncs. - - Hermes uses a custom sync loop instead of matrix-nio's sync_forever(), - so we need to explicitly drive the key management work that sync_forever() - normally handles for encrypted rooms. - - Also auto-trusts all devices (so senders share session keys with us) - and retries decryption for any buffered MegolmEvents. - """ - client = self._client - if not client or not self._encryption or not getattr(client, "olm", None): - return - - did_query_keys = client.should_query_keys - - tasks = [asyncio.create_task(client.send_to_device_messages())] - - if client.should_upload_keys: - tasks.append(asyncio.create_task(client.keys_upload())) - - if did_query_keys: - tasks.append(asyncio.create_task(client.keys_query())) - - if client.should_claim_keys: - users = client.get_users_for_key_claiming() - if users: - tasks.append(asyncio.create_task(client.keys_claim(users))) - - for task in asyncio.as_completed(tasks): - try: - await task - except asyncio.CancelledError: - raise - except Exception as exc: - logger.warning("Matrix: E2EE maintenance task failed: %s", exc) - - # After key queries, auto-trust all devices so senders share keys with - # us. For a bot this is the right default — we want to decrypt - # everything, not enforce manual verification. - if did_query_keys: - self._auto_trust_devices() - - # Retry any buffered undecrypted events now that new keys may have - # arrived (from key requests, key queries, or to-device forwarding). - if self._pending_megolm: - await self._retry_pending_decryptions() - - def _auto_trust_devices(self) -> None: - """Trust/verify all unverified devices we know about. - - When other clients see our device as verified, they proactively share - Megolm session keys with us. Without this, many clients will refuse - to include an unverified device in key distributions. - """ - client = self._client - if not client: - return - - device_store = getattr(client, "device_store", None) - if not device_store: - return - - own_device = getattr(client, "device_id", None) - trusted_count = 0 - - try: - # DeviceStore.__iter__ yields OlmDevice objects directly. - for device in device_store: - if getattr(device, "device_id", None) == own_device: - continue - if not getattr(device, "verified", False): - client.verify_device(device) - trusted_count += 1 - except Exception as exc: - logger.debug("Matrix: auto-trust error: %s", exc) - - if trusted_count: - logger.info("Matrix: auto-trusted %d new device(s)", trusted_count) - async def _retry_pending_decryptions(self) -> None: - """Retry decrypting buffered MegolmEvents after new keys arrive.""" - import nio - + """Retry decrypting buffered encrypted events after new keys arrive.""" client = self._client if not client or not self._pending_megolm: return + crypto = getattr(client, "crypto", None) + if not crypto: + return now = time.time() still_pending: list = [] - for room, event, ts in self._pending_megolm: + for room_id, event, ts in self._pending_megolm: # Drop events that have aged past the TTL. if now - ts > _PENDING_EVENT_TTL: logger.debug( @@ -923,39 +789,23 @@ class MatrixAdapter(BasePlatformAdapter): continue try: - decrypted = client.decrypt_event(event) + decrypted = await crypto.decrypt_megolm_event(event) except Exception: - # Still missing the key — keep in buffer. - still_pending.append((room, event, ts)) + still_pending.append((room_id, event, ts)) continue - if isinstance(decrypted, nio.MegolmEvent): - # decrypt_event returned the same undecryptable event. - still_pending.append((room, event, ts)) + if decrypted is None or decrypted is event: + still_pending.append((room_id, event, ts)) continue logger.info( - "Matrix: decrypted buffered event %s (%s)", + "Matrix: decrypted buffered event %s", getattr(event, "event_id", "?"), - type(decrypted).__name__, ) - # Route to the appropriate handler based on decrypted type. + # Route to the appropriate handler. try: - if isinstance(decrypted, nio.RoomMessageText): - await self._on_room_message(room, decrypted) - elif isinstance( - decrypted, - (nio.RoomMessageImage, nio.RoomMessageAudio, - nio.RoomMessageVideo, nio.RoomMessageFile), - ): - await self._on_room_message_media(room, decrypted) - else: - logger.debug( - "Matrix: decrypted event %s has unhandled type %s", - getattr(event, "event_id", "?"), - type(decrypted).__name__, - ) + await self._on_room_message(decrypted) except Exception as exc: logger.warning( "Matrix: error processing decrypted event %s: %s", @@ -968,62 +818,78 @@ class MatrixAdapter(BasePlatformAdapter): # Event callbacks # ------------------------------------------------------------------ - async def _on_room_message(self, room: Any, event: Any) -> None: - """Handle incoming text messages (and decrypted megolm events).""" - import nio + async def _on_room_message(self, event: Any) -> None: + """Handle incoming room message events (text, media).""" + room_id = str(getattr(event, "room_id", "")) + sender = str(getattr(event, "sender", "")) # Ignore own messages. - if event.sender == self._user_id: + if sender == self._user_id: return - # Deduplicate by event ID (nio can fire the same event more than once). - if self._is_duplicate_event(getattr(event, "event_id", None)): + # Deduplicate by event ID. + event_id = str(getattr(event, "event_id", "")) + if self._is_duplicate_event(event_id): return # Startup grace: ignore old messages from initial sync. - event_ts = getattr(event, "server_timestamp", 0) / 1000.0 + event_ts = getattr(event, "timestamp", 0) / 1000.0 if getattr(event, "timestamp", 0) else 0 + # Also check server_timestamp for compatibility. + if not event_ts: + event_ts = getattr(event, "server_timestamp", 0) / 1000.0 if getattr(event, "server_timestamp", 0) else 0 if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS: return - # Handle undecryptable MegolmEvents: request the missing session key - # and buffer the event for retry once the key arrives. - if isinstance(event, nio.MegolmEvent): - logger.warning( - "Matrix: could not decrypt event %s in %s — requesting key", - event.event_id, room.room_id, - ) - - # Ask other devices in the room to forward the session key. - try: - resp = await self._client.request_room_key(event) - if hasattr(resp, "event_id") or not isinstance(resp, Exception): - logger.debug( - "Matrix: room key request sent for session %s", - getattr(event, "session_id", "?"), - ) - except Exception as exc: - logger.debug("Matrix: room key request failed: %s", exc) - - # Buffer for retry on next maintenance cycle. - self._pending_megolm.append((room, event, time.time())) - if len(self._pending_megolm) > _MAX_PENDING_EVENTS: - self._pending_megolm = self._pending_megolm[-_MAX_PENDING_EVENTS:] + # Extract content from the event. + content = getattr(event, "content", None) + if content is None: return - # Skip edits (m.replace relation). - source_content = getattr(event, "source", {}).get("content", {}) + # Get msgtype — either from content object or raw dict. + if hasattr(content, "msgtype"): + msgtype = str(content.msgtype) + elif isinstance(content, dict): + msgtype = content.get("msgtype", "") + else: + msgtype = "" + + # Determine source content dict for relation/thread extraction. + if isinstance(content, dict): + source_content = content + elif hasattr(content, "serialize"): + source_content = content.serialize() + else: + source_content = {} + relates_to = source_content.get("m.relates_to", {}) + + # Skip edits (m.replace relation). if relates_to.get("rel_type") == "m.replace": return - body = getattr(event, "body", "") or "" + # Dispatch by msgtype. + media_msgtypes = ("m.image", "m.audio", "m.video", "m.file") + if msgtype in media_msgtypes: + await self._handle_media_message(room_id, sender, event_id, event_ts, source_content, relates_to, msgtype) + elif msgtype in ("m.text", "m.notice"): + await self._handle_text_message(room_id, sender, event_id, event_ts, source_content, relates_to) + + async def _handle_text_message( + self, + room_id: str, + sender: str, + event_id: str, + event_ts: float, + source_content: dict, + relates_to: dict, + ) -> None: + """Process a text message event.""" + body = source_content.get("body", "") or "" if not body: return # Determine chat type. - is_dm = self._dm_rooms.get(room.room_id, False) - if not is_dm and room.member_count == 2: - is_dm = True + is_dm = await self._is_dm_room(room_id) chat_type = "dm" if is_dm else "group" # Thread support. @@ -1036,7 +902,7 @@ class MatrixAdapter(BasePlatformAdapter): free_rooms_raw = os.getenv("MATRIX_FREE_RESPONSE_ROOMS", "") free_rooms = {r.strip() for r in free_rooms_raw.split(",") if r.strip()} require_mention = os.getenv("MATRIX_REQUIRE_MENTION", "true").lower() not in ("false", "0", "no") - is_free_room = room.room_id in free_rooms + is_free_room = room_id in free_rooms in_bot_thread = bool(thread_id and thread_id in self._bot_participated_threads) formatted_body = source_content.get("formatted_body") @@ -1044,22 +910,22 @@ class MatrixAdapter(BasePlatformAdapter): if not self._is_bot_mentioned(body, formatted_body): return - # DM mention-thread: when enabled, @mentioning bot in a DM creates a thread. + # DM mention-thread. if is_dm and not thread_id: dm_mention_threads = os.getenv("MATRIX_DM_MENTION_THREADS", "false").lower() in ("true", "1", "yes") if dm_mention_threads and self._is_bot_mentioned(body, source_content.get("formatted_body")): - thread_id = event.event_id + thread_id = event_id self._track_thread(thread_id) - # Strip mention from body when present (including in DMs). + # Strip mention from body. if self._is_bot_mentioned(body, source_content.get("formatted_body")): body = self._strip_mention(body) - # Auto-thread: create a thread for non-DM, non-threaded messages. + # Auto-thread. if not is_dm and not thread_id: auto_thread = os.getenv("MATRIX_AUTO_THREAD", "true").lower() in ("true", "1", "yes") if auto_thread: - thread_id = event.event_id + thread_id = event_id self._track_thread(thread_id) # Reply-to detection. @@ -1068,7 +934,7 @@ class MatrixAdapter(BasePlatformAdapter): if in_reply_to: reply_to = in_reply_to.get("event_id") - # Strip reply fallback from body (Matrix prepends "> ..." lines). + # Strip reply fallback from body. if reply_to and body.startswith("> "): lines = body.split("\n") stripped = [] @@ -1089,11 +955,12 @@ class MatrixAdapter(BasePlatformAdapter): if body.startswith(("!", "/")): msg_type = MessageType.COMMAND + display_name = await self._get_display_name(room_id, sender) source = self.build_source( - chat_id=room.room_id, + chat_id=room_id, chat_type=chat_type, - user_id=event.sender, - user_name=self._get_display_name(room, event.sender), + user_id=sender, + user_name=display_name, thread_id=thread_id, ) @@ -1101,218 +968,105 @@ class MatrixAdapter(BasePlatformAdapter): text=body, message_type=msg_type, source=source, - raw_message=getattr(event, "source", {}), - message_id=event.event_id, + raw_message=source_content, + message_id=event_id, reply_to_message_id=reply_to, ) if thread_id: self._track_thread(thread_id) - # Acknowledge receipt so the room shows as read (fire-and-forget). - self._background_read_receipt(room.room_id, event.event_id) + # Acknowledge receipt (fire-and-forget). + self._background_read_receipt(room_id, event_id) - # Only batch plain text messages — commands dispatch immediately. + # Batch plain text messages — commands dispatch immediately. if msg_type == MessageType.TEXT and self._text_batch_delay_seconds > 0: self._enqueue_text_event(msg_event) else: await self.handle_message(msg_event) - # ------------------------------------------------------------------ - # Text message aggregation (handles Matrix client-side splits) - # ------------------------------------------------------------------ - - def _text_batch_key(self, event: MessageEvent) -> str: - """Session-scoped key for text message batching.""" - from gateway.session import build_session_key - return build_session_key( - event.source, - group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), - thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), - ) - - def _enqueue_text_event(self, event: MessageEvent) -> None: - """Buffer a text event and reset the flush timer. - - When a Matrix client splits a long message, the chunks arrive within - a few hundred milliseconds. This merges them into a single event - before dispatching. - """ - key = self._text_batch_key(event) - existing = self._pending_text_batches.get(key) - chunk_len = len(event.text or "") - if existing is None: - event._last_chunk_len = chunk_len # type: ignore[attr-defined] - self._pending_text_batches[key] = event - else: - if event.text: - existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text - existing._last_chunk_len = chunk_len # type: ignore[attr-defined] - # Merge any media that might be attached - if event.media_urls: - existing.media_urls.extend(event.media_urls) - existing.media_types.extend(event.media_types) - - # Cancel any pending flush and restart the timer - prior_task = self._pending_text_batch_tasks.get(key) - if prior_task and not prior_task.done(): - prior_task.cancel() - self._pending_text_batch_tasks[key] = asyncio.create_task( - self._flush_text_batch(key) - ) - - async def _flush_text_batch(self, key: str) -> None: - """Wait for the quiet period then dispatch the aggregated text. - - Uses a longer delay when the latest chunk is near Matrix's ~4000-char - split point, since a continuation chunk is almost certain. - """ - current_task = asyncio.current_task() - try: - pending = self._pending_text_batches.get(key) - last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0 - if last_len >= self._SPLIT_THRESHOLD: - delay = self._text_batch_split_delay_seconds - else: - delay = self._text_batch_delay_seconds - await asyncio.sleep(delay) - event = self._pending_text_batches.pop(key, None) - if not event: - return - logger.info( - "[Matrix] Flushing text batch %s (%d chars)", - key, len(event.text or ""), - ) - await self.handle_message(event) - finally: - if self._pending_text_batch_tasks.get(key) is current_task: - self._pending_text_batch_tasks.pop(key, None) - - async def _on_room_message_media(self, room: Any, event: Any) -> None: - """Handle incoming media messages (images, audio, video, files).""" - import nio - - # Ignore own messages. - if event.sender == self._user_id: - return - - # Deduplicate by event ID. - if self._is_duplicate_event(getattr(event, "event_id", None)): - return - - # Startup grace. - event_ts = getattr(event, "server_timestamp", 0) / 1000.0 - if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS: - return - - body = getattr(event, "body", "") or "" - url = getattr(event, "url", "") + async def _handle_media_message( + self, + room_id: str, + sender: str, + event_id: str, + event_ts: float, + source_content: dict, + relates_to: dict, + msgtype: str, + ) -> None: + """Process a media message event (image, audio, video, file).""" + body = source_content.get("body", "") or "" + url = source_content.get("url", "") # Convert mxc:// to HTTP URL for downstream processing. http_url = "" if url and url.startswith("mxc://"): http_url = self._mxc_to_http(url) - # Determine message type from event class. - # Use the MIME type from the event's content info when available, - # falling back to category-level MIME types for downstream matching - # (gateway/run.py checks startswith("image/"), startswith("audio/"), etc.) - source_content = getattr(event, "source", {}).get("content", {}) - if not isinstance(source_content, dict): - source_content = {} - event_content = getattr(event, "content", {}) - if not isinstance(event_content, dict): - event_content = {} - content_info = event_content.get("info") if isinstance(event_content, dict) else {} - if not isinstance(content_info, dict) or not content_info: - content_info = source_content.get("info", {}) if isinstance(source_content, dict) else {} - event_mimetype = ( - (content_info.get("mimetype") if isinstance(content_info, dict) else None) - or getattr(event, "mimetype", "") - or "" - ) - # For encrypted media, the URL may be in file.url instead of event.url. - file_content = source_content.get("file", {}) if isinstance(source_content, dict) else {} + # Extract MIME type from content info. + content_info = source_content.get("info", {}) + if not isinstance(content_info, dict): + content_info = {} + event_mimetype = content_info.get("mimetype", "") + + # For encrypted media, the URL may be in file.url. + file_content = source_content.get("file", {}) if not url and isinstance(file_content, dict): url = file_content.get("url", "") or "" if url and url.startswith("mxc://"): http_url = self._mxc_to_http(url) + is_encrypted_media = bool(file_content and isinstance(file_content, dict) and file_content.get("url")) + media_type = "application/octet-stream" msg_type = MessageType.DOCUMENT - - # Safely resolve encrypted media classes — they may not exist on older - # nio versions, and in test environments nio may be mocked (MagicMock - # auto-attributes are not valid types for isinstance). - def _safe_isinstance(obj, cls_name): - cls = getattr(nio, cls_name, None) - if cls is None or not isinstance(cls, type): - return False - return isinstance(obj, cls) - - is_encrypted_image = _safe_isinstance(event, "RoomEncryptedImage") - is_encrypted_audio = _safe_isinstance(event, "RoomEncryptedAudio") - is_encrypted_video = _safe_isinstance(event, "RoomEncryptedVideo") - is_encrypted_file = _safe_isinstance(event, "RoomEncryptedFile") - is_encrypted_media = any((is_encrypted_image, is_encrypted_audio, is_encrypted_video, is_encrypted_file)) is_voice_message = False - if isinstance(event, nio.RoomMessageImage) or is_encrypted_image: + if msgtype == "m.image": msg_type = MessageType.PHOTO media_type = event_mimetype or "image/png" - elif isinstance(event, nio.RoomMessageAudio) or is_encrypted_audio: + elif msgtype == "m.audio": if source_content.get("org.matrix.msc3245.voice") is not None: is_voice_message = True msg_type = MessageType.VOICE else: msg_type = MessageType.AUDIO media_type = event_mimetype or "audio/ogg" - elif isinstance(event, nio.RoomMessageVideo) or is_encrypted_video: + elif msgtype == "m.video": msg_type = MessageType.VIDEO media_type = event_mimetype or "video/mp4" elif event_mimetype: media_type = event_mimetype - # Cache media locally when downstream tools need a real file path: - # - photos (vision tools can't access MXC URLs) - # - voice messages (transcription tools need local files) - # - any encrypted media (HTTP fallback would point at ciphertext) + # Cache media locally when downstream tools need a real file path. cached_path = None should_cache_locally = ( msg_type == MessageType.PHOTO or is_voice_message or is_encrypted_media ) if should_cache_locally and url: try: - if is_voice_message: - download_resp = await self._client.download(mxc=url) - else: - download_resp = await self._client.download(url) - file_bytes = getattr(download_resp, "body", None) + from mautrix.types import ContentURI + file_bytes = await self._client.download_media(ContentURI(url)) if file_bytes is not None: if is_encrypted_media: - from nio.crypto.attachments import decrypt_attachment + from mautrix.crypto.attachments import decrypt_attachment - hashes_value = getattr(event, "hashes", None) - if hashes_value is None and isinstance(file_content, dict): - hashes_value = file_content.get("hashes") + hashes_value = file_content.get("hashes") if isinstance(file_content, dict) else None hash_value = hashes_value.get("sha256") if isinstance(hashes_value, dict) else None - key_value = getattr(event, "key", None) - if key_value is None and isinstance(file_content, dict): - key_value = file_content.get("key") + key_value = file_content.get("key") if isinstance(file_content, dict) else None if isinstance(key_value, dict): key_value = key_value.get("k") - iv_value = getattr(event, "iv", None) - if iv_value is None and isinstance(file_content, dict): - iv_value = file_content.get("iv") + iv_value = file_content.get("iv") if isinstance(file_content, dict) else None if key_value and hash_value and iv_value: file_bytes = decrypt_attachment(file_bytes, key_value, hash_value, iv_value) else: logger.warning( "[Matrix] Encrypted media event missing decryption metadata for %s", - event.event_id, + event_id, ) file_bytes = None @@ -1344,13 +1098,10 @@ class MatrixAdapter(BasePlatformAdapter): except Exception as e: logger.warning("[Matrix] Failed to cache media: %s", e) - is_dm = self._dm_rooms.get(room.room_id, False) - if not is_dm and room.member_count == 2: - is_dm = True + is_dm = await self._is_dm_room(room_id) chat_type = "dm" if is_dm else "group" # Thread/reply detection. - relates_to = source_content.get("m.relates_to", {}) thread_id = None if relates_to.get("rel_type") == "m.thread": thread_id = relates_to.get("event_id") @@ -1360,7 +1111,7 @@ class MatrixAdapter(BasePlatformAdapter): free_rooms_raw = os.getenv("MATRIX_FREE_RESPONSE_ROOMS", "") free_rooms = {r.strip() for r in free_rooms_raw.split(",") if r.strip()} require_mention = os.getenv("MATRIX_REQUIRE_MENTION", "true").lower() not in ("false", "0", "no") - is_free_room = room.room_id in free_rooms + is_free_room = room_id in free_rooms in_bot_thread = bool(thread_id and thread_id in self._bot_participated_threads) if require_mention and not is_free_room and not in_bot_thread: @@ -1368,29 +1119,30 @@ class MatrixAdapter(BasePlatformAdapter): if not self._is_bot_mentioned(body, formatted_body): return - # DM mention-thread: when enabled, @mentioning bot in a DM creates a thread. + # DM mention-thread. if is_dm and not thread_id: dm_mention_threads = os.getenv("MATRIX_DM_MENTION_THREADS", "false").lower() in ("true", "1", "yes") if dm_mention_threads and self._is_bot_mentioned(body, source_content.get("formatted_body")): - thread_id = event.event_id + thread_id = event_id self._track_thread(thread_id) - # Strip mention from body when present (including in DMs). + # Strip mention from body. if self._is_bot_mentioned(body, source_content.get("formatted_body")): body = self._strip_mention(body) - # Auto-thread: create a thread for non-DM, non-threaded messages. + # Auto-thread. if not is_dm and not thread_id: auto_thread = os.getenv("MATRIX_AUTO_THREAD", "true").lower() in ("true", "1", "yes") if auto_thread: - thread_id = event.event_id + thread_id = event_id self._track_thread(thread_id) + display_name = await self._get_display_name(room_id, sender) source = self.build_source( - chat_id=room.room_id, + chat_id=room_id, chat_type=chat_type, - user_id=event.sender, - user_name=self._get_display_name(room, event.sender), + user_id=sender, + user_name=display_name, thread_id=thread_id, ) @@ -1402,8 +1154,8 @@ class MatrixAdapter(BasePlatformAdapter): text=body, message_type=msg_type, source=source, - raw_message=getattr(event, "source", {}), - message_id=event.event_id, + raw_message=source_content, + message_id=event_id, media_urls=media_urls, media_types=media_types, ) @@ -1411,43 +1163,44 @@ class MatrixAdapter(BasePlatformAdapter): if thread_id: self._track_thread(thread_id) - # Acknowledge receipt so the room shows as read (fire-and-forget). - self._background_read_receipt(room.room_id, event.event_id) + self._background_read_receipt(room_id, event_id) await self.handle_message(msg_event) - async def _on_invite(self, room: Any, event: Any) -> None: + async def _on_encrypted_event(self, event: Any) -> None: + """Handle encrypted events that could not be auto-decrypted.""" + room_id = str(getattr(event, "room_id", "")) + event_id = str(getattr(event, "event_id", "")) + + if self._is_duplicate_event(event_id): + return + + logger.warning( + "Matrix: could not decrypt event %s in %s — buffering for retry", + event_id, room_id, + ) + + self._pending_megolm.append((room_id, event, time.time())) + if len(self._pending_megolm) > _MAX_PENDING_EVENTS: + self._pending_megolm = self._pending_megolm[-_MAX_PENDING_EVENTS:] + + async def _on_invite(self, event: Any) -> None: """Auto-join rooms when invited.""" - import nio + from mautrix.types import RoomID - if not isinstance(event, nio.InviteMemberEvent): - return - - # Only process invites directed at us. - if event.state_key != self._user_id: - return - - if event.membership != "invite": - return + room_id = str(getattr(event, "room_id", "")) logger.info( - "Matrix: invited to %s by %s — joining", - room.room_id, event.sender, + "Matrix: invited to %s — joining", + room_id, ) try: - resp = await self._client.join(room.room_id) - if isinstance(resp, nio.JoinResponse): - self._joined_rooms.add(room.room_id) - logger.info("Matrix: joined %s", room.room_id) - # Refresh DM cache since new room may be a DM. - await self._refresh_dm_cache() - else: - logger.warning( - "Matrix: failed to join %s: %s", - room.room_id, getattr(resp, "message", resp), - ) + await self._client.join_room(RoomID(room_id)) + self._joined_rooms.add(room_id) + logger.info("Matrix: joined %s", room_id) + await self._refresh_dm_cache() except Exception as exc: - logger.warning("Matrix: error joining %s: %s", room.room_id, exc) + logger.warning("Matrix: error joining %s: %s", room_id, exc) # ------------------------------------------------------------------ # Reactions (send, receive, processing lifecycle) @@ -1459,7 +1212,7 @@ class MatrixAdapter(BasePlatformAdapter): """Send an emoji reaction to a message in a room. Returns the reaction event_id on success, None on failure. """ - import nio + from mautrix.types import EventType, RoomID if not self._client: return None @@ -1471,15 +1224,11 @@ class MatrixAdapter(BasePlatformAdapter): } } try: - resp = await self._client.room_send( - room_id, "m.reaction", content, - ignore_unverified_devices=True, + resp_event_id = await self._client.send_message_event( + RoomID(room_id), EventType.REACTION, content, ) - if isinstance(resp, nio.RoomSendResponse): - logger.debug("Matrix: sent reaction %s to %s", emoji, event_id) - return resp.event_id - logger.debug("Matrix: reaction send failed: %s", resp) - return None + logger.debug("Matrix: sent reaction %s to %s", emoji, event_id) + return str(resp_event_id) except Exception as exc: logger.debug("Matrix: reaction send error: %s", exc) return None @@ -1513,7 +1262,6 @@ class MatrixAdapter(BasePlatformAdapter): return if outcome == ProcessingOutcome.CANCELLED: return - # Remove the eyes reaction first, if we tracked its event_id. reaction_key = (room_id, msg_id) if reaction_key in self._pending_reactions: eyes_event_id = self._pending_reactions.pop(reaction_key) @@ -1525,42 +1273,91 @@ class MatrixAdapter(BasePlatformAdapter): "\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c", ) - async def _on_reaction(self, room: Any, event: Any) -> None: + async def _on_reaction(self, event: Any) -> None: """Handle incoming reaction events.""" - if event.sender == self._user_id: + sender = str(getattr(event, "sender", "")) + if sender == self._user_id: return - if self._is_duplicate_event(getattr(event, "event_id", None)): + event_id = str(getattr(event, "event_id", "")) + if self._is_duplicate_event(event_id): return - # Log for now; future: trigger agent actions based on emoji. - reacts_to = getattr(event, "reacts_to", "") - key = getattr(event, "key", "") - logger.info( - "Matrix: reaction %s from %s on %s in %s", - key, event.sender, reacts_to, room.room_id, + + room_id = str(getattr(event, "room_id", "")) + content = getattr(event, "content", None) + if content: + relates_to = content.get("m.relates_to", {}) if isinstance(content, dict) else getattr(content, "relates_to", {}) + reacts_to = "" + key = "" + if isinstance(relates_to, dict): + reacts_to = relates_to.get("event_id", "") + key = relates_to.get("key", "") + elif hasattr(relates_to, "event_id"): + reacts_to = str(getattr(relates_to, "event_id", "")) + key = str(getattr(relates_to, "key", "")) + logger.info( + "Matrix: reaction %s from %s on %s in %s", + key, sender, reacts_to, room_id, + ) + + # ------------------------------------------------------------------ + # Text message aggregation (handles Matrix client-side splits) + # ------------------------------------------------------------------ + + def _text_batch_key(self, event: MessageEvent) -> str: + """Session-scoped key for text message batching.""" + from gateway.session import build_session_key + return build_session_key( + event.source, + group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), ) - async def _on_unknown_event(self, room: Any, event: Any) -> None: - """Fallback handler for events not natively parsed by matrix-nio. + def _enqueue_text_event(self, event: MessageEvent) -> None: + """Buffer a text event and reset the flush timer.""" + key = self._text_batch_key(event) + existing = self._pending_text_batches.get(key) + chunk_len = len(event.text or "") + if existing is None: + event._last_chunk_len = chunk_len # type: ignore[attr-defined] + self._pending_text_batches[key] = event + else: + if event.text: + existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text + existing._last_chunk_len = chunk_len # type: ignore[attr-defined] + if event.media_urls: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) - Catches m.reaction on older nio versions that lack ReactionEvent. - """ - source = getattr(event, "source", {}) - if source.get("type") != "m.reaction": - return - content = source.get("content", {}) - relates_to = content.get("m.relates_to", {}) - if relates_to.get("rel_type") != "m.annotation": - return - if source.get("sender") == self._user_id: - return - logger.info( - "Matrix: reaction %s from %s on %s in %s", - relates_to.get("key", "?"), - source.get("sender", "?"), - relates_to.get("event_id", "?"), - room.room_id, + prior_task = self._pending_text_batch_tasks.get(key) + if prior_task and not prior_task.done(): + prior_task.cancel() + self._pending_text_batch_tasks[key] = asyncio.create_task( + self._flush_text_batch(key) ) + async def _flush_text_batch(self, key: str) -> None: + """Wait for the quiet period then dispatch the aggregated text.""" + current_task = asyncio.current_task() + try: + pending = self._pending_text_batches.get(key) + last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0 + if last_len >= self._SPLIT_THRESHOLD: + delay = self._text_batch_split_delay_seconds + else: + delay = self._text_batch_delay_seconds + await asyncio.sleep(delay) + event = self._pending_text_batches.pop(key, None) + if not event: + return + logger.info( + "[Matrix] Flushing text batch %s (%d chars)", + key, len(event.text or ""), + ) + await self.handle_message(event) + finally: + if self._pending_text_batch_tasks.get(key) is current_task: + self._pending_text_batch_tasks.pop(key, None) + # ------------------------------------------------------------------ # Read receipts # ------------------------------------------------------------------ @@ -1575,25 +1372,16 @@ class MatrixAdapter(BasePlatformAdapter): asyncio.ensure_future(_send()) async def send_read_receipt(self, room_id: str, event_id: str) -> bool: - """Send a read receipt (m.read) for an event. - - Also sets the fully-read marker so the room is marked as read - in all clients. - """ + """Send a read receipt (m.read) for an event.""" if not self._client: return False try: - if hasattr(self._client, "room_read_markers"): - await self._client.room_read_markers( - room_id, - fully_read_event=event_id, - read_event=event_id, - ) - else: - # Fallback for older matrix-nio. - await self._client.room_send( - room_id, "m.receipt", {"event_id": event_id}, - ) + from mautrix.types import EventID, RoomID + await self._client.set_read_markers( + RoomID(room_id), + fully_read_event=EventID(event_id), + read_receipt=EventID(event_id), + ) logger.debug("Matrix: sent read receipt for %s in %s", event_id, room_id) return True except Exception as exc: @@ -1608,19 +1396,15 @@ class MatrixAdapter(BasePlatformAdapter): self, room_id: str, event_id: str, reason: str = "", ) -> bool: """Redact (delete) a message or event from a room.""" - import nio - if not self._client: return False try: - resp = await self._client.room_redact( - room_id, event_id, reason=reason, + from mautrix.types import EventID, RoomID + await self._client.redact( + RoomID(room_id), EventID(event_id), reason=reason or None, ) - if isinstance(resp, nio.RoomRedactResponse): - logger.info("Matrix: redacted %s in %s", event_id, room_id) - return True - logger.warning("Matrix: redact failed: %s", resp) - return False + logger.info("Matrix: redacted %s in %s", event_id, room_id) + return True except Exception as exc: logger.warning("Matrix: redact error: %s", exc) return False @@ -1635,40 +1419,39 @@ class MatrixAdapter(BasePlatformAdapter): limit: int = 50, start: str = "", ) -> list: - """Fetch recent messages from a room. - - Returns a list of dicts with keys: event_id, sender, body, - timestamp, type. Uses the ``room_messages()`` API. - """ - import nio - + """Fetch recent messages from a room.""" if not self._client: return [] try: - resp = await self._client.room_messages( - room_id, - start=start or "", + from mautrix.types import PaginationDirection, RoomID, SyncToken + resp = await self._client.get_messages( + RoomID(room_id), + direction=PaginationDirection.BACKWARD, + from_token=SyncToken(start) if start else None, limit=limit, - direction=nio.Api.MessageDirection.back - if hasattr(nio.Api, "MessageDirection") - else "b", ) except Exception as exc: - logger.warning("Matrix: room_messages failed for %s: %s", room_id, exc) + logger.warning("Matrix: get_messages failed for %s: %s", room_id, exc) return [] - if not isinstance(resp, nio.RoomMessagesResponse): - logger.warning("Matrix: room_messages returned %s", type(resp).__name__) + if not resp: return [] + events = getattr(resp, "chunk", []) or (resp.get("chunk", []) if isinstance(resp, dict) else []) messages = [] - for event in reversed(resp.chunk): - body = getattr(event, "body", "") or "" + for event in reversed(events): + body = "" + content = getattr(event, "content", None) + if content: + if hasattr(content, "body"): + body = content.body or "" + elif isinstance(content, dict): + body = content.get("body", "") messages.append({ - "event_id": getattr(event, "event_id", ""), - "sender": getattr(event, "sender", ""), + "event_id": str(getattr(event, "event_id", "")), + "sender": str(getattr(event, "sender", "")), "body": body, - "timestamp": getattr(event, "server_timestamp", 0), + "timestamp": getattr(event, "timestamp", 0) or getattr(event, "server_timestamp", 0), "type": type(event).__name__, }) return messages @@ -1685,56 +1468,41 @@ class MatrixAdapter(BasePlatformAdapter): is_direct: bool = False, preset: str = "private_chat", ) -> Optional[str]: - """Create a new Matrix room. - - Args: - name: Human-readable room name. - topic: Room topic. - invite: List of user IDs to invite. - is_direct: Mark as a DM room. - preset: One of private_chat, public_chat, trusted_private_chat. - - Returns the room_id on success, None on failure. - """ - import nio - + """Create a new Matrix room.""" if not self._client: return None try: - resp = await self._client.room_create( + from mautrix.types import RoomCreatePreset, UserID + preset_enum = { + "private_chat": RoomCreatePreset.PRIVATE, + "public_chat": RoomCreatePreset.PUBLIC, + "trusted_private_chat": RoomCreatePreset.TRUSTED_PRIVATE, + }.get(preset, RoomCreatePreset.PRIVATE) + invitees = [UserID(u) for u in (invite or [])] + room_id = await self._client.create_room( name=name or None, topic=topic or None, - invite=invite or [], + invitees=invitees, is_direct=is_direct, - preset=getattr( - nio.Api.RoomPreset if hasattr(nio.Api, "RoomPreset") else type("", (), {}), - preset, None, - ) or preset, + preset=preset_enum, ) - if isinstance(resp, nio.RoomCreateResponse): - room_id = resp.room_id - self._joined_rooms.add(room_id) - logger.info("Matrix: created room %s (%s)", room_id, name or "unnamed") - return room_id - logger.warning("Matrix: room_create failed: %s", resp) - return None + room_id_str = str(room_id) + self._joined_rooms.add(room_id_str) + logger.info("Matrix: created room %s (%s)", room_id_str, name or "unnamed") + return room_id_str except Exception as exc: - logger.warning("Matrix: room_create error: %s", exc) + logger.warning("Matrix: create_room error: %s", exc) return None async def invite_user(self, room_id: str, user_id: str) -> bool: """Invite a user to a room.""" - import nio - if not self._client: return False try: - resp = await self._client.room_invite(room_id, user_id) - if isinstance(resp, nio.RoomInviteResponse): - logger.info("Matrix: invited %s to %s", user_id, room_id) - return True - logger.warning("Matrix: invite failed: %s", resp) - return False + from mautrix.types import RoomID, UserID + await self._client.invite_user(RoomID(room_id), UserID(user_id)) + logger.info("Matrix: invited %s to %s", user_id, room_id) + return True except Exception as exc: logger.warning("Matrix: invite error: %s", exc) return False @@ -1753,13 +1521,21 @@ class MatrixAdapter(BasePlatformAdapter): logger.warning("Matrix: invalid presence state %r", state) return False try: - if hasattr(self._client, "set_presence"): - await self._client.set_presence(state, status_msg=status_msg or None) - logger.debug("Matrix: presence set to %s", state) - return True + from mautrix.types import PresenceState + presence_map = { + "online": PresenceState.ONLINE, + "offline": PresenceState.OFFLINE, + "unavailable": PresenceState.UNAVAILABLE, + } + await self._client.set_presence( + presence=presence_map[state], + status=status_msg or None, + ) + logger.debug("Matrix: presence set to %s", state) + return True except Exception as exc: logger.debug("Matrix: set_presence failed: %s", exc) - return False + return False # ------------------------------------------------------------------ # Emote & notice message types @@ -1769,7 +1545,7 @@ class MatrixAdapter(BasePlatformAdapter): self, chat_id: str, text: str, metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: """Send an emote message (/me style action).""" - import nio + from mautrix.types import EventType, RoomID if not self._client or not text: return SendResult(success=False, error="No client or empty text") @@ -1784,13 +1560,10 @@ class MatrixAdapter(BasePlatformAdapter): msg_content["formatted_body"] = html try: - resp = await self._client.room_send( - chat_id, "m.room.message", msg_content, - ignore_unverified_devices=True, + event_id = await self._client.send_message_event( + RoomID(chat_id), EventType.ROOM_MESSAGE, msg_content, ) - if isinstance(resp, nio.RoomSendResponse): - return SendResult(success=True, message_id=resp.event_id) - return SendResult(success=False, error=str(resp)) + return SendResult(success=True, message_id=str(event_id)) except Exception as exc: return SendResult(success=False, error=str(exc)) @@ -1798,7 +1571,7 @@ class MatrixAdapter(BasePlatformAdapter): self, chat_id: str, text: str, metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: """Send a notice message (bot-appropriate, non-alerting).""" - import nio + from mautrix.types import EventType, RoomID if not self._client or not text: return SendResult(success=False, error="No client or empty text") @@ -1813,13 +1586,10 @@ class MatrixAdapter(BasePlatformAdapter): msg_content["formatted_body"] = html try: - resp = await self._client.room_send( - chat_id, "m.room.message", msg_content, - ignore_unverified_devices=True, + event_id = await self._client.send_message_event( + RoomID(chat_id), EventType.ROOM_MESSAGE, msg_content, ) - if isinstance(resp, nio.RoomSendResponse): - return SendResult(success=True, message_id=resp.event_id) - return SendResult(success=False, error=str(resp)) + return SendResult(success=True, message_id=str(event_id)) except Exception as exc: return SendResult(success=False, error=str(exc)) @@ -1827,18 +1597,28 @@ class MatrixAdapter(BasePlatformAdapter): # Helpers # ------------------------------------------------------------------ - async def _refresh_dm_cache(self) -> None: - """Refresh the DM room cache from m.direct account data. + async def _is_dm_room(self, room_id: str) -> bool: + """Check if a room is a DM.""" + if self._dm_rooms.get(room_id, False): + return True + # Fallback: check member count via state store. + state_store = getattr(self._client, "state_store", None) if self._client else None + if state_store: + try: + members = await state_store.get_members(room_id) + if members and len(members) == 2: + return True + except Exception: + pass + return False - Tries the account_data API first, then falls back to parsing - the sync response's account_data for robustness. - """ + async def _refresh_dm_cache(self) -> None: + """Refresh the DM room cache from m.direct account data.""" if not self._client: return dm_data: Optional[Dict] = None - # Primary: try the dedicated account data endpoint. try: resp = await self._client.get_account_data("m.direct") if hasattr(resp, "content"): @@ -1846,21 +1626,7 @@ class MatrixAdapter(BasePlatformAdapter): elif isinstance(resp, dict): dm_data = resp except Exception as exc: - logger.debug("Matrix: get_account_data('m.direct') failed: %s — trying sync fallback", exc) - - # Fallback: parse from the client's account_data store (populated by sync). - if dm_data is None: - try: - # matrix-nio stores account data events on the client object - ad = getattr(self._client, "account_data", None) - if ad and isinstance(ad, dict) and "m.direct" in ad: - event = ad["m.direct"] - if hasattr(event, "content"): - dm_data = event.content - elif isinstance(event, dict): - dm_data = event - except Exception: - pass + logger.debug("Matrix: get_account_data('m.direct') failed: %s", exc) if dm_data is None: return @@ -1868,7 +1634,7 @@ class MatrixAdapter(BasePlatformAdapter): dm_room_ids: Set[str] = set() for user_id, rooms in dm_data.items(): if isinstance(rooms, list): - dm_room_ids.update(rooms) + dm_room_ids.update(str(r) for r in rooms) self._dm_rooms = { rid: (rid in dm_room_ids) @@ -1925,15 +1691,12 @@ class MatrixAdapter(BasePlatformAdapter): """Return True if the bot is mentioned in the message.""" if not body and not formatted_body: return False - # Check for full @user:server in body if self._user_id and self._user_id in body: return True - # Check for localpart with word boundaries (case-insensitive) if self._user_id and ":" in self._user_id: localpart = self._user_id.split(":")[0].lstrip("@") if localpart and re.search(r'\b' + re.escape(localpart) + r'\b', body, re.IGNORECASE): return True - # Check formatted_body for Matrix pill if formatted_body and self._user_id: if f"matrix.to/#/{self._user_id}" in formatted_body: return True @@ -1941,22 +1704,24 @@ class MatrixAdapter(BasePlatformAdapter): def _strip_mention(self, body: str) -> str: """Remove bot mention from message body.""" - # Remove full @user:server if self._user_id: body = body.replace(self._user_id, "") - # If still contains localpart mention, remove it if self._user_id and ":" in self._user_id: localpart = self._user_id.split(":")[0].lstrip("@") if localpart: body = re.sub(r'\b' + re.escape(localpart) + r'\b', '', body, flags=re.IGNORECASE) return body.strip() - def _get_display_name(self, room: Any, user_id: str) -> str: + async def _get_display_name(self, room_id: str, user_id: str) -> str: """Get a user's display name in a room, falling back to user_id.""" - if room and hasattr(room, "users"): - user = room.users.get(user_id) - if user and getattr(user, "display_name", None): - return user.display_name + state_store = getattr(self._client, "state_store", None) if self._client else None + if state_store: + try: + member = await state_store.get_member(room_id, user_id) + if member and getattr(member, "displayname", None): + return member.displayname + except Exception: + pass # Strip the @...:server format to just the localpart. if user_id.startswith("@") and ":" in user_id: return user_id[1:].split(":")[0] @@ -1964,13 +1729,9 @@ class MatrixAdapter(BasePlatformAdapter): def _mxc_to_http(self, mxc_url: str) -> str: """Convert mxc://server/media_id to an HTTP download URL.""" - # mxc://matrix.org/abc123 → https://matrix.org/_matrix/client/v1/media/download/matrix.org/abc123 - # Uses the authenticated client endpoint (spec v1.11+) instead of the - # deprecated /_matrix/media/v3/download/ path. if not mxc_url.startswith("mxc://"): return mxc_url parts = mxc_url[6:] # strip mxc:// - # Use our homeserver for download (federation handles the rest). return f"{self._homeserver}/_matrix/client/v1/media/download/{parts}" def _markdown_to_html(self, text: str) -> str: @@ -1988,16 +1749,12 @@ class MatrixAdapter(BasePlatformAdapter): md = _md.Markdown( extensions=["fenced_code", "tables", "nl2br", "sane_lists"], ) - # Remove the raw HTML preprocessor so