diff --git a/gateway/platforms/matrix.py b/gateway/platforms/matrix.py index 053a5e619..6c1041cf2 100644 --- a/gateway/platforms/matrix.py +++ b/gateway/platforms/matrix.py @@ -1,8 +1,8 @@ """Matrix gateway adapter. Connects to any Matrix homeserver (self-hosted or matrix.org) via the -matrix-nio Python SDK. Supports optional end-to-end encryption (E2EE) -when installed with ``pip install "matrix-nio[e2e]"``. +mautrix Python SDK. Supports optional end-to-end encryption (E2EE) +when installed with ``pip install "mautrix[encryption]"``. Environment variables: MATRIX_HOMESERVER Homeserver URL (e.g. https://matrix.example.org) @@ -24,7 +24,6 @@ Environment variables: from __future__ import annotations import asyncio -import io import json import logging import mimetypes @@ -59,26 +58,22 @@ _STORE_DIR = _get_hermes_dir("platforms/matrix/store", "matrix/store") # Grace period: ignore messages older than this many seconds before startup. _STARTUP_GRACE_SECONDS = 5 -# E2EE key export file for persistence across restarts. -_KEY_EXPORT_FILE = _STORE_DIR / "exported_keys.txt" -_KEY_EXPORT_PASSPHRASE = "hermes-matrix-e2ee-keys" - # Pending undecrypted events: cap and TTL for retry buffer. _MAX_PENDING_EVENTS = 100 _PENDING_EVENT_TTL = 300 # seconds — stop retrying after 5 min _E2EE_INSTALL_HINT = ( - "Install with: pip install 'matrix-nio[e2e]' " + "Install with: pip install 'mautrix[encryption]' " "(requires libolm C library)" ) def _check_e2ee_deps() -> bool: - """Return True if matrix-nio E2EE dependencies (python-olm) are available.""" + """Return True if mautrix E2EE dependencies (python-olm) are available.""" try: - from nio.crypto import ENCRYPTION_ENABLED - return bool(ENCRYPTION_ENABLED) + from mautrix.crypto import OlmMachine # noqa: F401 + return True except (ImportError, AttributeError): return False @@ -96,11 +91,11 @@ def check_matrix_requirements() -> bool: logger.warning("Matrix: MATRIX_HOMESERVER not set") return False try: - import nio # noqa: F401 + import mautrix # noqa: F401 except ImportError: logger.warning( - "Matrix: matrix-nio not installed. " - "Run: pip install 'matrix-nio[e2e]'" + "Matrix: mautrix not installed. " + "Run: pip install 'mautrix[encryption]'" ) return False @@ -152,7 +147,7 @@ class MatrixAdapter(BasePlatformAdapter): or os.getenv("MATRIX_DEVICE_ID", "") ) - self._client: Any = None # nio.AsyncClient + self._client: Any = None # mautrix.client.Client self._sync_task: Optional[asyncio.Task] = None self._closing = False self._startup_ts: float = 0.0 @@ -167,7 +162,7 @@ class MatrixAdapter(BasePlatformAdapter): self._processed_events_set: set = set() # Buffer for undecrypted events pending key receipt. - # Each entry: (room, event, timestamp) + # Each entry: (room_id, event, timestamp) self._pending_megolm: list = [] # Thread participation tracking (for require_mention bypass) @@ -208,21 +203,86 @@ class MatrixAdapter(BasePlatformAdapter): async def connect(self) -> bool: """Connect to the Matrix homeserver and start syncing.""" - import nio + from mautrix.api import HTTPAPI + from mautrix.client import Client + from mautrix.client.state_store import MemoryStateStore, MemorySyncStore + from mautrix.types import EventType, UserID if not self._homeserver: logger.error("Matrix: homeserver URL not configured") return False - # Determine store path and ensure it exists. - store_path = str(_STORE_DIR) + # Ensure store dir exists for E2EE key persistence. _STORE_DIR.mkdir(parents=True, exist_ok=True) + # Create the HTTP API layer. + api = HTTPAPI( + base_url=self._homeserver, + token=self._access_token or "", + ) + # Create the client. - # When a stable device_id is configured, pass it to the constructor - # so matrix-nio binds to it from the start (important for E2EE - # crypto-store persistence across restarts). - ctor_device_id = self._device_id or None + state_store = MemoryStateStore() + sync_store = MemorySyncStore() + client = Client( + mxid=UserID(self._user_id) if self._user_id else UserID(""), + device_id=self._device_id or None, + api=api, + state_store=state_store, + sync_store=sync_store, + ) + + self._client = client + + # Authenticate. + if self._access_token: + api.token = self._access_token + + # Validate the token and learn user_id / device_id. + try: + resp = await client.whoami() + resolved_user_id = getattr(resp, "user_id", "") or self._user_id + resolved_device_id = getattr(resp, "device_id", "") + if resolved_user_id: + self._user_id = str(resolved_user_id) + client.mxid = UserID(self._user_id) + + # Prefer user-configured device_id for stable E2EE identity. + effective_device_id = self._device_id or resolved_device_id + if effective_device_id: + client.device_id = effective_device_id + + logger.info( + "Matrix: using access token for %s%s", + self._user_id or "(unknown user)", + f" (device {effective_device_id})" if effective_device_id else "", + ) + except Exception as exc: + logger.error( + "Matrix: whoami failed — check MATRIX_ACCESS_TOKEN and MATRIX_HOMESERVER: %s", + exc, + ) + return False + elif self._password and self._user_id: + try: + resp = await client.login( + identifier=self._user_id, + password=self._password, + device_name="Hermes Agent", + device_id=self._device_id or None, + ) + # login() stores the token automatically. + if resp and hasattr(resp, "device_id"): + client.device_id = resp.device_id + logger.info("Matrix: logged in as %s", self._user_id) + except Exception as exc: + logger.error("Matrix: login failed — %s", exc) + return False + else: + logger.error("Matrix: need MATRIX_ACCESS_TOKEN or MATRIX_USER_ID + MATRIX_PASSWORD") + return False + + # Set up E2EE if requested. if self._encryption: if not _check_e2ee_deps(): logger.error( @@ -232,16 +292,24 @@ class MatrixAdapter(BasePlatformAdapter): ) return False try: - client = nio.AsyncClient( - self._homeserver, - self._user_id or "", - device_id=ctor_device_id, - store_path=store_path, - ) + from mautrix.crypto import OlmMachine + from mautrix.crypto.store import MemoryCryptoStore + + crypto_store = MemoryCryptoStore() + olm = OlmMachine(client, crypto_store, state_store) + + # Set trust policy: accept unverified devices so senders + # share Megolm session keys with us automatically. + from mautrix.types import TrustState + olm.share_keys_min_trust = TrustState.UNVERIFIED + olm.send_keys_min_trust = TrustState.UNVERIFIED + + await olm.load() + client.crypto = olm logger.info( "Matrix: E2EE enabled (store: %s%s)", - store_path, - f", device_id={self._device_id}" if self._device_id else "", + str(_STORE_DIR), + f", device_id={client.device_id}" if client.device_id else "", ) except Exception as exc: logger.error( @@ -249,158 +317,43 @@ class MatrixAdapter(BasePlatformAdapter): exc, _E2EE_INSTALL_HINT, ) return False - else: - client = nio.AsyncClient( - self._homeserver, - self._user_id or "", - device_id=ctor_device_id, - ) - self._client = client + # Register event handlers. + from mautrix.client import InternalEventType as IntEvt - # Authenticate. - if self._access_token: - client.access_token = self._access_token + client.add_event_handler(EventType.ROOM_MESSAGE, self._on_room_message) + client.add_event_handler(EventType.REACTION, self._on_reaction) + client.add_event_handler(IntEvt.INVITE, self._on_invite) - # With access-token auth, always resolve whoami so we validate the - # token and learn the device_id. The device_id matters for E2EE: - # without it, matrix-nio can send plain messages but may fail to - # decrypt inbound encrypted events or encrypt outbound room sends. - resp = await client.whoami() - if isinstance(resp, nio.WhoamiResponse): - resolved_user_id = getattr(resp, "user_id", "") or self._user_id - resolved_device_id = getattr(resp, "device_id", "") - if resolved_user_id: - self._user_id = resolved_user_id - - # Prefer the user-configured device_id (MATRIX_DEVICE_ID) so - # the bot reuses a stable identity across restarts. Fall back - # to whatever whoami returned. - effective_device_id = self._device_id or resolved_device_id - - # restore_login() is the matrix-nio path that binds the access - # token to a specific device and loads the crypto store. - if effective_device_id and hasattr(client, "restore_login"): - client.restore_login( - self._user_id or resolved_user_id, - effective_device_id, - self._access_token, - ) - else: - if self._user_id: - client.user_id = self._user_id - if effective_device_id: - client.device_id = effective_device_id - client.access_token = self._access_token - if self._encryption: - logger.warning( - "Matrix: access-token login did not restore E2EE state; " - "encrypted rooms may fail until a device_id is available. " - "Set MATRIX_DEVICE_ID to a stable value." - ) - - logger.info( - "Matrix: using access token for %s%s", - self._user_id or "(unknown user)", - f" (device {effective_device_id})" if effective_device_id else "", - ) - else: - logger.error( - "Matrix: whoami failed — check MATRIX_ACCESS_TOKEN and MATRIX_HOMESERVER" - ) - await client.close() - return False - elif self._password and self._user_id: - resp = await client.login( - self._password, - device_name="Hermes Agent", - ) - if isinstance(resp, nio.LoginResponse): - logger.info("Matrix: logged in as %s", self._user_id) - else: - logger.error("Matrix: login failed — %s", getattr(resp, "message", resp)) - await client.close() - return False - else: - logger.error("Matrix: need MATRIX_ACCESS_TOKEN or MATRIX_USER_ID + MATRIX_PASSWORD") - await client.close() - return False - - # If E2EE is enabled, load the crypto store. - if self._encryption and getattr(client, "olm", None): - try: - if client.should_upload_keys: - await client.keys_upload() - logger.info("Matrix: E2EE crypto initialized") - except Exception as exc: - logger.warning("Matrix: crypto init issue: %s", exc) - - # Import previously exported Megolm keys (survives restarts). - if _KEY_EXPORT_FILE.exists(): - try: - await client.import_keys( - str(_KEY_EXPORT_FILE), _KEY_EXPORT_PASSPHRASE, - ) - logger.info("Matrix: imported Megolm keys from backup") - except Exception as exc: - logger.debug("Matrix: could not import keys: %s", exc) - elif self._encryption: - # E2EE was requested but the crypto store failed to load — - # this means encrypted rooms will silently not work. Hard-fail. - logger.error( - "Matrix: E2EE requested but crypto store is not loaded — " - "cannot decrypt or encrypt messages. %s", - _E2EE_INSTALL_HINT, - ) - await client.close() - return False - - # Register event callbacks. - client.add_event_callback(self._on_room_message, nio.RoomMessageText) - client.add_event_callback(self._on_room_message_media, nio.RoomMessageImage) - client.add_event_callback(self._on_room_message_media, nio.RoomMessageAudio) - client.add_event_callback(self._on_room_message_media, nio.RoomMessageVideo) - client.add_event_callback(self._on_room_message_media, nio.RoomMessageFile) - for encrypted_media_cls in ( - getattr(nio, "RoomEncryptedImage", None), - getattr(nio, "RoomEncryptedAudio", None), - getattr(nio, "RoomEncryptedVideo", None), - getattr(nio, "RoomEncryptedFile", None), - ): - if encrypted_media_cls is not None: - client.add_event_callback(self._on_room_message_media, encrypted_media_cls) - client.add_event_callback(self._on_invite, nio.InviteMemberEvent) - - # Reaction events (m.reaction). - if hasattr(nio, "ReactionEvent"): - client.add_event_callback(self._on_reaction, nio.ReactionEvent) - else: - # Older matrix-nio versions: use UnknownEvent fallback. - client.add_event_callback(self._on_unknown_event, nio.UnknownEvent) - - # If E2EE: handle encrypted events. - if self._encryption and hasattr(client, "olm"): - client.add_event_callback( - self._on_room_message, nio.MegolmEvent - ) + if self._encryption and getattr(client, "crypto", None): + client.add_event_handler(EventType.ROOM_ENCRYPTED, self._on_encrypted_event) # Initial sync to catch up, then start background sync. self._startup_ts = time.time() self._closing = False - # Do an initial sync to populate room state. - resp = await client.sync(timeout=10000, full_state=True) - if isinstance(resp, nio.SyncResponse): - self._joined_rooms = set(resp.rooms.join.keys()) - logger.info( - "Matrix: initial sync complete, joined %d rooms", - len(self._joined_rooms), - ) - # Build DM room cache from m.direct account data. - await self._refresh_dm_cache() - await self._run_e2ee_maintenance() - else: - logger.warning("Matrix: initial sync returned %s", type(resp).__name__) + try: + sync_data = await client.sync(timeout=10000, full_state=True) + if isinstance(sync_data, dict): + rooms_join = sync_data.get("rooms", {}).get("join", {}) + self._joined_rooms = set(rooms_join.keys()) + logger.info( + "Matrix: initial sync complete, joined %d rooms", + len(self._joined_rooms), + ) + # Build DM room cache from m.direct account data. + await self._refresh_dm_cache() + else: + logger.warning("Matrix: initial sync returned unexpected type %s", type(sync_data).__name__) + except Exception as exc: + logger.warning("Matrix: initial sync error: %s", exc) + + # Share keys after initial sync if E2EE is enabled. + if self._encryption and getattr(client, "crypto", None): + try: + await client.crypto.share_keys() + except Exception as exc: + logger.warning("Matrix: initial key share failed: %s", exc) # Start the sync loop. self._sync_task = asyncio.create_task(self._sync_loop()) @@ -418,20 +371,11 @@ class MatrixAdapter(BasePlatformAdapter): except (asyncio.CancelledError, Exception): pass - # Export Megolm keys before closing so the next restart can decrypt - # events that used sessions from this run. - if self._client and self._encryption and getattr(self._client, "olm", None): - try: - _STORE_DIR.mkdir(parents=True, exist_ok=True) - await self._client.export_keys( - str(_KEY_EXPORT_FILE), _KEY_EXPORT_PASSPHRASE, - ) - logger.info("Matrix: exported Megolm keys for next restart") - except Exception as exc: - logger.debug("Matrix: could not export keys on disconnect: %s", exc) - if self._client: - await self._client.close() + try: + await self._client.api.session.close() + except Exception: + pass self._client = None logger.info("Matrix: disconnected") @@ -444,7 +388,7 @@ class MatrixAdapter(BasePlatformAdapter): metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: """Send a message to a Matrix room.""" - import nio + from mautrix.types import EventType, RoomID if not content: return SendResult(success=True) @@ -482,52 +426,38 @@ class MatrixAdapter(BasePlatformAdapter): relates_to["m.in_reply_to"] = {"event_id": reply_to} msg_content["m.relates_to"] = relates_to - async def _room_send_once(*, ignore_unverified_devices: bool = False): - return await asyncio.wait_for( - self._client.room_send( - chat_id, - "m.room.message", + try: + event_id = await asyncio.wait_for( + self._client.send_message_event( + RoomID(chat_id), + EventType.ROOM_MESSAGE, msg_content, - ignore_unverified_devices=ignore_unverified_devices, ), timeout=45, ) - - try: - resp = await _room_send_once(ignore_unverified_devices=False) - except Exception as exc: - retryable = isinstance(exc, asyncio.TimeoutError) - olm_unverified = getattr(nio, "OlmUnverifiedDeviceError", None) - send_retry = getattr(nio, "SendRetryError", None) - if isinstance(olm_unverified, type) and isinstance(exc, olm_unverified): - retryable = True - if isinstance(send_retry, type) and isinstance(exc, send_retry): - retryable = True - - if not retryable: - logger.error("Matrix: failed to send to %s: %s", chat_id, exc) - return SendResult(success=False, error=str(exc)) - - logger.warning( - "Matrix: initial encrypted send to %s failed (%s); " - "retrying after E2EE maintenance with ignored unverified devices", - chat_id, - exc, - ) - await self._run_e2ee_maintenance() - try: - resp = await _room_send_once(ignore_unverified_devices=True) - except Exception as retry_exc: - logger.error("Matrix: failed to send to %s after retry: %s", chat_id, retry_exc) - return SendResult(success=False, error=str(retry_exc)) - - if isinstance(resp, nio.RoomSendResponse): - last_event_id = resp.event_id + last_event_id = str(event_id) logger.info("Matrix: sent event %s to %s", last_event_id, chat_id) - else: - err = getattr(resp, "message", str(resp)) - logger.error("Matrix: failed to send to %s: %s", chat_id, err) - return SendResult(success=False, error=err) + except Exception as exc: + # On E2EE errors, retry after sharing keys. + if self._encryption and getattr(self._client, "crypto", None): + try: + await self._client.crypto.share_keys() + event_id = await asyncio.wait_for( + self._client.send_message_event( + RoomID(chat_id), + EventType.ROOM_MESSAGE, + msg_content, + ), + timeout=45, + ) + last_event_id = str(event_id) + logger.info("Matrix: sent event %s to %s (after key share)", last_event_id, chat_id) + continue + except Exception as retry_exc: + logger.error("Matrix: failed to send to %s after retry: %s", chat_id, retry_exc) + return SendResult(success=False, error=str(retry_exc)) + logger.error("Matrix: failed to send to %s: %s", chat_id, exc) + return SendResult(success=False, error=str(exc)) return SendResult(success=True, message_id=last_event_id) @@ -537,14 +467,32 @@ class MatrixAdapter(BasePlatformAdapter): chat_type = "group" if self._client: - room = self._client.rooms.get(chat_id) - if room: - name = room.display_name or room.canonical_alias or chat_id - # Use DM cache. - if self._dm_rooms.get(chat_id, False): - chat_type = "dm" - elif room.member_count == 2: - chat_type = "dm" + # Try state store for member count. + state_store = getattr(self._client, "state_store", None) + if state_store: + try: + members = await state_store.get_members( + chat_id, + ) + if members and len(members) == 2: + chat_type = "dm" + except Exception: + pass + + # Use DM cache. + if self._dm_rooms.get(chat_id, False): + chat_type = "dm" + + # Try to get room name from state. + try: + from mautrix.types import EventType as ET, RoomID + name_evt = await self._client.get_state_event( + RoomID(chat_id), ET.ROOM_NAME, + ) + if name_evt and hasattr(name_evt, "name") and name_evt.name: + name = name_evt.name + except Exception: + pass return {"name": name, "type": chat_type} @@ -558,7 +506,8 @@ class MatrixAdapter(BasePlatformAdapter): """Send a typing indicator.""" if self._client: try: - await self._client.room_typing(chat_id, typing_state=True, timeout=30000) + from mautrix.types import RoomID + await self._client.set_typing(RoomID(chat_id), timeout=30000) except Exception: pass @@ -566,7 +515,7 @@ class MatrixAdapter(BasePlatformAdapter): self, chat_id: str, message_id: str, content: str ) -> SendResult: """Edit an existing message (via m.replace).""" - import nio + from mautrix.types import EventType, RoomID formatted = self.format_message(content) msg_content: Dict[str, Any] = { @@ -589,10 +538,13 @@ class MatrixAdapter(BasePlatformAdapter): msg_content["format"] = "org.matrix.custom.html" msg_content["formatted_body"] = f"* {html}" - resp = await self._client.room_send(chat_id, "m.room.message", msg_content) - if isinstance(resp, nio.RoomSendResponse): - return SendResult(success=True, message_id=resp.event_id) - return SendResult(success=False, error=getattr(resp, "message", str(resp))) + try: + event_id = await self._client.send_message_event( + RoomID(chat_id), EventType.ROOM_MESSAGE, msg_content, + ) + return SendResult(success=True, message_id=str(event_id)) + except Exception as exc: + return SendResult(success=False, error=str(exc)) async def send_image( self, @@ -665,7 +617,7 @@ class MatrixAdapter(BasePlatformAdapter): ) -> SendResult: """Upload an audio file as a voice message (MSC3245 native voice).""" return await self._send_local_file( - chat_id, audio_path, "m.audio", caption, reply_to, + chat_id, audio_path, "m.audio", caption, reply_to, metadata=metadata, is_voice=True ) @@ -703,29 +655,24 @@ class MatrixAdapter(BasePlatformAdapter): is_voice: bool = False, ) -> SendResult: """Upload bytes to Matrix and send as a media message.""" - import nio + from mautrix.types import EventType, RoomID # Upload to homeserver. - # nio expects a DataProvider (callable) or file-like object, not raw bytes. - # nio.upload() returns a tuple (UploadResponse|UploadError, Optional[Dict]) - resp, maybe_encryption_info = await self._client.upload( - io.BytesIO(data), - content_type=content_type, - filename=filename, - filesize=len(data), - ) - if not isinstance(resp, nio.UploadResponse): - err = getattr(resp, "message", str(resp)) - logger.error("Matrix: upload failed: %s", err) - return SendResult(success=False, error=err) - - mxc_url = resp.content_uri + try: + mxc_url = await self._client.upload_media( + data, + mime_type=content_type, + filename=filename, + ) + except Exception as exc: + logger.error("Matrix: upload failed: %s", exc) + return SendResult(success=False, error=str(exc)) # Build media message content. msg_content: Dict[str, Any] = { "msgtype": msgtype, "body": caption or filename, - "url": mxc_url, + "url": str(mxc_url), "info": { "mimetype": content_type, "size": len(data), @@ -749,10 +696,13 @@ class MatrixAdapter(BasePlatformAdapter): relates_to["is_falling_back"] = True msg_content["m.relates_to"] = relates_to - resp2 = await self._client.room_send(room_id, "m.room.message", msg_content) - if isinstance(resp2, nio.RoomSendResponse): - return SendResult(success=True, message_id=resp2.event_id) - return SendResult(success=False, error=getattr(resp2, "message", str(resp2))) + try: + event_id = await self._client.send_message_event( + RoomID(room_id), EventType.ROOM_MESSAGE, msg_content, + ) + return SendResult(success=True, message_id=str(event_id)) + except Exception as exc: + return SendResult(success=False, error=str(exc)) async def _send_local_file( self, @@ -784,37 +734,32 @@ class MatrixAdapter(BasePlatformAdapter): async def _sync_loop(self) -> None: """Continuously sync with the homeserver.""" - import nio - while not self._closing: try: - resp = await self._client.sync(timeout=30000) - if isinstance(resp, nio.SyncError): - if self._closing: - return - err_msg = str(getattr(resp, "message", resp)).lower() - if "m_unknown_token" in err_msg or "m_forbidden" in err_msg or "401" in err_msg: - logger.error( - "Matrix: permanent auth error from sync: %s — stopping sync", - getattr(resp, "message", resp), - ) - return - logger.warning( - "Matrix: sync returned %s: %s — retrying in 5s", - type(resp).__name__, - getattr(resp, "message", resp), - ) - await asyncio.sleep(5) - continue + sync_data = await self._client.sync(timeout=30000) + if isinstance(sync_data, dict): + # Update joined rooms from sync response. + rooms_join = sync_data.get("rooms", {}).get("join", {}) + if rooms_join: + self._joined_rooms.update(rooms_join.keys()) + + # Share keys periodically if E2EE is enabled. + if self._encryption and getattr(self._client, "crypto", None): + try: + await self._client.crypto.share_keys() + except Exception as exc: + logger.warning("Matrix: E2EE key share failed: %s", exc) + + # Retry any buffered undecrypted events. + if self._pending_megolm: + await self._retry_pending_decryptions() - await self._run_e2ee_maintenance() except asyncio.CancelledError: return except Exception as exc: if self._closing: return - # Detect permanent auth/permission failures that will never - # succeed on retry — stop syncing instead of looping forever. + # Detect permanent auth/permission failures. err_str = str(exc).lower() if "401" in err_str or "403" in err_str or "unauthorized" in err_str or "forbidden" in err_str: logger.error("Matrix: permanent auth error: %s — stopping sync", exc) @@ -822,98 +767,19 @@ class MatrixAdapter(BasePlatformAdapter): logger.warning("Matrix: sync error: %s — retrying in 5s", exc) await asyncio.sleep(5) - async def _run_e2ee_maintenance(self) -> None: - """Run matrix-nio E2EE housekeeping between syncs. - - Hermes uses a custom sync loop instead of matrix-nio's sync_forever(), - so we need to explicitly drive the key management work that sync_forever() - normally handles for encrypted rooms. - - Also auto-trusts all devices (so senders share session keys with us) - and retries decryption for any buffered MegolmEvents. - """ - client = self._client - if not client or not self._encryption or not getattr(client, "olm", None): - return - - did_query_keys = client.should_query_keys - - tasks = [asyncio.create_task(client.send_to_device_messages())] - - if client.should_upload_keys: - tasks.append(asyncio.create_task(client.keys_upload())) - - if did_query_keys: - tasks.append(asyncio.create_task(client.keys_query())) - - if client.should_claim_keys: - users = client.get_users_for_key_claiming() - if users: - tasks.append(asyncio.create_task(client.keys_claim(users))) - - for task in asyncio.as_completed(tasks): - try: - await task - except asyncio.CancelledError: - raise - except Exception as exc: - logger.warning("Matrix: E2EE maintenance task failed: %s", exc) - - # After key queries, auto-trust all devices so senders share keys with - # us. For a bot this is the right default — we want to decrypt - # everything, not enforce manual verification. - if did_query_keys: - self._auto_trust_devices() - - # Retry any buffered undecrypted events now that new keys may have - # arrived (from key requests, key queries, or to-device forwarding). - if self._pending_megolm: - await self._retry_pending_decryptions() - - def _auto_trust_devices(self) -> None: - """Trust/verify all unverified devices we know about. - - When other clients see our device as verified, they proactively share - Megolm session keys with us. Without this, many clients will refuse - to include an unverified device in key distributions. - """ - client = self._client - if not client: - return - - device_store = getattr(client, "device_store", None) - if not device_store: - return - - own_device = getattr(client, "device_id", None) - trusted_count = 0 - - try: - # DeviceStore.__iter__ yields OlmDevice objects directly. - for device in device_store: - if getattr(device, "device_id", None) == own_device: - continue - if not getattr(device, "verified", False): - client.verify_device(device) - trusted_count += 1 - except Exception as exc: - logger.debug("Matrix: auto-trust error: %s", exc) - - if trusted_count: - logger.info("Matrix: auto-trusted %d new device(s)", trusted_count) - async def _retry_pending_decryptions(self) -> None: - """Retry decrypting buffered MegolmEvents after new keys arrive.""" - import nio - + """Retry decrypting buffered encrypted events after new keys arrive.""" client = self._client if not client or not self._pending_megolm: return + crypto = getattr(client, "crypto", None) + if not crypto: + return now = time.time() still_pending: list = [] - for room, event, ts in self._pending_megolm: + for room_id, event, ts in self._pending_megolm: # Drop events that have aged past the TTL. if now - ts > _PENDING_EVENT_TTL: logger.debug( @@ -923,39 +789,23 @@ class MatrixAdapter(BasePlatformAdapter): continue try: - decrypted = client.decrypt_event(event) + decrypted = await crypto.decrypt_megolm_event(event) except Exception: - # Still missing the key — keep in buffer. - still_pending.append((room, event, ts)) + still_pending.append((room_id, event, ts)) continue - if isinstance(decrypted, nio.MegolmEvent): - # decrypt_event returned the same undecryptable event. - still_pending.append((room, event, ts)) + if decrypted is None or decrypted is event: + still_pending.append((room_id, event, ts)) continue logger.info( - "Matrix: decrypted buffered event %s (%s)", + "Matrix: decrypted buffered event %s", getattr(event, "event_id", "?"), - type(decrypted).__name__, ) - # Route to the appropriate handler based on decrypted type. + # Route to the appropriate handler. try: - if isinstance(decrypted, nio.RoomMessageText): - await self._on_room_message(room, decrypted) - elif isinstance( - decrypted, - (nio.RoomMessageImage, nio.RoomMessageAudio, - nio.RoomMessageVideo, nio.RoomMessageFile), - ): - await self._on_room_message_media(room, decrypted) - else: - logger.debug( - "Matrix: decrypted event %s has unhandled type %s", - getattr(event, "event_id", "?"), - type(decrypted).__name__, - ) + await self._on_room_message(decrypted) except Exception as exc: logger.warning( "Matrix: error processing decrypted event %s: %s", @@ -968,62 +818,78 @@ class MatrixAdapter(BasePlatformAdapter): # Event callbacks # ------------------------------------------------------------------ - async def _on_room_message(self, room: Any, event: Any) -> None: - """Handle incoming text messages (and decrypted megolm events).""" - import nio + async def _on_room_message(self, event: Any) -> None: + """Handle incoming room message events (text, media).""" + room_id = str(getattr(event, "room_id", "")) + sender = str(getattr(event, "sender", "")) # Ignore own messages. - if event.sender == self._user_id: + if sender == self._user_id: return - # Deduplicate by event ID (nio can fire the same event more than once). - if self._is_duplicate_event(getattr(event, "event_id", None)): + # Deduplicate by event ID. + event_id = str(getattr(event, "event_id", "")) + if self._is_duplicate_event(event_id): return # Startup grace: ignore old messages from initial sync. - event_ts = getattr(event, "server_timestamp", 0) / 1000.0 + event_ts = getattr(event, "timestamp", 0) / 1000.0 if getattr(event, "timestamp", 0) else 0 + # Also check server_timestamp for compatibility. + if not event_ts: + event_ts = getattr(event, "server_timestamp", 0) / 1000.0 if getattr(event, "server_timestamp", 0) else 0 if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS: return - # Handle undecryptable MegolmEvents: request the missing session key - # and buffer the event for retry once the key arrives. - if isinstance(event, nio.MegolmEvent): - logger.warning( - "Matrix: could not decrypt event %s in %s — requesting key", - event.event_id, room.room_id, - ) - - # Ask other devices in the room to forward the session key. - try: - resp = await self._client.request_room_key(event) - if hasattr(resp, "event_id") or not isinstance(resp, Exception): - logger.debug( - "Matrix: room key request sent for session %s", - getattr(event, "session_id", "?"), - ) - except Exception as exc: - logger.debug("Matrix: room key request failed: %s", exc) - - # Buffer for retry on next maintenance cycle. - self._pending_megolm.append((room, event, time.time())) - if len(self._pending_megolm) > _MAX_PENDING_EVENTS: - self._pending_megolm = self._pending_megolm[-_MAX_PENDING_EVENTS:] + # Extract content from the event. + content = getattr(event, "content", None) + if content is None: return - # Skip edits (m.replace relation). - source_content = getattr(event, "source", {}).get("content", {}) + # Get msgtype — either from content object or raw dict. + if hasattr(content, "msgtype"): + msgtype = str(content.msgtype) + elif isinstance(content, dict): + msgtype = content.get("msgtype", "") + else: + msgtype = "" + + # Determine source content dict for relation/thread extraction. + if isinstance(content, dict): + source_content = content + elif hasattr(content, "serialize"): + source_content = content.serialize() + else: + source_content = {} + relates_to = source_content.get("m.relates_to", {}) + + # Skip edits (m.replace relation). if relates_to.get("rel_type") == "m.replace": return - body = getattr(event, "body", "") or "" + # Dispatch by msgtype. + media_msgtypes = ("m.image", "m.audio", "m.video", "m.file") + if msgtype in media_msgtypes: + await self._handle_media_message(room_id, sender, event_id, event_ts, source_content, relates_to, msgtype) + elif msgtype in ("m.text", "m.notice"): + await self._handle_text_message(room_id, sender, event_id, event_ts, source_content, relates_to) + + async def _handle_text_message( + self, + room_id: str, + sender: str, + event_id: str, + event_ts: float, + source_content: dict, + relates_to: dict, + ) -> None: + """Process a text message event.""" + body = source_content.get("body", "") or "" if not body: return # Determine chat type. - is_dm = self._dm_rooms.get(room.room_id, False) - if not is_dm and room.member_count == 2: - is_dm = True + is_dm = await self._is_dm_room(room_id) chat_type = "dm" if is_dm else "group" # Thread support. @@ -1036,7 +902,7 @@ class MatrixAdapter(BasePlatformAdapter): free_rooms_raw = os.getenv("MATRIX_FREE_RESPONSE_ROOMS", "") free_rooms = {r.strip() for r in free_rooms_raw.split(",") if r.strip()} require_mention = os.getenv("MATRIX_REQUIRE_MENTION", "true").lower() not in ("false", "0", "no") - is_free_room = room.room_id in free_rooms + is_free_room = room_id in free_rooms in_bot_thread = bool(thread_id and thread_id in self._bot_participated_threads) formatted_body = source_content.get("formatted_body") @@ -1044,22 +910,22 @@ class MatrixAdapter(BasePlatformAdapter): if not self._is_bot_mentioned(body, formatted_body): return - # DM mention-thread: when enabled, @mentioning bot in a DM creates a thread. + # DM mention-thread. if is_dm and not thread_id: dm_mention_threads = os.getenv("MATRIX_DM_MENTION_THREADS", "false").lower() in ("true", "1", "yes") if dm_mention_threads and self._is_bot_mentioned(body, source_content.get("formatted_body")): - thread_id = event.event_id + thread_id = event_id self._track_thread(thread_id) - # Strip mention from body when present (including in DMs). + # Strip mention from body. if self._is_bot_mentioned(body, source_content.get("formatted_body")): body = self._strip_mention(body) - # Auto-thread: create a thread for non-DM, non-threaded messages. + # Auto-thread. if not is_dm and not thread_id: auto_thread = os.getenv("MATRIX_AUTO_THREAD", "true").lower() in ("true", "1", "yes") if auto_thread: - thread_id = event.event_id + thread_id = event_id self._track_thread(thread_id) # Reply-to detection. @@ -1068,7 +934,7 @@ class MatrixAdapter(BasePlatformAdapter): if in_reply_to: reply_to = in_reply_to.get("event_id") - # Strip reply fallback from body (Matrix prepends "> ..." lines). + # Strip reply fallback from body. if reply_to and body.startswith("> "): lines = body.split("\n") stripped = [] @@ -1089,11 +955,12 @@ class MatrixAdapter(BasePlatformAdapter): if body.startswith(("!", "/")): msg_type = MessageType.COMMAND + display_name = await self._get_display_name(room_id, sender) source = self.build_source( - chat_id=room.room_id, + chat_id=room_id, chat_type=chat_type, - user_id=event.sender, - user_name=self._get_display_name(room, event.sender), + user_id=sender, + user_name=display_name, thread_id=thread_id, ) @@ -1101,218 +968,105 @@ class MatrixAdapter(BasePlatformAdapter): text=body, message_type=msg_type, source=source, - raw_message=getattr(event, "source", {}), - message_id=event.event_id, + raw_message=source_content, + message_id=event_id, reply_to_message_id=reply_to, ) if thread_id: self._track_thread(thread_id) - # Acknowledge receipt so the room shows as read (fire-and-forget). - self._background_read_receipt(room.room_id, event.event_id) + # Acknowledge receipt (fire-and-forget). + self._background_read_receipt(room_id, event_id) - # Only batch plain text messages — commands dispatch immediately. + # Batch plain text messages — commands dispatch immediately. if msg_type == MessageType.TEXT and self._text_batch_delay_seconds > 0: self._enqueue_text_event(msg_event) else: await self.handle_message(msg_event) - # ------------------------------------------------------------------ - # Text message aggregation (handles Matrix client-side splits) - # ------------------------------------------------------------------ - - def _text_batch_key(self, event: MessageEvent) -> str: - """Session-scoped key for text message batching.""" - from gateway.session import build_session_key - return build_session_key( - event.source, - group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), - thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), - ) - - def _enqueue_text_event(self, event: MessageEvent) -> None: - """Buffer a text event and reset the flush timer. - - When a Matrix client splits a long message, the chunks arrive within - a few hundred milliseconds. This merges them into a single event - before dispatching. - """ - key = self._text_batch_key(event) - existing = self._pending_text_batches.get(key) - chunk_len = len(event.text or "") - if existing is None: - event._last_chunk_len = chunk_len # type: ignore[attr-defined] - self._pending_text_batches[key] = event - else: - if event.text: - existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text - existing._last_chunk_len = chunk_len # type: ignore[attr-defined] - # Merge any media that might be attached - if event.media_urls: - existing.media_urls.extend(event.media_urls) - existing.media_types.extend(event.media_types) - - # Cancel any pending flush and restart the timer - prior_task = self._pending_text_batch_tasks.get(key) - if prior_task and not prior_task.done(): - prior_task.cancel() - self._pending_text_batch_tasks[key] = asyncio.create_task( - self._flush_text_batch(key) - ) - - async def _flush_text_batch(self, key: str) -> None: - """Wait for the quiet period then dispatch the aggregated text. - - Uses a longer delay when the latest chunk is near Matrix's ~4000-char - split point, since a continuation chunk is almost certain. - """ - current_task = asyncio.current_task() - try: - pending = self._pending_text_batches.get(key) - last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0 - if last_len >= self._SPLIT_THRESHOLD: - delay = self._text_batch_split_delay_seconds - else: - delay = self._text_batch_delay_seconds - await asyncio.sleep(delay) - event = self._pending_text_batches.pop(key, None) - if not event: - return - logger.info( - "[Matrix] Flushing text batch %s (%d chars)", - key, len(event.text or ""), - ) - await self.handle_message(event) - finally: - if self._pending_text_batch_tasks.get(key) is current_task: - self._pending_text_batch_tasks.pop(key, None) - - async def _on_room_message_media(self, room: Any, event: Any) -> None: - """Handle incoming media messages (images, audio, video, files).""" - import nio - - # Ignore own messages. - if event.sender == self._user_id: - return - - # Deduplicate by event ID. - if self._is_duplicate_event(getattr(event, "event_id", None)): - return - - # Startup grace. - event_ts = getattr(event, "server_timestamp", 0) / 1000.0 - if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS: - return - - body = getattr(event, "body", "") or "" - url = getattr(event, "url", "") + async def _handle_media_message( + self, + room_id: str, + sender: str, + event_id: str, + event_ts: float, + source_content: dict, + relates_to: dict, + msgtype: str, + ) -> None: + """Process a media message event (image, audio, video, file).""" + body = source_content.get("body", "") or "" + url = source_content.get("url", "") # Convert mxc:// to HTTP URL for downstream processing. http_url = "" if url and url.startswith("mxc://"): http_url = self._mxc_to_http(url) - # Determine message type from event class. - # Use the MIME type from the event's content info when available, - # falling back to category-level MIME types for downstream matching - # (gateway/run.py checks startswith("image/"), startswith("audio/"), etc.) - source_content = getattr(event, "source", {}).get("content", {}) - if not isinstance(source_content, dict): - source_content = {} - event_content = getattr(event, "content", {}) - if not isinstance(event_content, dict): - event_content = {} - content_info = event_content.get("info") if isinstance(event_content, dict) else {} - if not isinstance(content_info, dict) or not content_info: - content_info = source_content.get("info", {}) if isinstance(source_content, dict) else {} - event_mimetype = ( - (content_info.get("mimetype") if isinstance(content_info, dict) else None) - or getattr(event, "mimetype", "") - or "" - ) - # For encrypted media, the URL may be in file.url instead of event.url. - file_content = source_content.get("file", {}) if isinstance(source_content, dict) else {} + # Extract MIME type from content info. + content_info = source_content.get("info", {}) + if not isinstance(content_info, dict): + content_info = {} + event_mimetype = content_info.get("mimetype", "") + + # For encrypted media, the URL may be in file.url. + file_content = source_content.get("file", {}) if not url and isinstance(file_content, dict): url = file_content.get("url", "") or "" if url and url.startswith("mxc://"): http_url = self._mxc_to_http(url) + is_encrypted_media = bool(file_content and isinstance(file_content, dict) and file_content.get("url")) + media_type = "application/octet-stream" msg_type = MessageType.DOCUMENT - - # Safely resolve encrypted media classes — they may not exist on older - # nio versions, and in test environments nio may be mocked (MagicMock - # auto-attributes are not valid types for isinstance). - def _safe_isinstance(obj, cls_name): - cls = getattr(nio, cls_name, None) - if cls is None or not isinstance(cls, type): - return False - return isinstance(obj, cls) - - is_encrypted_image = _safe_isinstance(event, "RoomEncryptedImage") - is_encrypted_audio = _safe_isinstance(event, "RoomEncryptedAudio") - is_encrypted_video = _safe_isinstance(event, "RoomEncryptedVideo") - is_encrypted_file = _safe_isinstance(event, "RoomEncryptedFile") - is_encrypted_media = any((is_encrypted_image, is_encrypted_audio, is_encrypted_video, is_encrypted_file)) is_voice_message = False - if isinstance(event, nio.RoomMessageImage) or is_encrypted_image: + if msgtype == "m.image": msg_type = MessageType.PHOTO media_type = event_mimetype or "image/png" - elif isinstance(event, nio.RoomMessageAudio) or is_encrypted_audio: + elif msgtype == "m.audio": if source_content.get("org.matrix.msc3245.voice") is not None: is_voice_message = True msg_type = MessageType.VOICE else: msg_type = MessageType.AUDIO media_type = event_mimetype or "audio/ogg" - elif isinstance(event, nio.RoomMessageVideo) or is_encrypted_video: + elif msgtype == "m.video": msg_type = MessageType.VIDEO media_type = event_mimetype or "video/mp4" elif event_mimetype: media_type = event_mimetype - # Cache media locally when downstream tools need a real file path: - # - photos (vision tools can't access MXC URLs) - # - voice messages (transcription tools need local files) - # - any encrypted media (HTTP fallback would point at ciphertext) + # Cache media locally when downstream tools need a real file path. cached_path = None should_cache_locally = ( msg_type == MessageType.PHOTO or is_voice_message or is_encrypted_media ) if should_cache_locally and url: try: - if is_voice_message: - download_resp = await self._client.download(mxc=url) - else: - download_resp = await self._client.download(url) - file_bytes = getattr(download_resp, "body", None) + from mautrix.types import ContentURI + file_bytes = await self._client.download_media(ContentURI(url)) if file_bytes is not None: if is_encrypted_media: - from nio.crypto.attachments import decrypt_attachment + from mautrix.crypto.attachments import decrypt_attachment - hashes_value = getattr(event, "hashes", None) - if hashes_value is None and isinstance(file_content, dict): - hashes_value = file_content.get("hashes") + hashes_value = file_content.get("hashes") if isinstance(file_content, dict) else None hash_value = hashes_value.get("sha256") if isinstance(hashes_value, dict) else None - key_value = getattr(event, "key", None) - if key_value is None and isinstance(file_content, dict): - key_value = file_content.get("key") + key_value = file_content.get("key") if isinstance(file_content, dict) else None if isinstance(key_value, dict): key_value = key_value.get("k") - iv_value = getattr(event, "iv", None) - if iv_value is None and isinstance(file_content, dict): - iv_value = file_content.get("iv") + iv_value = file_content.get("iv") if isinstance(file_content, dict) else None if key_value and hash_value and iv_value: file_bytes = decrypt_attachment(file_bytes, key_value, hash_value, iv_value) else: logger.warning( "[Matrix] Encrypted media event missing decryption metadata for %s", - event.event_id, + event_id, ) file_bytes = None @@ -1344,13 +1098,10 @@ class MatrixAdapter(BasePlatformAdapter): except Exception as e: logger.warning("[Matrix] Failed to cache media: %s", e) - is_dm = self._dm_rooms.get(room.room_id, False) - if not is_dm and room.member_count == 2: - is_dm = True + is_dm = await self._is_dm_room(room_id) chat_type = "dm" if is_dm else "group" # Thread/reply detection. - relates_to = source_content.get("m.relates_to", {}) thread_id = None if relates_to.get("rel_type") == "m.thread": thread_id = relates_to.get("event_id") @@ -1360,7 +1111,7 @@ class MatrixAdapter(BasePlatformAdapter): free_rooms_raw = os.getenv("MATRIX_FREE_RESPONSE_ROOMS", "") free_rooms = {r.strip() for r in free_rooms_raw.split(",") if r.strip()} require_mention = os.getenv("MATRIX_REQUIRE_MENTION", "true").lower() not in ("false", "0", "no") - is_free_room = room.room_id in free_rooms + is_free_room = room_id in free_rooms in_bot_thread = bool(thread_id and thread_id in self._bot_participated_threads) if require_mention and not is_free_room and not in_bot_thread: @@ -1368,29 +1119,30 @@ class MatrixAdapter(BasePlatformAdapter): if not self._is_bot_mentioned(body, formatted_body): return - # DM mention-thread: when enabled, @mentioning bot in a DM creates a thread. + # DM mention-thread. if is_dm and not thread_id: dm_mention_threads = os.getenv("MATRIX_DM_MENTION_THREADS", "false").lower() in ("true", "1", "yes") if dm_mention_threads and self._is_bot_mentioned(body, source_content.get("formatted_body")): - thread_id = event.event_id + thread_id = event_id self._track_thread(thread_id) - # Strip mention from body when present (including in DMs). + # Strip mention from body. if self._is_bot_mentioned(body, source_content.get("formatted_body")): body = self._strip_mention(body) - # Auto-thread: create a thread for non-DM, non-threaded messages. + # Auto-thread. if not is_dm and not thread_id: auto_thread = os.getenv("MATRIX_AUTO_THREAD", "true").lower() in ("true", "1", "yes") if auto_thread: - thread_id = event.event_id + thread_id = event_id self._track_thread(thread_id) + display_name = await self._get_display_name(room_id, sender) source = self.build_source( - chat_id=room.room_id, + chat_id=room_id, chat_type=chat_type, - user_id=event.sender, - user_name=self._get_display_name(room, event.sender), + user_id=sender, + user_name=display_name, thread_id=thread_id, ) @@ -1402,8 +1154,8 @@ class MatrixAdapter(BasePlatformAdapter): text=body, message_type=msg_type, source=source, - raw_message=getattr(event, "source", {}), - message_id=event.event_id, + raw_message=source_content, + message_id=event_id, media_urls=media_urls, media_types=media_types, ) @@ -1411,43 +1163,44 @@ class MatrixAdapter(BasePlatformAdapter): if thread_id: self._track_thread(thread_id) - # Acknowledge receipt so the room shows as read (fire-and-forget). - self._background_read_receipt(room.room_id, event.event_id) + self._background_read_receipt(room_id, event_id) await self.handle_message(msg_event) - async def _on_invite(self, room: Any, event: Any) -> None: + async def _on_encrypted_event(self, event: Any) -> None: + """Handle encrypted events that could not be auto-decrypted.""" + room_id = str(getattr(event, "room_id", "")) + event_id = str(getattr(event, "event_id", "")) + + if self._is_duplicate_event(event_id): + return + + logger.warning( + "Matrix: could not decrypt event %s in %s — buffering for retry", + event_id, room_id, + ) + + self._pending_megolm.append((room_id, event, time.time())) + if len(self._pending_megolm) > _MAX_PENDING_EVENTS: + self._pending_megolm = self._pending_megolm[-_MAX_PENDING_EVENTS:] + + async def _on_invite(self, event: Any) -> None: """Auto-join rooms when invited.""" - import nio + from mautrix.types import RoomID - if not isinstance(event, nio.InviteMemberEvent): - return - - # Only process invites directed at us. - if event.state_key != self._user_id: - return - - if event.membership != "invite": - return + room_id = str(getattr(event, "room_id", "")) logger.info( - "Matrix: invited to %s by %s — joining", - room.room_id, event.sender, + "Matrix: invited to %s — joining", + room_id, ) try: - resp = await self._client.join(room.room_id) - if isinstance(resp, nio.JoinResponse): - self._joined_rooms.add(room.room_id) - logger.info("Matrix: joined %s", room.room_id) - # Refresh DM cache since new room may be a DM. - await self._refresh_dm_cache() - else: - logger.warning( - "Matrix: failed to join %s: %s", - room.room_id, getattr(resp, "message", resp), - ) + await self._client.join_room(RoomID(room_id)) + self._joined_rooms.add(room_id) + logger.info("Matrix: joined %s", room_id) + await self._refresh_dm_cache() except Exception as exc: - logger.warning("Matrix: error joining %s: %s", room.room_id, exc) + logger.warning("Matrix: error joining %s: %s", room_id, exc) # ------------------------------------------------------------------ # Reactions (send, receive, processing lifecycle) @@ -1459,7 +1212,7 @@ class MatrixAdapter(BasePlatformAdapter): """Send an emoji reaction to a message in a room. Returns the reaction event_id on success, None on failure. """ - import nio + from mautrix.types import EventType, RoomID if not self._client: return None @@ -1471,15 +1224,11 @@ class MatrixAdapter(BasePlatformAdapter): } } try: - resp = await self._client.room_send( - room_id, "m.reaction", content, - ignore_unverified_devices=True, + resp_event_id = await self._client.send_message_event( + RoomID(room_id), EventType.REACTION, content, ) - if isinstance(resp, nio.RoomSendResponse): - logger.debug("Matrix: sent reaction %s to %s", emoji, event_id) - return resp.event_id - logger.debug("Matrix: reaction send failed: %s", resp) - return None + logger.debug("Matrix: sent reaction %s to %s", emoji, event_id) + return str(resp_event_id) except Exception as exc: logger.debug("Matrix: reaction send error: %s", exc) return None @@ -1513,7 +1262,6 @@ class MatrixAdapter(BasePlatformAdapter): return if outcome == ProcessingOutcome.CANCELLED: return - # Remove the eyes reaction first, if we tracked its event_id. reaction_key = (room_id, msg_id) if reaction_key in self._pending_reactions: eyes_event_id = self._pending_reactions.pop(reaction_key) @@ -1525,42 +1273,91 @@ class MatrixAdapter(BasePlatformAdapter): "\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c", ) - async def _on_reaction(self, room: Any, event: Any) -> None: + async def _on_reaction(self, event: Any) -> None: """Handle incoming reaction events.""" - if event.sender == self._user_id: + sender = str(getattr(event, "sender", "")) + if sender == self._user_id: return - if self._is_duplicate_event(getattr(event, "event_id", None)): + event_id = str(getattr(event, "event_id", "")) + if self._is_duplicate_event(event_id): return - # Log for now; future: trigger agent actions based on emoji. - reacts_to = getattr(event, "reacts_to", "") - key = getattr(event, "key", "") - logger.info( - "Matrix: reaction %s from %s on %s in %s", - key, event.sender, reacts_to, room.room_id, + + room_id = str(getattr(event, "room_id", "")) + content = getattr(event, "content", None) + if content: + relates_to = content.get("m.relates_to", {}) if isinstance(content, dict) else getattr(content, "relates_to", {}) + reacts_to = "" + key = "" + if isinstance(relates_to, dict): + reacts_to = relates_to.get("event_id", "") + key = relates_to.get("key", "") + elif hasattr(relates_to, "event_id"): + reacts_to = str(getattr(relates_to, "event_id", "")) + key = str(getattr(relates_to, "key", "")) + logger.info( + "Matrix: reaction %s from %s on %s in %s", + key, sender, reacts_to, room_id, + ) + + # ------------------------------------------------------------------ + # Text message aggregation (handles Matrix client-side splits) + # ------------------------------------------------------------------ + + def _text_batch_key(self, event: MessageEvent) -> str: + """Session-scoped key for text message batching.""" + from gateway.session import build_session_key + return build_session_key( + event.source, + group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), ) - async def _on_unknown_event(self, room: Any, event: Any) -> None: - """Fallback handler for events not natively parsed by matrix-nio. + def _enqueue_text_event(self, event: MessageEvent) -> None: + """Buffer a text event and reset the flush timer.""" + key = self._text_batch_key(event) + existing = self._pending_text_batches.get(key) + chunk_len = len(event.text or "") + if existing is None: + event._last_chunk_len = chunk_len # type: ignore[attr-defined] + self._pending_text_batches[key] = event + else: + if event.text: + existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text + existing._last_chunk_len = chunk_len # type: ignore[attr-defined] + if event.media_urls: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) - Catches m.reaction on older nio versions that lack ReactionEvent. - """ - source = getattr(event, "source", {}) - if source.get("type") != "m.reaction": - return - content = source.get("content", {}) - relates_to = content.get("m.relates_to", {}) - if relates_to.get("rel_type") != "m.annotation": - return - if source.get("sender") == self._user_id: - return - logger.info( - "Matrix: reaction %s from %s on %s in %s", - relates_to.get("key", "?"), - source.get("sender", "?"), - relates_to.get("event_id", "?"), - room.room_id, + prior_task = self._pending_text_batch_tasks.get(key) + if prior_task and not prior_task.done(): + prior_task.cancel() + self._pending_text_batch_tasks[key] = asyncio.create_task( + self._flush_text_batch(key) ) + async def _flush_text_batch(self, key: str) -> None: + """Wait for the quiet period then dispatch the aggregated text.""" + current_task = asyncio.current_task() + try: + pending = self._pending_text_batches.get(key) + last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0 + if last_len >= self._SPLIT_THRESHOLD: + delay = self._text_batch_split_delay_seconds + else: + delay = self._text_batch_delay_seconds + await asyncio.sleep(delay) + event = self._pending_text_batches.pop(key, None) + if not event: + return + logger.info( + "[Matrix] Flushing text batch %s (%d chars)", + key, len(event.text or ""), + ) + await self.handle_message(event) + finally: + if self._pending_text_batch_tasks.get(key) is current_task: + self._pending_text_batch_tasks.pop(key, None) + # ------------------------------------------------------------------ # Read receipts # ------------------------------------------------------------------ @@ -1575,25 +1372,16 @@ class MatrixAdapter(BasePlatformAdapter): asyncio.ensure_future(_send()) async def send_read_receipt(self, room_id: str, event_id: str) -> bool: - """Send a read receipt (m.read) for an event. - - Also sets the fully-read marker so the room is marked as read - in all clients. - """ + """Send a read receipt (m.read) for an event.""" if not self._client: return False try: - if hasattr(self._client, "room_read_markers"): - await self._client.room_read_markers( - room_id, - fully_read_event=event_id, - read_event=event_id, - ) - else: - # Fallback for older matrix-nio. - await self._client.room_send( - room_id, "m.receipt", {"event_id": event_id}, - ) + from mautrix.types import EventID, RoomID + await self._client.set_read_markers( + RoomID(room_id), + fully_read_event=EventID(event_id), + read_receipt=EventID(event_id), + ) logger.debug("Matrix: sent read receipt for %s in %s", event_id, room_id) return True except Exception as exc: @@ -1608,19 +1396,15 @@ class MatrixAdapter(BasePlatformAdapter): self, room_id: str, event_id: str, reason: str = "", ) -> bool: """Redact (delete) a message or event from a room.""" - import nio - if not self._client: return False try: - resp = await self._client.room_redact( - room_id, event_id, reason=reason, + from mautrix.types import EventID, RoomID + await self._client.redact( + RoomID(room_id), EventID(event_id), reason=reason or None, ) - if isinstance(resp, nio.RoomRedactResponse): - logger.info("Matrix: redacted %s in %s", event_id, room_id) - return True - logger.warning("Matrix: redact failed: %s", resp) - return False + logger.info("Matrix: redacted %s in %s", event_id, room_id) + return True except Exception as exc: logger.warning("Matrix: redact error: %s", exc) return False @@ -1635,40 +1419,39 @@ class MatrixAdapter(BasePlatformAdapter): limit: int = 50, start: str = "", ) -> list: - """Fetch recent messages from a room. - - Returns a list of dicts with keys: event_id, sender, body, - timestamp, type. Uses the ``room_messages()`` API. - """ - import nio - + """Fetch recent messages from a room.""" if not self._client: return [] try: - resp = await self._client.room_messages( - room_id, - start=start or "", + from mautrix.types import PaginationDirection, RoomID, SyncToken + resp = await self._client.get_messages( + RoomID(room_id), + direction=PaginationDirection.BACKWARD, + from_token=SyncToken(start) if start else None, limit=limit, - direction=nio.Api.MessageDirection.back - if hasattr(nio.Api, "MessageDirection") - else "b", ) except Exception as exc: - logger.warning("Matrix: room_messages failed for %s: %s", room_id, exc) + logger.warning("Matrix: get_messages failed for %s: %s", room_id, exc) return [] - if not isinstance(resp, nio.RoomMessagesResponse): - logger.warning("Matrix: room_messages returned %s", type(resp).__name__) + if not resp: return [] + events = getattr(resp, "chunk", []) or (resp.get("chunk", []) if isinstance(resp, dict) else []) messages = [] - for event in reversed(resp.chunk): - body = getattr(event, "body", "") or "" + for event in reversed(events): + body = "" + content = getattr(event, "content", None) + if content: + if hasattr(content, "body"): + body = content.body or "" + elif isinstance(content, dict): + body = content.get("body", "") messages.append({ - "event_id": getattr(event, "event_id", ""), - "sender": getattr(event, "sender", ""), + "event_id": str(getattr(event, "event_id", "")), + "sender": str(getattr(event, "sender", "")), "body": body, - "timestamp": getattr(event, "server_timestamp", 0), + "timestamp": getattr(event, "timestamp", 0) or getattr(event, "server_timestamp", 0), "type": type(event).__name__, }) return messages @@ -1685,56 +1468,41 @@ class MatrixAdapter(BasePlatformAdapter): is_direct: bool = False, preset: str = "private_chat", ) -> Optional[str]: - """Create a new Matrix room. - - Args: - name: Human-readable room name. - topic: Room topic. - invite: List of user IDs to invite. - is_direct: Mark as a DM room. - preset: One of private_chat, public_chat, trusted_private_chat. - - Returns the room_id on success, None on failure. - """ - import nio - + """Create a new Matrix room.""" if not self._client: return None try: - resp = await self._client.room_create( + from mautrix.types import RoomCreatePreset, UserID + preset_enum = { + "private_chat": RoomCreatePreset.PRIVATE, + "public_chat": RoomCreatePreset.PUBLIC, + "trusted_private_chat": RoomCreatePreset.TRUSTED_PRIVATE, + }.get(preset, RoomCreatePreset.PRIVATE) + invitees = [UserID(u) for u in (invite or [])] + room_id = await self._client.create_room( name=name or None, topic=topic or None, - invite=invite or [], + invitees=invitees, is_direct=is_direct, - preset=getattr( - nio.Api.RoomPreset if hasattr(nio.Api, "RoomPreset") else type("", (), {}), - preset, None, - ) or preset, + preset=preset_enum, ) - if isinstance(resp, nio.RoomCreateResponse): - room_id = resp.room_id - self._joined_rooms.add(room_id) - logger.info("Matrix: created room %s (%s)", room_id, name or "unnamed") - return room_id - logger.warning("Matrix: room_create failed: %s", resp) - return None + room_id_str = str(room_id) + self._joined_rooms.add(room_id_str) + logger.info("Matrix: created room %s (%s)", room_id_str, name or "unnamed") + return room_id_str except Exception as exc: - logger.warning("Matrix: room_create error: %s", exc) + logger.warning("Matrix: create_room error: %s", exc) return None async def invite_user(self, room_id: str, user_id: str) -> bool: """Invite a user to a room.""" - import nio - if not self._client: return False try: - resp = await self._client.room_invite(room_id, user_id) - if isinstance(resp, nio.RoomInviteResponse): - logger.info("Matrix: invited %s to %s", user_id, room_id) - return True - logger.warning("Matrix: invite failed: %s", resp) - return False + from mautrix.types import RoomID, UserID + await self._client.invite_user(RoomID(room_id), UserID(user_id)) + logger.info("Matrix: invited %s to %s", user_id, room_id) + return True except Exception as exc: logger.warning("Matrix: invite error: %s", exc) return False @@ -1753,13 +1521,21 @@ class MatrixAdapter(BasePlatformAdapter): logger.warning("Matrix: invalid presence state %r", state) return False try: - if hasattr(self._client, "set_presence"): - await self._client.set_presence(state, status_msg=status_msg or None) - logger.debug("Matrix: presence set to %s", state) - return True + from mautrix.types import PresenceState + presence_map = { + "online": PresenceState.ONLINE, + "offline": PresenceState.OFFLINE, + "unavailable": PresenceState.UNAVAILABLE, + } + await self._client.set_presence( + presence=presence_map[state], + status=status_msg or None, + ) + logger.debug("Matrix: presence set to %s", state) + return True except Exception as exc: logger.debug("Matrix: set_presence failed: %s", exc) - return False + return False # ------------------------------------------------------------------ # Emote & notice message types @@ -1769,7 +1545,7 @@ class MatrixAdapter(BasePlatformAdapter): self, chat_id: str, text: str, metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: """Send an emote message (/me style action).""" - import nio + from mautrix.types import EventType, RoomID if not self._client or not text: return SendResult(success=False, error="No client or empty text") @@ -1784,13 +1560,10 @@ class MatrixAdapter(BasePlatformAdapter): msg_content["formatted_body"] = html try: - resp = await self._client.room_send( - chat_id, "m.room.message", msg_content, - ignore_unverified_devices=True, + event_id = await self._client.send_message_event( + RoomID(chat_id), EventType.ROOM_MESSAGE, msg_content, ) - if isinstance(resp, nio.RoomSendResponse): - return SendResult(success=True, message_id=resp.event_id) - return SendResult(success=False, error=str(resp)) + return SendResult(success=True, message_id=str(event_id)) except Exception as exc: return SendResult(success=False, error=str(exc)) @@ -1798,7 +1571,7 @@ class MatrixAdapter(BasePlatformAdapter): self, chat_id: str, text: str, metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: """Send a notice message (bot-appropriate, non-alerting).""" - import nio + from mautrix.types import EventType, RoomID if not self._client or not text: return SendResult(success=False, error="No client or empty text") @@ -1813,13 +1586,10 @@ class MatrixAdapter(BasePlatformAdapter): msg_content["formatted_body"] = html try: - resp = await self._client.room_send( - chat_id, "m.room.message", msg_content, - ignore_unverified_devices=True, + event_id = await self._client.send_message_event( + RoomID(chat_id), EventType.ROOM_MESSAGE, msg_content, ) - if isinstance(resp, nio.RoomSendResponse): - return SendResult(success=True, message_id=resp.event_id) - return SendResult(success=False, error=str(resp)) + return SendResult(success=True, message_id=str(event_id)) except Exception as exc: return SendResult(success=False, error=str(exc)) @@ -1827,18 +1597,28 @@ class MatrixAdapter(BasePlatformAdapter): # Helpers # ------------------------------------------------------------------ - async def _refresh_dm_cache(self) -> None: - """Refresh the DM room cache from m.direct account data. + async def _is_dm_room(self, room_id: str) -> bool: + """Check if a room is a DM.""" + if self._dm_rooms.get(room_id, False): + return True + # Fallback: check member count via state store. + state_store = getattr(self._client, "state_store", None) if self._client else None + if state_store: + try: + members = await state_store.get_members(room_id) + if members and len(members) == 2: + return True + except Exception: + pass + return False - Tries the account_data API first, then falls back to parsing - the sync response's account_data for robustness. - """ + async def _refresh_dm_cache(self) -> None: + """Refresh the DM room cache from m.direct account data.""" if not self._client: return dm_data: Optional[Dict] = None - # Primary: try the dedicated account data endpoint. try: resp = await self._client.get_account_data("m.direct") if hasattr(resp, "content"): @@ -1846,21 +1626,7 @@ class MatrixAdapter(BasePlatformAdapter): elif isinstance(resp, dict): dm_data = resp except Exception as exc: - logger.debug("Matrix: get_account_data('m.direct') failed: %s — trying sync fallback", exc) - - # Fallback: parse from the client's account_data store (populated by sync). - if dm_data is None: - try: - # matrix-nio stores account data events on the client object - ad = getattr(self._client, "account_data", None) - if ad and isinstance(ad, dict) and "m.direct" in ad: - event = ad["m.direct"] - if hasattr(event, "content"): - dm_data = event.content - elif isinstance(event, dict): - dm_data = event - except Exception: - pass + logger.debug("Matrix: get_account_data('m.direct') failed: %s", exc) if dm_data is None: return @@ -1868,7 +1634,7 @@ class MatrixAdapter(BasePlatformAdapter): dm_room_ids: Set[str] = set() for user_id, rooms in dm_data.items(): if isinstance(rooms, list): - dm_room_ids.update(rooms) + dm_room_ids.update(str(r) for r in rooms) self._dm_rooms = { rid: (rid in dm_room_ids) @@ -1925,15 +1691,12 @@ class MatrixAdapter(BasePlatformAdapter): """Return True if the bot is mentioned in the message.""" if not body and not formatted_body: return False - # Check for full @user:server in body if self._user_id and self._user_id in body: return True - # Check for localpart with word boundaries (case-insensitive) if self._user_id and ":" in self._user_id: localpart = self._user_id.split(":")[0].lstrip("@") if localpart and re.search(r'\b' + re.escape(localpart) + r'\b', body, re.IGNORECASE): return True - # Check formatted_body for Matrix pill if formatted_body and self._user_id: if f"matrix.to/#/{self._user_id}" in formatted_body: return True @@ -1941,22 +1704,24 @@ class MatrixAdapter(BasePlatformAdapter): def _strip_mention(self, body: str) -> str: """Remove bot mention from message body.""" - # Remove full @user:server if self._user_id: body = body.replace(self._user_id, "") - # If still contains localpart mention, remove it if self._user_id and ":" in self._user_id: localpart = self._user_id.split(":")[0].lstrip("@") if localpart: body = re.sub(r'\b' + re.escape(localpart) + r'\b', '', body, flags=re.IGNORECASE) return body.strip() - def _get_display_name(self, room: Any, user_id: str) -> str: + async def _get_display_name(self, room_id: str, user_id: str) -> str: """Get a user's display name in a room, falling back to user_id.""" - if room and hasattr(room, "users"): - user = room.users.get(user_id) - if user and getattr(user, "display_name", None): - return user.display_name + state_store = getattr(self._client, "state_store", None) if self._client else None + if state_store: + try: + member = await state_store.get_member(room_id, user_id) + if member and getattr(member, "displayname", None): + return member.displayname + except Exception: + pass # Strip the @...:server format to just the localpart. if user_id.startswith("@") and ":" in user_id: return user_id[1:].split(":")[0] @@ -1964,13 +1729,9 @@ class MatrixAdapter(BasePlatformAdapter): def _mxc_to_http(self, mxc_url: str) -> str: """Convert mxc://server/media_id to an HTTP download URL.""" - # mxc://matrix.org/abc123 → https://matrix.org/_matrix/client/v1/media/download/matrix.org/abc123 - # Uses the authenticated client endpoint (spec v1.11+) instead of the - # deprecated /_matrix/media/v3/download/ path. if not mxc_url.startswith("mxc://"): return mxc_url parts = mxc_url[6:] # strip mxc:// - # Use our homeserver for download (federation handles the rest). return f"{self._homeserver}/_matrix/client/v1/media/download/{parts}" def _markdown_to_html(self, text: str) -> str: @@ -1988,16 +1749,12 @@ class MatrixAdapter(BasePlatformAdapter): md = _md.Markdown( extensions=["fenced_code", "tables", "nl2br", "sane_lists"], ) - # Remove the raw HTML preprocessor so