fix(matrix): replace pickle crypto store with SQLite, fix E2EE decryption (#7981)

Fixes #7952 — Matrix E2EE completely broken after mautrix migration.

- Replace MemoryCryptoStore + pickle/HMAC persistence with mautrix's
  PgCryptoStore backed by SQLite via aiosqlite. Crypto state now
  persists reliably across restarts without fragile serialization.

- Add handle_sync() call on initial sync response so to-device events
  (queued Megolm key shares) are dispatched to OlmMachine instead of
  being silently dropped.

- Add _verify_device_keys_on_server() after loading crypto state.
  Detects missing keys (re-uploads), stale keys from migration
  (attempts re-upload), and corrupted state (refuses E2EE).

- Add _CryptoStateStore adapter wrapping MemoryStateStore to satisfy
  mautrix crypto's StateStore interface (is_encrypted,
  get_encryption_info, find_shared_rooms).

- Remove redundant share_keys() call from sync loop — OlmMachine
  already handles this via DEVICE_OTK_COUNT event handler.

- Fix datetime vs float TypeError in session.py suspend_recently_active()
  that crashed gateway startup.

- Add aiosqlite and asyncpg to [matrix] extra in pyproject.toml.

- Update test mocks for PgCryptoStore/Database and add query_keys mock
  for key verification. 174 tests pass.

- Add E2EE upgrade/migration docs to Matrix user guide.
This commit is contained in:
Siddharth Balyan 2026-04-11 18:54:46 -07:00 committed by GitHub
parent 27eeea0555
commit 50d86b3c71
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 298 additions and 64 deletions

View file

