diff --git a/gateway/platforms/matrix.py b/gateway/platforms/matrix.py index 349f962d2e..75d7e9c9f6 100644 --- a/gateway/platforms/matrix.py +++ b/gateway/platforms/matrix.py @@ -104,7 +104,7 @@ MAX_MESSAGE_LENGTH = 4000 # Uses get_hermes_home() so each profile gets its own Matrix store. from hermes_constants import get_hermes_dir as _get_hermes_dir _STORE_DIR = _get_hermes_dir("platforms/matrix/store", "matrix/store") -_CRYPTO_PICKLE_PATH = _STORE_DIR / "crypto_store.pickle" +_CRYPTO_DB_PATH = _STORE_DIR / "crypto.db" # Grace period: ignore messages older than this many seconds before startup. _STARTUP_GRACE_SECONDS = 5 @@ -165,6 +165,33 @@ def check_matrix_requirements() -> bool: return True +class _CryptoStateStore: + """Adapter that satisfies the mautrix crypto StateStore interface. + + OlmMachine requires a StateStore with ``is_encrypted``, + ``get_encryption_info``, and ``find_shared_rooms``. The basic + ``MemoryStateStore`` from ``mautrix.client`` doesn't implement these, + so we provide simple implementations that consult the client's room + state. + """ + + def __init__(self, client_state_store: Any, joined_rooms: set): + self._ss = client_state_store + self._joined_rooms = joined_rooms + + async def is_encrypted(self, room_id: str) -> bool: + return (await self.get_encryption_info(room_id)) is not None + + async def get_encryption_info(self, room_id: str): + if hasattr(self._ss, "get_encryption_info"): + return await self._ss.get_encryption_info(room_id) + return None + + async def find_shared_rooms(self, user_id: str) -> list: + # Return all joined rooms — simple but correct for a single-user bot. + return list(self._joined_rooms) + + class MatrixAdapter(BasePlatformAdapter): """Gateway adapter for Matrix (any homeserver).""" @@ -199,6 +226,7 @@ class MatrixAdapter(BasePlatformAdapter): ) self._client: Any = None # mautrix.client.Client + self._crypto_db: Any = None # mautrix.util.async_db.Database self._sync_task: Optional[asyncio.Task] = None self._closing = False self._startup_ts: float = 0.0 @@ -252,6 +280,92 @@ class MatrixAdapter(BasePlatformAdapter): self._processed_events_set.add(event_id) return False + # ------------------------------------------------------------------ + # E2EE helpers + # ------------------------------------------------------------------ + + async def _verify_device_keys_on_server(self, client: Any, olm: Any) -> bool: + """Verify our device keys are on the homeserver after loading crypto state. + + Returns True if keys are valid or were successfully re-uploaded. + Returns False if verification fails (caller should refuse E2EE). + """ + try: + resp = await client.query_keys({client.mxid: [client.device_id]}) + except Exception as exc: + logger.error( + "Matrix: cannot verify device keys on server: %s — refusing E2EE", exc, + ) + return False + + # query_keys returns typed objects (QueryKeysResponse, DeviceKeys + # with KeyID keys). Normalise to plain strings for comparison. + device_keys_map = getattr(resp, "device_keys", {}) or {} + our_user_devices = device_keys_map.get(str(client.mxid)) or {} + our_keys = our_user_devices.get(str(client.device_id)) + + if not our_keys: + logger.warning("Matrix: device keys missing from server — re-uploading") + olm.account.shared = False + try: + await olm.share_keys() + except Exception as exc: + logger.error("Matrix: failed to re-upload device keys: %s", exc) + return False + return True + + # DeviceKeys.keys is a dict[KeyID, str]. Iterate to find the + # ed25519 key rather than constructing a KeyID for lookup. + server_ed25519 = None + keys_dict = getattr(our_keys, "keys", {}) or {} + for key_id, key_value in keys_dict.items(): + if str(key_id).startswith("ed25519:"): + server_ed25519 = str(key_value) + break + local_ed25519 = olm.account.identity_keys.get("ed25519") + + if server_ed25519 != local_ed25519: + if olm.account.shared: + # Restored account from DB but server has different keys — corrupted state. + logger.error( + "Matrix: server has different identity keys for device %s — " + "local crypto state is stale. Delete %s and restart.", + client.device_id, + _CRYPTO_DB_PATH, + ) + return False + + # Fresh account (never uploaded). Server has stale keys from a + # previous installation. Try to delete the old device and re-upload. + logger.warning( + "Matrix: server has stale keys for device %s — attempting re-upload", + client.device_id, + ) + try: + await client.api.request( + client.api.Method.DELETE + if hasattr(client.api, "Method") + else "DELETE", + f"/_matrix/client/v3/devices/{client.device_id}", + ) + logger.info("Matrix: deleted stale device %s from server", client.device_id) + except Exception: + # Device deletion often requires UIA or may simply not be + # permitted — that's fine, share_keys will try to overwrite. + pass + try: + await olm.share_keys() + except Exception as exc: + logger.error( + "Matrix: cannot upload device keys for %s: %s. " + "Try generating a new access token to get a fresh device.", + client.device_id, + exc, + ) + return False + + return True + # ------------------------------------------------------------------ # Required overrides # ------------------------------------------------------------------ @@ -350,54 +464,54 @@ class MatrixAdapter(BasePlatformAdapter): return False try: from mautrix.crypto import OlmMachine - from mautrix.crypto.store import MemoryCryptoStore + from mautrix.crypto.store.asyncpg import PgCryptoStore + from mautrix.util.async_db import Database + + _STORE_DIR.mkdir(parents=True, exist_ok=True) + + # Remove legacy pickle file from pre-SQLite era. + legacy_pickle = _STORE_DIR / "crypto_store.pickle" + if legacy_pickle.exists(): + logger.info("Matrix: removing legacy crypto_store.pickle (migrated to SQLite)") + legacy_pickle.unlink() + + # Open SQLite-backed crypto store. + crypto_db = Database.create( + f"sqlite:///{_CRYPTO_DB_PATH}", + upgrade_table=PgCryptoStore.upgrade_table, + ) + await crypto_db.start() + self._crypto_db = crypto_db - # account_id and pickle_key are required by mautrix ≥0.21. - # Use the Matrix user ID as account_id for stable identity. - # pickle_key secures in-memory serialisation; derive from - # the same user_id:device_id pair used for the on-disk HMAC. _acct_id = self._user_id or "hermes" - _pickle_key = f"{_acct_id}:{self._device_id}" - crypto_store = MemoryCryptoStore( + _pickle_key = f"{_acct_id}:{self._device_id or 'default'}" + crypto_store = PgCryptoStore( account_id=_acct_id, pickle_key=_pickle_key, + db=crypto_db, ) + await crypto_store.open() - # Restore persisted crypto state from a previous run. - # Uses HMAC to verify integrity before unpickling. - pickle_path = _CRYPTO_PICKLE_PATH - if pickle_path.exists(): - try: - import hashlib, hmac, pickle - raw = pickle_path.read_bytes() - # Format: 32-byte HMAC-SHA256 signature + pickle data. - if len(raw) > 32: - sig, payload = raw[:32], raw[32:] - # Key is derived from the device_id + user_id (stable per install). - hmac_key = f"{self._user_id}:{self._device_id}".encode() - expected = hmac.new(hmac_key, payload, hashlib.sha256).digest() - if hmac.compare_digest(sig, expected): - saved = pickle.loads(payload) # noqa: S301 - if isinstance(saved, MemoryCryptoStore): - crypto_store = saved - logger.info("Matrix: restored E2EE crypto store from %s", pickle_path) - else: - logger.warning("Matrix: crypto store HMAC mismatch — ignoring stale/tampered file") - except Exception as exc: - logger.warning("Matrix: could not restore crypto store: %s", exc) + crypto_state = _CryptoStateStore(state_store, self._joined_rooms) + olm = OlmMachine(client, crypto_store, crypto_state) - olm = OlmMachine(client, crypto_store, state_store) - - # Set trust policy: accept unverified devices so senders - # share Megolm session keys with us automatically. + # Accept unverified devices so senders share Megolm + # session keys with us automatically. olm.share_keys_min_trust = TrustState.UNVERIFIED olm.send_keys_min_trust = TrustState.UNVERIFIED await olm.load() + + # Verify our device keys are still on the homeserver. + if not await self._verify_device_keys_on_server(client, olm): + await crypto_db.stop() + await api.session.close() + return False + client.crypto = olm logger.info( "Matrix: E2EE enabled (store: %s%s)", - str(_STORE_DIR), + str(_CRYPTO_DB_PATH), f", device_id={client.device_id}" if client.device_id else "", ) except Exception as exc: @@ -438,6 +552,15 @@ class MatrixAdapter(BasePlatformAdapter): ) # Build DM room cache from m.direct account data. await self._refresh_dm_cache() + + # Dispatch events from the initial sync so the OlmMachine + # receives to-device key shares queued while we were offline. + try: + tasks = client.handle_sync(sync_data) + if tasks: + await asyncio.gather(*tasks) + except Exception as exc: + logger.warning("Matrix: initial sync event dispatch error: %s", exc) else: logger.warning("Matrix: initial sync returned unexpected type %s", type(sync_data).__name__) except Exception as exc: @@ -466,21 +589,12 @@ class MatrixAdapter(BasePlatformAdapter): except (asyncio.CancelledError, Exception): pass - # Persist E2EE crypto store before closing so the next restart - # can decrypt events using sessions from this run. - if self._client and self._encryption and getattr(self._client, "crypto", None): + # Close the SQLite crypto store database. + if hasattr(self, "_crypto_db") and self._crypto_db: try: - import hashlib, hmac, pickle - crypto_store = self._client.crypto.crypto_store - _STORE_DIR.mkdir(parents=True, exist_ok=True) - pickle_path = _CRYPTO_PICKLE_PATH - payload = pickle.dumps(crypto_store) - hmac_key = f"{self._user_id}:{self._device_id}".encode() - sig = hmac.new(hmac_key, payload, hashlib.sha256).digest() - pickle_path.write_bytes(sig + payload) - logger.info("Matrix: persisted E2EE crypto store to %s", pickle_path) + await self._crypto_db.stop() except Exception as exc: - logger.debug("Matrix: could not persist crypto store on disconnect: %s", exc) + logger.debug("Matrix: could not close crypto DB on disconnect: %s", exc) if self._client: try: @@ -853,13 +967,6 @@ class MatrixAdapter(BasePlatformAdapter): except Exception as exc: logger.warning("Matrix: sync event dispatch error: %s", exc) - # Share keys periodically if E2EE is enabled. - if self._encryption and getattr(client, "crypto", None): - try: - await client.crypto.share_keys() - except Exception as exc: - logger.warning("Matrix: E2EE key share failed: %s", exc) - # Retry any buffered undecrypted events. if self._pending_megolm: await self._retry_pending_decryptions() diff --git a/gateway/session.py b/gateway/session.py index 96013df513..a11ade898e 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -807,9 +807,9 @@ class SessionStore: to avoid resetting long-idle sessions that are harmless to resume. Returns the number of sessions that were suspended. """ - import time as _time + from datetime import timedelta - cutoff = _time.time() - max_age_seconds + cutoff = _now() - timedelta(seconds=max_age_seconds) count = 0 with self._lock: self._ensure_loaded_locked() diff --git a/pyproject.toml b/pyproject.toml index 28a4a300a7..95a1dfddd7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,7 @@ dev = ["debugpy>=1.8.0,<2", "pytest>=9.0.2,<10", "pytest-asyncio>=1.3.0,<2", "py messaging = ["python-telegram-bot[webhooks]>=22.6,<23", "discord.py[voice]>=2.7.1,<3", "aiohttp>=3.13.3,<4", "slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"] cron = ["croniter>=6.0.0,<7"] slack = ["slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"] -matrix = ["mautrix[encryption]>=0.20,<1", "Markdown>=3.6,<4"] +matrix = ["mautrix[encryption]>=0.20,<1", "Markdown>=3.6,<4", "aiosqlite>=0.20", "asyncpg>=0.29"] cli = ["simple-term-menu>=1.0,<2"] tts-premium = ["elevenlabs>=1.0,<2"] voice = [ diff --git a/tests/gateway/test_matrix.py b/tests/gateway/test_matrix.py index 4bde50b638..d5db07c645 100644 --- a/tests/gateway/test_matrix.py +++ b/tests/gateway/test_matrix.py @@ -157,12 +157,44 @@ def _make_fake_mautrix(): mautrix_crypto_store = types.ModuleType("mautrix.crypto.store") class MemoryCryptoStore: - def __init__(self, account_id="", pickle_key=""): + def __init__(self, account_id="", pickle_key=""): # noqa: S301 self.account_id = account_id self.pickle_key = pickle_key mautrix_crypto_store.MemoryCryptoStore = MemoryCryptoStore + # --- mautrix.crypto.store.asyncpg --- + mautrix_crypto_store_asyncpg = types.ModuleType("mautrix.crypto.store.asyncpg") + + class PgCryptoStore: + upgrade_table = MagicMock() + + def __init__(self, account_id="", pickle_key="", db=None): # noqa: S301 + self.account_id = account_id + self.pickle_key = pickle_key + self.db = db + + async def open(self): + pass + + mautrix_crypto_store_asyncpg.PgCryptoStore = PgCryptoStore + + # --- mautrix.util --- + mautrix_util = types.ModuleType("mautrix.util") + + # --- mautrix.util.async_db --- + mautrix_util_async_db = types.ModuleType("mautrix.util.async_db") + + class Database: + @classmethod + def create(cls, url, upgrade_table=None): + db = MagicMock() + db.start = AsyncMock() + db.stop = AsyncMock() + return db + + mautrix_util_async_db.Database = Database + return { "mautrix": mautrix, "mautrix.api": mautrix_api, @@ -171,6 +203,9 @@ def _make_fake_mautrix(): "mautrix.client.state_store": mautrix_client_state_store, "mautrix.crypto": mautrix_crypto, "mautrix.crypto.store": mautrix_crypto_store, + "mautrix.crypto.store.asyncpg": mautrix_crypto_store_asyncpg, + "mautrix.util": mautrix_util, + "mautrix.util.async_db": mautrix_util_async_db, } @@ -740,6 +775,12 @@ class TestMatrixAccessTokenAuth: mock_client.whoami = AsyncMock(return_value=FakeWhoamiResponse("@bot:example.org", "DEV123")) mock_client.sync = AsyncMock(return_value={"rooms": {"join": {"!room:server": {}}}}) mock_client.add_event_handler = MagicMock() + mock_client.handle_sync = MagicMock(return_value=[]) + mock_client.query_keys = AsyncMock(return_value={ + "device_keys": {"@bot:example.org": {"DEV123": { + "keys": {"ed25519:DEV123": "fake_ed25519_key"}, + }}}, + }) mock_client.api = MagicMock() mock_client.api.token = "syt_test_access_token" mock_client.api.session = MagicMock() @@ -751,6 +792,8 @@ class TestMatrixAccessTokenAuth: mock_olm.share_keys = AsyncMock() mock_olm.share_keys_min_trust = None mock_olm.send_keys_min_trust = None + mock_olm.account = MagicMock() + mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"} # Patch Client constructor to return our mock fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client) @@ -924,6 +967,12 @@ class TestMatrixDeviceId: mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="WHOAMI_DEV")) mock_client.sync = AsyncMock(return_value={"rooms": {"join": {"!room:server": {}}}}) mock_client.add_event_handler = MagicMock() + mock_client.handle_sync = MagicMock(return_value=[]) + mock_client.query_keys = AsyncMock(return_value={ + "device_keys": {"@bot:example.org": {"MY_STABLE_DEVICE": { + "keys": {"ed25519:MY_STABLE_DEVICE": "fake_ed25519_key"}, + }}}, + }) mock_client.api = MagicMock() mock_client.api.token = "syt_test_access_token" mock_client.api.session = MagicMock() @@ -934,6 +983,8 @@ class TestMatrixDeviceId: mock_olm.share_keys = AsyncMock() mock_olm.share_keys_min_trust = None mock_olm.send_keys_min_trust = None + mock_olm.account = MagicMock() + mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"} fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client) fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(return_value=mock_olm) @@ -1030,8 +1081,8 @@ class TestMatrixDeviceIdConfig: class TestMatrixSyncLoop: @pytest.mark.asyncio - async def test_sync_loop_shares_keys_when_encryption_enabled(self): - """_sync_loop should call crypto.share_keys() after each sync.""" + async def test_sync_loop_dispatches_events_and_stores_token(self): + """_sync_loop should call handle_sync() and persist next_batch.""" adapter = _make_adapter() adapter._encryption = True adapter._closing = False @@ -1046,7 +1097,6 @@ class TestMatrixSyncLoop: return {"rooms": {"join": {"!room:example.org": {}}}, "next_batch": "s1234"} mock_crypto = MagicMock() - mock_crypto.share_keys = AsyncMock() mock_sync_store = MagicMock() mock_sync_store.get_next_batch = AsyncMock(return_value=None) @@ -1062,7 +1112,6 @@ class TestMatrixSyncLoop: await adapter._sync_loop() fake_client.sync.assert_awaited_once() - mock_crypto.share_keys.assert_awaited_once() fake_client.handle_sync.assert_called_once() mock_sync_store.put_next_batch.assert_awaited_once_with("s1234") @@ -1248,6 +1297,12 @@ class TestMatrixEncryptedEventHandler: mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123")) mock_client.sync = AsyncMock(return_value={"rooms": {"join": {"!room:server": {}}}}) mock_client.add_event_handler = MagicMock() + mock_client.handle_sync = MagicMock(return_value=[]) + mock_client.query_keys = AsyncMock(return_value={ + "device_keys": {"@bot:example.org": {"DEV123": { + "keys": {"ed25519:DEV123": "fake_ed25519_key"}, + }}}, + }) mock_client.api = MagicMock() mock_client.api.token = "syt_test_token" mock_client.api.session = MagicMock() @@ -1258,6 +1313,8 @@ class TestMatrixEncryptedEventHandler: mock_olm.share_keys = AsyncMock() mock_olm.share_keys_min_trust = None mock_olm.send_keys_min_trust = None + mock_olm.account = MagicMock() + mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"} fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client) fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(return_value=mock_olm) diff --git a/website/docs/user-guide/messaging/matrix.md b/website/docs/user-guide/messaging/matrix.md index 2c9bdb2291..ccde0740d6 100644 --- a/website/docs/user-guide/messaging/matrix.md +++ b/website/docs/user-guide/messaging/matrix.md @@ -344,9 +344,79 @@ pip install 'hermes-agent[matrix]' **Fix**: 1. Verify `libolm` is installed on your system (see the E2EE section above). 2. Make sure `MATRIX_ENCRYPTION=true` is set in your `.env`. -3. In your Matrix client (Element), go to the bot's profile → **Sessions** → verify/trust the bot's device. +3. In your Matrix client (Element), go to the bot's profile -> Sessions -> verify/trust the bot's device. 4. If the bot just joined an encrypted room, it can only decrypt messages sent *after* it joined. Older messages are inaccessible. +### Upgrading from a previous version with E2EE + +If you previously used Hermes with `MATRIX_ENCRYPTION=true` and are upgrading to +a version that uses the new SQLite-based crypto store, the bot's encryption +identity has changed. Your Matrix client (Element) may cache the old device keys +and refuse to share encryption sessions with the bot. + +**Symptoms**: The bot connects and shows "E2EE enabled" in the logs, but all +messages show "could not decrypt event" and the bot never responds. + +**What's happening**: The old encryption state (from the previous `matrix-nio` or +serialization-based `mautrix` backend) is incompatible with the new SQLite crypto +store. The bot creates a fresh encryption identity, but your Matrix client still +has the old keys cached and won't share the room's encryption session with a +device whose keys changed. This is a Matrix security feature -- clients treat +changed identity keys for the same device as suspicious. + +**Fix** (one-time migration): + +1. **Generate a new access token** to get a fresh device ID. The simplest way: + + ```bash + curl -X POST https://your-server/_matrix/client/v3/login \ + -H "Content-Type: application/json" \ + -d '{ + "type": "m.login.password", + "identifier": {"type": "m.id.user", "user": "@hermes:your-server.org"}, + "password": "your-password", + "initial_device_display_name": "Hermes Agent" + }' + ``` + + Copy the new `access_token` and update `MATRIX_ACCESS_TOKEN` in `~/.hermes/.env`. + +2. **Delete old encryption state**: + + ```bash + rm -f ~/.hermes/platforms/matrix/store/crypto.db + rm -f ~/.hermes/platforms/matrix/store/crypto_store.* + ``` + +3. **Force your Matrix client to rotate the encryption session**. In Element, + open the DM room with the bot and type `/discardsession`. This forces Element + to create a new encryption session and share it with the bot's new device. + +4. **Restart the gateway**: + + ```bash + hermes gateway run + ``` + +5. **Send a new message**. The bot should decrypt and respond normally. + +:::note +After migration, messages sent *before* the upgrade cannot be decrypted -- the old +encryption keys are gone. This only affects the transition; new messages work +normally. +::: + +:::tip +**New installations are not affected.** This migration is only needed if you had +a working E2EE setup with a previous version of Hermes and are upgrading. + +**Why a new access token?** Each Matrix access token is bound to a specific device +ID. Reusing the same device ID with new encryption keys causes other Matrix +clients to distrust the device (they see changed identity keys as a potential +security breach). A new access token gets a new device ID with no stale key +history, so other clients trust it immediately. +::: + ### Sync issues / bot falls behind **Cause**: Long-running tool executions can delay the sync loop, or the homeserver is slow.