@ -104,7 +104,7 @@ MAX_MESSAGE_LENGTH = 4000
# Uses get_hermes_home() so each profile gets its own Matrix store.
from hermes_constants import get_hermes_dir as _get_hermes_dir
_STORE_DIR = _get_hermes_dir("platforms/matrix/store", "matrix/store")
_CRYPTO_PICKLE_PATH = _STORE_DIR / "crypto_store.pickle"
_CRYPTO_DB_PATH = _STORE_DIR / "crypto.db"
# Grace period: ignore messages older than this many seconds before startup.
_STARTUP_GRACE_SECONDS = 5
@ -165,6 +165,33 @@ def check_matrix_requirements() -> bool:
return True
class _CryptoStateStore:
"""Adapter that satisfies the mautrix crypto StateStore interface.
OlmMachine requires a StateStore with ``is_encrypted``,
``get_encryption_info``, and ``find_shared_rooms``. The basic
``MemoryStateStore`` from ``mautrix.client`` doesn't implement these,
so we provide simple implementations that consult the client's room
state.
"""
def __init__(self, client_state_store: Any, joined_rooms: set):
self._ss = client_state_store
self._joined_rooms = joined_rooms
async def is_encrypted(self, room_id: str) -> bool:
return (await self.get_encryption_info(room_id)) is not None
async def get_encryption_info(self, room_id: str):
if hasattr(self._ss, "get_encryption_info"):
return await self._ss.get_encryption_info(room_id)
return None
async def find_shared_rooms(self, user_id: str) -> list:
# Return all joined rooms — simple but correct for a single-user bot.
return list(self._joined_rooms)
class MatrixAdapter(BasePlatformAdapter):
"""Gateway adapter for Matrix (any homeserver)."""
@ -199,6 +226,7 @@ class MatrixAdapter(BasePlatformAdapter):
)
self._client: Any = None # mautrix.client.Client
self._crypto_db: Any = None # mautrix.util.async_db.Database
self._sync_task: Optional[asyncio.Task] = None
self._closing = False
self._startup_ts: float = 0.0
@ -252,6 +280,92 @@ class MatrixAdapter(BasePlatformAdapter):
self._processed_events_set.add(event_id)
return False
# ------------------------------------------------------------------
# E2EE helpers
# ------------------------------------------------------------------
async def _verify_device_keys_on_server(self, client: Any, olm: Any) -> bool:
"""Verify our device keys are on the homeserver after loading crypto state.
Returns True if keys are valid or were successfully re-uploaded.
Returns False if verification fails (caller should refuse E2EE).
"""
try:
resp = await client.query_keys({client.mxid: [client.device_id]})
except Exception as exc:
logger.error(
"Matrix: cannot verify device keys on server: %s — refusing E2EE", exc,
)
return False
# query_keys returns typed objects (QueryKeysResponse, DeviceKeys
# with KeyID keys). Normalise to plain strings for comparison.
device_keys_map = getattr(resp, "device_keys", {}) or {}
our_user_devices = device_keys_map.get(str(client.mxid)) or {}
our_keys = our_user_devices.get(str(client.device_id))
if not our_keys:
logger.warning("Matrix: device keys missing from server — re-uploading")
olm.account.shared = False
try:
await olm.share_keys()
except Exception as exc:
logger.error("Matrix: failed to re-upload device keys: %s", exc)
return False
return True
# DeviceKeys.keys is a dict[KeyID, str]. Iterate to find the
# ed25519 key rather than constructing a KeyID for lookup.
server_ed25519 = None
keys_dict = getattr(our_keys, "keys", {}) or {}
for key_id, key_value in keys_dict.items():
if str(key_id).startswith("ed25519:"):
server_ed25519 = str(key_value)
break
local_ed25519 = olm.account.identity_keys.get("ed25519")
if server_ed25519 != local_ed25519:
if olm.account.shared:
# Restored account from DB but server has different keys — corrupted state.
logger.error(
"Matrix: server has different identity keys for device %s"
"local crypto state is stale. Delete %s and restart.",
client.device_id,
_CRYPTO_DB_PATH,
)
return False
# Fresh account (never uploaded). Server has stale keys from a
# previous installation. Try to delete the old device and re-upload.
logger.warning(
"Matrix: server has stale keys for device %s — attempting re-upload",
client.device_id,
)
try:
await client.api.request(
client.api.Method.DELETE
if hasattr(client.api, "Method")
else "DELETE",
f"/_matrix/client/v3/devices/{client.device_id}",
)
logger.info("Matrix: deleted stale device %s from server", client.device_id)
except Exception:
# Device deletion often requires UIA or may simply not be
# permitted — that's fine, share_keys will try to overwrite.
pass
try:
await olm.share_keys()
except Exception as exc:
logger.error(
"Matrix: cannot upload device keys for %s: %s. "
"Try generating a new access token to get a fresh device.",
client.device_id,
exc,
)
return False
return True
# ------------------------------------------------------------------
# Required overrides
# ------------------------------------------------------------------
@ -350,54 +464,54 @@ class MatrixAdapter(BasePlatformAdapter):
return False
try:
from mautrix.crypto import OlmMachine
from mautrix.crypto.store import MemoryCryptoStore
from mautrix.crypto.store.asyncpg import PgCryptoStore
from mautrix.util.async_db import Database
_STORE_DIR.mkdir(parents=True, exist_ok=True)
# Remove legacy pickle file from pre-SQLite era.
legacy_pickle = _STORE_DIR / "crypto_store.pickle"
if legacy_pickle.exists():
logger.info("Matrix: removing legacy crypto_store.pickle (migrated to SQLite)")
legacy_pickle.unlink()
# Open SQLite-backed crypto store.
crypto_db = Database.create(
f"sqlite:///{_CRYPTO_DB_PATH}",
upgrade_table=PgCryptoStore.upgrade_table,
)
await crypto_db.start()
self._crypto_db = crypto_db
# account_id and pickle_key are required by mautrix ≥0.21.
# Use the Matrix user ID as account_id for stable identity.
# pickle_key secures in-memory serialisation; derive from
# the same user_id:device_id pair used for the on-disk HMAC.
_acct_id = self._user_id or "hermes"
_pickle_key = f"{_acct_id}:{self._device_id}"
crypto_store = MemoryCryptoStore(
_pickle_key = f"{_acct_id}:{self._device_id or 'default'}"
crypto_store = PgCryptoStore(
account_id=_acct_id,
pickle_key=_pickle_key,
db=crypto_db,
)
await crypto_store.open()
# Restore persisted crypto state from a previous run.
# Uses HMAC to verify integrity before unpickling.
pickle_path = _CRYPTO_PICKLE_PATH
if pickle_path.exists():
try:
import hashlib, hmac, pickle
raw = pickle_path.read_bytes()
# Format: 32-byte HMAC-SHA256 signature + pickle data.
if len(raw) > 32:
sig, payload = raw[:32], raw[32:]
# Key is derived from the device_id + user_id (stable per install).
hmac_key = f"{self._user_id}:{self._device_id}".encode()
expected = hmac.new(hmac_key, payload, hashlib.sha256).digest()
if hmac.compare_digest(sig, expected):
saved = pickle.loads(payload) # noqa: S301
if isinstance(saved, MemoryCryptoStore):
crypto_store = saved
logger.info("Matrix: restored E2EE crypto store from %s", pickle_path)
else:
logger.warning("Matrix: crypto store HMAC mismatch — ignoring stale/tampered file")
except Exception as exc:
logger.warning("Matrix: could not restore crypto store: %s", exc)
crypto_state = _CryptoStateStore(state_store, self._joined_rooms)
olm = OlmMachine(client, crypto_store, crypto_state)
olm = OlmMachine(client, crypto_store, state_store)
# Set trust policy: accept unverified devices so senders
# share Megolm session keys with us automatically.
# Accept unverified devices so senders share Megolm
# session keys with us automatically.
olm.share_keys_min_trust = TrustState.UNVERIFIED
olm.send_keys_min_trust = TrustState.UNVERIFIED
await olm.load()
# Verify our device keys are still on the homeserver.
if not await self._verify_device_keys_on_server(client, olm):
await crypto_db.stop()
await api.session.close()
return False
client.crypto = olm
logger.info(
"Matrix: E2EE enabled (store: %s%s)",
str(_STORE_DIR),
str(_CRYPTO_DB_PATH),
f", device_id={client.device_id}" if client.device_id else "",
)
except Exception as exc:
@ -438,6 +552,15 @@ class MatrixAdapter(BasePlatformAdapter):
)
# Build DM room cache from m.direct account data.
await self._refresh_dm_cache()
# Dispatch events from the initial sync so the OlmMachine
# receives to-device key shares queued while we were offline.
try:
tasks = client.handle_sync(sync_data)
if tasks:
await asyncio.gather(*tasks)
except Exception as exc:
logger.warning("Matrix: initial sync event dispatch error: %s", exc)
else:
logger.warning("Matrix: initial sync returned unexpected type %s", type(sync_data).__name__)
except Exception as exc:
@ -466,21 +589,12 @@ class MatrixAdapter(BasePlatformAdapter):
except (asyncio.CancelledError, Exception):
pass
# Persist E2EE crypto store before closing so the next restart
# can decrypt events using sessions from this run.
if self._client and self._encryption and getattr(self._client, "crypto", None):
# Close the SQLite crypto store database.
if hasattr(self, "_crypto_db") and self._crypto_db:
try:
import hashlib, hmac, pickle
crypto_store = self._client.crypto.crypto_store
_STORE_DIR.mkdir(parents=True, exist_ok=True)
pickle_path = _CRYPTO_PICKLE_PATH
payload = pickle.dumps(crypto_store)
hmac_key = f"{self._user_id}:{self._device_id}".encode()
sig = hmac.new(hmac_key, payload, hashlib.sha256).digest()
pickle_path.write_bytes(sig + payload)
logger.info("Matrix: persisted E2EE crypto store to %s", pickle_path)
await self._crypto_db.stop()
except Exception as exc:
logger.debug("Matrix: could not persist crypto store on disconnect: %s", exc)
logger.debug("Matrix: could not close crypto DB on disconnect: %s", exc)
if self._client:
try:
@ -853,13 +967,6 @@ class MatrixAdapter(BasePlatformAdapter):
except Exception as exc:
logger.warning("Matrix: sync event dispatch error: %s", exc)
# Share keys periodically if E2EE is enabled.
if self._encryption and getattr(client, "crypto", None):
try:
await client.crypto.share_keys()
except Exception as exc:
logger.warning("Matrix: E2EE key share failed: %s", exc)
# Retry any buffered undecrypted events.
if self._pending_megolm:
await self._retry_pending_decryptions()