hermes-agent/tests/gateway/test_matrix.py
Chris 4717989c10
fix(matrix): isolate room context and restore reliable inbound dispatch (#18505)
* fix(matrix): isolate room context and inbound dispatch

* test(matrix): cover room isolation and dispatch regressions

* docs(matrix): document room isolation and session scope

* fix(matrix): stabilize CI requirement checks

* test(matrix): isolate mautrix stubs in requirements tests

* fix(matrix): port room-scoped status and resume to slash commands mixin

Move Matrix /status scope output and /resume same-room guards from the
pre-refactor gateway/run.py into gateway/slash_commands.py so PR #18505
foundation behavior survives the upstream god-file decomposition.

Uses i18n keys for Matrix resume/status messages. Preserves upstream
session.py fixes (role_authorized, DM user_id isolation).

* docs(matrix): explain inbound dispatch via handle_sync loop

Document why Hermes uses an explicit sync loop with handle_sync() rather than
client.start(), aligning with upstream #7914 diagnostics while preserving
Hermes background maintenance tasks.

* fix(i18n): add Matrix resume/status keys to all locale catalogs

The Matrix /resume and /status slash-command keys added in the foundation
PR must exist in every supported locale file. tests/agent/test_i18n.py
asserts key and placeholder parity across catalogs.

Non-English locales use English strings as interim placeholders until
community translators can localize them.

* fix(matrix): restore gateway authz for allowed_users; honor config require_mention

Revert the early MATRIX_ALLOWED_USERS gate in _on_room_message so inbound
sender authorization stays in gateway authz like main. Parse require_mention
from config.extra (platforms.matrix / top-level matrix yaml) with env fallback,
matching thread_require_mention and fixing Forge when require_mention is set
only in profile config.yaml.

* fix(matrix): harden status scope and allowlisted DMs

* fix(matrix): use session store lookup for resume scope
2026-06-11 07:41:43 -04:00

4343 lines
167 KiB
Python

"""Tests for Matrix platform adapter (mautrix-python backend)."""
import asyncio
import re
import stat
import sys
import time
import types
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageType
def _make_fake_mautrix():
"""Create a lightweight set of fake ``mautrix`` modules.
The adapter does ``from mautrix.api import HTTPAPI``,
``from mautrix.client import Client``, ``from mautrix.types import ...``
at import time and inside methods. We provide just enough stubs for
tests that need to mock the mautrix import chain.
Use via ``patch.dict("sys.modules", _make_fake_mautrix())``.
"""
# --- mautrix (root) ---
mautrix = types.ModuleType("mautrix")
# --- mautrix.api ---
mautrix_api = types.ModuleType("mautrix.api")
class HTTPAPI:
def __init__(self, base_url="", token="", **kwargs):
self.base_url = base_url
self.token = token
self.session = MagicMock()
self.session.close = AsyncMock()
mautrix_api.HTTPAPI = HTTPAPI
mautrix.api = mautrix_api
# --- mautrix.types ---
mautrix_types = types.ModuleType("mautrix.types")
class EventType:
ROOM_MESSAGE = "m.room.message"
REACTION = "m.reaction"
ROOM_ENCRYPTED = "m.room.encrypted"
ROOM_NAME = "m.room.name"
class UserID(str):
pass
class RoomID(str):
pass
class EventID(str):
pass
class ContentURI(str):
pass
class SyncToken(str):
pass
class RoomCreatePreset:
PRIVATE = "private_chat"
PUBLIC = "public_chat"
TRUSTED_PRIVATE = "trusted_private_chat"
class PresenceState:
ONLINE = "online"
OFFLINE = "offline"
UNAVAILABLE = "unavailable"
class TrustState:
UNVERIFIED = 0
VERIFIED = 1
class PaginationDirection:
BACKWARD = "b"
FORWARD = "f"
mautrix_types.EventType = EventType
mautrix_types.UserID = UserID
mautrix_types.RoomID = RoomID
mautrix_types.EventID = EventID
mautrix_types.ContentURI = ContentURI
mautrix_types.SyncToken = SyncToken
mautrix_types.RoomCreatePreset = RoomCreatePreset
mautrix_types.PresenceState = PresenceState
mautrix_types.TrustState = TrustState
mautrix_types.PaginationDirection = PaginationDirection
mautrix.types = mautrix_types
# --- mautrix.client ---
mautrix_client = types.ModuleType("mautrix.client")
class Client:
def __init__(self, mxid=None, device_id=None, api=None,
state_store=None, sync_store=None, **kwargs):
self.mxid = mxid
self.device_id = device_id
self.api = api
self.state_store = state_store
self.sync_store = sync_store
self.crypto = None
self._event_handlers = {}
def add_event_handler(self, event_type, handler):
self._event_handlers.setdefault(event_type, []).append(handler)
def add_dispatcher(self, dispatcher_type):
pass
class InternalEventType:
INVITE = "internal.invite"
mautrix_client.Client = Client
mautrix_client.InternalEventType = InternalEventType
mautrix.client = mautrix_client
# --- mautrix.client.dispatcher ---
mautrix_client_dispatcher = types.ModuleType("mautrix.client.dispatcher")
class MembershipEventDispatcher:
pass
mautrix_client_dispatcher.MembershipEventDispatcher = MembershipEventDispatcher
# --- mautrix.client.state_store ---
mautrix_client_state_store = types.ModuleType("mautrix.client.state_store")
class MemoryStateStore:
async def get_member(self, room_id, user_id):
return None
async def get_members(self, room_id):
return []
async def get_member_profiles(self, room_id):
return {}
class MemorySyncStore:
def __init__(self):
self.next_batch = None
async def get_next_batch(self):
return self.next_batch
async def put_next_batch(self, token):
self.next_batch = token
mautrix_client_state_store.MemoryStateStore = MemoryStateStore
mautrix_client_state_store.MemorySyncStore = MemorySyncStore
# --- mautrix.crypto ---
mautrix_crypto = types.ModuleType("mautrix.crypto")
class OlmMachine:
def __init__(self, client=None, crypto_store=None, state_store=None):
self.share_keys_min_trust = None
self.send_keys_min_trust = None
async def load(self):
pass
async def share_keys(self):
pass
async def decrypt_megolm_event(self, event):
return event
mautrix_crypto.OlmMachine = OlmMachine
# --- mautrix.crypto.store ---
mautrix_crypto_store = types.ModuleType("mautrix.crypto.store")
class MemoryCryptoStore:
def __init__(self, account_id="", pickle_key=""): # noqa: S301
self.account_id = account_id
self.pickle_key = pickle_key
mautrix_crypto_store.MemoryCryptoStore = MemoryCryptoStore
# --- mautrix.crypto.attachments ---
mautrix_crypto_attachments = types.ModuleType("mautrix.crypto.attachments")
def encrypt_attachment(data):
encrypted_file = MagicMock()
encrypted_file.serialize.return_value = {
"key": {"k": "testkey"}, "iv": "testiv",
"hashes": {"sha256": "testhash"}, "v": "v2",
}
return (b"ciphertext_" + data, encrypted_file)
mautrix_crypto_attachments.encrypt_attachment = encrypt_attachment
# --- mautrix.crypto.store.asyncpg ---
mautrix_crypto_store_asyncpg = types.ModuleType("mautrix.crypto.store.asyncpg")
class PgCryptoStore:
upgrade_table = MagicMock()
def __init__(self, account_id="", pickle_key="", db=None): # noqa: S301
self.account_id = account_id
self.pickle_key = pickle_key
self.db = db
self._device_id = ""
async def open(self):
pass
async def put_device_id(self, device_id):
self._device_id = device_id
mautrix_crypto_store_asyncpg.PgCryptoStore = PgCryptoStore
# --- mautrix.util ---
mautrix_util = types.ModuleType("mautrix.util")
# --- mautrix.util.async_db ---
mautrix_util_async_db = types.ModuleType("mautrix.util.async_db")
class Database:
@classmethod
def create(cls, url, upgrade_table=None):
db = MagicMock()
db.start = AsyncMock()
db.stop = AsyncMock()
return db
mautrix_util_async_db.Database = Database
return {
"mautrix": mautrix,
"mautrix.api": mautrix_api,
"mautrix.types": mautrix_types,
"mautrix.client": mautrix_client,
"mautrix.client.dispatcher": mautrix_client_dispatcher,
"mautrix.client.state_store": mautrix_client_state_store,
"mautrix.crypto": mautrix_crypto,
"mautrix.crypto.attachments": mautrix_crypto_attachments,
"mautrix.crypto.store": mautrix_crypto_store,
"mautrix.crypto.store.asyncpg": mautrix_crypto_store_asyncpg,
"mautrix.util": mautrix_util,
"mautrix.util.async_db": mautrix_util_async_db,
}
# ---------------------------------------------------------------------------
# Platform & Config
# ---------------------------------------------------------------------------
class TestMatrixConfigLoading:
def test_apply_env_overrides_with_access_token(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.MATRIX in config.platforms
mc = config.platforms[Platform.MATRIX]
assert mc.enabled is True
assert mc.token == "syt_abc123"
assert mc.extra.get("homeserver") == "https://matrix.example.org"
def test_apply_env_overrides_with_password(self, monkeypatch):
monkeypatch.delenv("MATRIX_ACCESS_TOKEN", raising=False)
monkeypatch.setenv("MATRIX_PASSWORD", "secret123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_USER_ID", "@bot:example.org")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.MATRIX in config.platforms
mc = config.platforms[Platform.MATRIX]
assert mc.enabled is True
assert mc.extra.get("password") == "secret123"
assert mc.extra.get("user_id") == "@bot:example.org"
def test_matrix_not_loaded_without_creds(self, monkeypatch):
monkeypatch.delenv("MATRIX_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("MATRIX_PASSWORD", raising=False)
monkeypatch.delenv("MATRIX_HOMESERVER", raising=False)
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.MATRIX not in config.platforms
def test_matrix_encryption_flag(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_ENCRYPTION", "true")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
mc = config.platforms[Platform.MATRIX]
assert mc.extra.get("encryption") is True
def test_matrix_e2ee_mode_optional_sets_config(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_E2EE_MODE", "optional")
monkeypatch.delenv("MATRIX_ENCRYPTION", raising=False)
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
mc = config.platforms[Platform.MATRIX]
assert mc.extra.get("encryption") is True
assert mc.extra.get("e2ee_mode") == "optional"
def test_matrix_encryption_default_off(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.delenv("MATRIX_ENCRYPTION", raising=False)
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
mc = config.platforms[Platform.MATRIX]
assert mc.extra.get("encryption") is False
def test_matrix_home_room(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_HOME_ROOM", "!room123:example.org")
monkeypatch.setenv("MATRIX_HOME_ROOM_NAME", "Bot Room")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
home = config.get_home_channel(Platform.MATRIX)
assert home is not None
assert home.chat_id == "!room123:example.org"
assert home.name == "Bot Room"
def test_matrix_user_id_stored_in_extra(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_USER_ID", "@hermes:example.org")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
mc = config.platforms[Platform.MATRIX]
assert mc.extra.get("user_id") == "@hermes:example.org"
# ---------------------------------------------------------------------------
# Adapter helpers
# ---------------------------------------------------------------------------
def _make_adapter():
"""Create a MatrixAdapter with mocked config."""
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
},
)
adapter = MatrixAdapter(config)
return adapter
# ---------------------------------------------------------------------------
# Typing indicator
# ---------------------------------------------------------------------------
class TestMatrixTypingIndicator:
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._client = MagicMock()
self.adapter._client.set_typing = AsyncMock()
@pytest.mark.asyncio
async def test_stop_typing_clears_matrix_typing_state(self):
"""stop_typing() should send typing=false instead of waiting for timeout expiry."""
from gateway.platforms.matrix import RoomID
await self.adapter.stop_typing("!room:example.org")
self.adapter._client.set_typing.assert_awaited_once_with(
RoomID("!room:example.org"),
timeout=0,
)
@pytest.mark.asyncio
async def test_stop_typing_no_client_is_noop(self):
self.adapter._client = None
await self.adapter.stop_typing("!room:example.org") # should not raise
@pytest.mark.asyncio
async def test_stop_typing_suppresses_exceptions(self):
self.adapter._client.set_typing = AsyncMock(side_effect=Exception("network"))
await self.adapter.stop_typing("!room:example.org") # should not raise
# ---------------------------------------------------------------------------
# mxc:// URL conversion
# ---------------------------------------------------------------------------
class TestMatrixMxcToHttp:
def setup_method(self):
self.adapter = _make_adapter()
def test_basic_mxc_conversion(self):
"""mxc://server/media_id should become an authenticated HTTP URL."""
mxc = "mxc://matrix.org/abc123"
result = self.adapter._mxc_to_http(mxc)
assert result == "https://matrix.example.org/_matrix/client/v1/media/download/matrix.org/abc123"
def test_mxc_with_different_server(self):
"""mxc:// from a different server should still use our homeserver."""
mxc = "mxc://other.server/media456"
result = self.adapter._mxc_to_http(mxc)
assert result.startswith("https://matrix.example.org/")
assert "other.server/media456" in result
def test_non_mxc_url_passthrough(self):
"""Non-mxc URLs should be returned unchanged."""
url = "https://example.com/image.png"
assert self.adapter._mxc_to_http(url) == url
def test_mxc_uses_client_v1_endpoint(self):
"""Should use /_matrix/client/v1/media/download/ not the deprecated path."""
mxc = "mxc://example.com/test123"
result = self.adapter._mxc_to_http(mxc)
assert "/_matrix/client/v1/media/download/" in result
assert "/_matrix/media/v3/download/" not in result
# ---------------------------------------------------------------------------
# DM detection
# ---------------------------------------------------------------------------
class TestMatrixDmDetection:
def setup_method(self):
self.adapter = _make_adapter()
def test_room_in_m_direct_is_dm(self):
"""A room listed in m.direct should be detected as DM."""
self.adapter._joined_rooms = {"!dm_room:ex.org", "!group_room:ex.org"}
self.adapter._dm_rooms = {
"!dm_room:ex.org": True,
"!group_room:ex.org": False,
}
assert self.adapter._dm_rooms.get("!dm_room:ex.org") is True
assert self.adapter._dm_rooms.get("!group_room:ex.org") is False
def test_unknown_room_not_in_cache(self):
"""Unknown rooms should not be in the DM cache."""
self.adapter._dm_rooms = {}
assert self.adapter._dm_rooms.get("!unknown:ex.org") is None
@pytest.mark.asyncio
async def test_refresh_dm_cache_with_m_direct(self):
"""_refresh_dm_cache should populate _dm_rooms from m.direct data."""
self.adapter._joined_rooms = {"!room_a:ex.org", "!room_b:ex.org", "!room_c:ex.org"}
mock_client = MagicMock()
mock_resp = MagicMock()
mock_resp.content = {
"@alice:ex.org": ["!room_a:ex.org"],
"@bob:ex.org": ["!room_b:ex.org"],
}
mock_client.get_account_data = AsyncMock(return_value=mock_resp)
self.adapter._client = mock_client
await self.adapter._refresh_dm_cache()
assert self.adapter._dm_rooms["!room_a:ex.org"] is True
assert self.adapter._dm_rooms["!room_b:ex.org"] is True
assert self.adapter._dm_rooms["!room_c:ex.org"] is False
@pytest.mark.asyncio
async def test_m_direct_room_is_dm(self):
"""m.direct account data is the authoritative DM signal."""
self.adapter._joined_rooms = {"!dm_room:ex.org"}
self.adapter._dm_rooms = {"!dm_room:ex.org": True}
self.adapter._client = MagicMock()
self.adapter._client.get_state_event = AsyncMock(side_effect=Exception("no state"))
self.adapter._client.state_store = MagicMock()
self.adapter._client.state_store.get_members = AsyncMock(return_value=["@bot:ex.org", "@alice:ex.org"])
assert await self.adapter._is_dm_room("!dm_room:ex.org") is True
@pytest.mark.asyncio
async def test_named_two_member_room_is_not_dm(self):
"""A named two-member room must remain a room, not a DM."""
self.adapter._joined_rooms = {"!project:ex.org"}
self.adapter._dm_rooms = {}
self.adapter._client = MagicMock()
self.adapter._client.get_state_event = AsyncMock(
side_effect=lambda room_id, event_type: {"name": "Project Room"}
if event_type == "m.room.name"
else (_ for _ in ()).throw(Exception("no alias"))
)
self.adapter._client.state_store = MagicMock()
self.adapter._client.state_store.get_members = AsyncMock(
return_value=["@bot:ex.org", "@alice:ex.org"]
)
identity = await self.adapter._resolve_room_identity("!project:ex.org")
assert identity.chat_type == "room"
assert identity.display_name == "Project Room"
assert identity.joined_member_count == 2
assert await self.adapter._is_dm_room("!project:ex.org") is False
@pytest.mark.asyncio
async def test_named_room_overrides_stale_dm_cache(self):
"""Explicit room names should defeat stale/conflicting m.direct data."""
self.adapter._joined_rooms = {"!stale:ex.org"}
self.adapter._dm_rooms = {"!stale:ex.org": True}
self.adapter._client = MagicMock()
self.adapter._client.get_state_event = AsyncMock(
side_effect=lambda room_id, event_type: {"content": {"name": "Ops Room"}}
if event_type == "m.room.name"
else (_ for _ in ()).throw(Exception("no alias"))
)
self.adapter._client.state_store = MagicMock()
self.adapter._client.state_store.get_members = AsyncMock(return_value=["@bot:ex.org", "@alice:ex.org"])
identity = await self.adapter._resolve_room_identity("!stale:ex.org")
assert identity.chat_type == "room"
assert identity.conflict is True
assert await self.adapter._is_dm_room("!stale:ex.org") is False
@pytest.mark.asyncio
async def test_canonical_alias_used_when_name_missing(self):
self.adapter._joined_rooms = {"!alias:ex.org"}
self.adapter._dm_rooms = {}
self.adapter._client = MagicMock()
async def get_state_event(room_id, event_type):
if event_type == "m.room.name":
raise Exception("no name")
if event_type == "m.room.canonical_alias":
return {"content": {"alias": "#hermes:ex.org"}}
raise Exception("unknown")
self.adapter._client.get_state_event = AsyncMock(side_effect=get_state_event)
self.adapter._client.state_store = MagicMock()
self.adapter._client.state_store.get_members = AsyncMock(return_value=None)
identity = await self.adapter._resolve_room_identity("!alias:ex.org")
assert identity.display_name == "#hermes:ex.org"
assert identity.chat_type == "room"
@pytest.mark.asyncio
async def test_non_string_m_direct_entries_ignored(self):
self.adapter._joined_rooms = {"!room_a:ex.org", "!room_b:ex.org"}
mock_client = MagicMock()
mock_resp = MagicMock()
mock_resp.content = {
"@alice:ex.org": ["!room_a:ex.org", 42, None],
}
mock_client.get_account_data = AsyncMock(return_value=mock_resp)
self.adapter._client = mock_client
await self.adapter._refresh_dm_cache()
assert self.adapter._dm_rooms == {
"!room_a:ex.org": True,
"!room_b:ex.org": False,
}
# ---------------------------------------------------------------------------
# Reply fallback stripping
# ---------------------------------------------------------------------------
class TestMatrixReplyFallbackStripping:
"""Test that Matrix reply fallback lines ('> ' prefix) are stripped."""
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._user_id = "@bot:example.org"
self.adapter._startup_ts = 0.0
self.adapter._dm_rooms = {}
self.adapter._message_handler = AsyncMock()
def _strip_fallback(self, body: str, has_reply: bool = True) -> str:
"""Simulate the reply fallback stripping logic from _on_room_message."""
reply_to = "some_event_id" if has_reply else None
if reply_to and body.startswith("> "):
lines = body.split("\n")
stripped = []
past_fallback = False
for line in lines:
if not past_fallback:
if line.startswith("> ") or line == ">":
continue
if line == "":
past_fallback = True
continue
past_fallback = True
stripped.append(line)
body = "\n".join(stripped) if stripped else body
return body
def test_simple_reply_fallback(self):
body = "> <@alice:ex.org> Original message\n\nActual reply"
result = self._strip_fallback(body)
assert result == "Actual reply"
def test_multiline_reply_fallback(self):
body = "> <@alice:ex.org> Line 1\n> Line 2\n\nMy response"
result = self._strip_fallback(body)
assert result == "My response"
def test_no_reply_fallback_preserved(self):
body = "Just a normal message"
result = self._strip_fallback(body, has_reply=False)
assert result == "Just a normal message"
def test_quote_without_reply_preserved(self):
"""'> ' lines without a reply_to context should be preserved."""
body = "> This is a blockquote"
result = self._strip_fallback(body, has_reply=False)
assert result == "> This is a blockquote"
def test_empty_fallback_separator(self):
"""The blank line between fallback and actual content should be stripped."""
body = "> <@alice:ex.org> hi\n>\n\nResponse"
result = self._strip_fallback(body)
assert result == "Response"
def test_multiline_response_after_fallback(self):
body = "> <@alice:ex.org> Original\n\nLine 1\nLine 2\nLine 3"
result = self._strip_fallback(body)
assert result == "Line 1\nLine 2\nLine 3"
# ---------------------------------------------------------------------------
# Matrix-friendly command aliases
# ---------------------------------------------------------------------------
class TestMatrixBangCommandAlias:
"""Matrix clients may reserve /commands, so Hermes supports !commands."""
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._is_dm_room = AsyncMock(return_value=True)
self.adapter._get_display_name = AsyncMock(return_value="Alice")
self.adapter._background_read_receipt = MagicMock()
self.adapter._text_batch_delay_seconds = 0
async def _dispatch_text(self, body: str, *, is_dm: bool = True):
captured_event = None
self.adapter._is_dm_room = AsyncMock(return_value=is_dm)
self.adapter._require_mention = True
self.adapter._free_rooms = set()
async def capture(msg_event):
nonlocal captured_event
captured_event = msg_event
self.adapter.handle_message = capture
await self.adapter._handle_text_message(
room_id="!room:example.org",
sender="@alice:example.org",
event_id="$matrix-command-test",
event_ts=0.0,
source_content={"msgtype": "m.text", "body": body},
relates_to={},
)
return captured_event
async def _dispatch_text_reply(self, body: str, *, is_dm: bool = True):
"""Dispatch a message that is a Matrix reply (m.in_reply_to set), so
the reply-fallback quote stripping path runs before command detection.
"""
captured_event = None
self.adapter._is_dm_room = AsyncMock(return_value=is_dm)
self.adapter._require_mention = True
self.adapter._free_rooms = set()
async def capture(msg_event):
nonlocal captured_event
captured_event = msg_event
self.adapter.handle_message = capture
await self.adapter._handle_text_message(
room_id="!room:example.org",
sender="@alice:example.org",
event_id="$matrix-reply-command-test",
event_ts=0.0,
source_content={"msgtype": "m.text", "body": body},
relates_to={"m.in_reply_to": {"event_id": "$parent-event"}},
)
return captured_event
def test_known_bang_command_normalizes_to_slash_command(self):
from gateway.platforms.matrix import _normalize_matrix_bang_command
assert _normalize_matrix_bang_command("!model") == "/model"
assert (
_normalize_matrix_bang_command("!queue continue the plan")
== "/queue continue the plan"
)
assert (
_normalize_matrix_bang_command("!btw research this")
== "/btw research this"
)
assert _normalize_matrix_bang_command("!tasks") == "/tasks"
def test_unknown_bang_text_is_not_treated_as_command(self):
from gateway.platforms.matrix import _normalize_matrix_bang_command
assert _normalize_matrix_bang_command("!important note") == "!important note"
assert _normalize_matrix_bang_command("! wow") == "! wow"
assert _normalize_matrix_bang_command("plain text") == "plain text"
assert _normalize_matrix_bang_command("/model") == "/model"
@pytest.mark.asyncio
async def test_bang_model_reaches_gateway_as_slash_command(self):
captured_event = await self._dispatch_text("!model")
assert captured_event is not None
assert captured_event.text == "/model"
assert captured_event.message_type == MessageType.COMMAND
assert captured_event.get_command() == "model"
@pytest.mark.asyncio
async def test_bang_queue_preserves_arguments(self):
captured_event = await self._dispatch_text("!queue keep going")
assert captured_event is not None
assert captured_event.text == "/queue keep going"
assert captured_event.message_type == MessageType.COMMAND
assert captured_event.get_command() == "queue"
assert captured_event.get_command_args() == "keep going"
@pytest.mark.asyncio
async def test_unknown_bang_text_stays_normal_text(self):
captured_event = await self._dispatch_text("!important note")
assert captured_event is not None
assert captured_event.text == "!important note"
assert captured_event.message_type == MessageType.TEXT
assert captured_event.get_command() is None
@pytest.mark.asyncio
async def test_bang_command_bypasses_room_mention_requirement(self):
captured_event = await self._dispatch_text("!commands", is_dm=False)
assert captured_event is not None
assert captured_event.text == "/commands"
assert captured_event.message_type == MessageType.COMMAND
@pytest.mark.asyncio
async def test_slash_command_bypasses_room_mention_requirement(self):
captured_event = await self._dispatch_text("/sethome", is_dm=False)
assert captured_event is not None
assert captured_event.text == "/sethome"
assert captured_event.message_type == MessageType.COMMAND
@pytest.mark.asyncio
async def test_unknown_bang_text_does_not_bypass_room_mention_requirement(self):
captured_event = await self._dispatch_text("!important note", is_dm=False)
assert captured_event is None
def test_bang_alias_underscore_resolves_to_hyphen_form(self):
"""!set_home must emit a dispatchable token even though set_home is
not itself registered — the hyphenated alias set-home is."""
from gateway.platforms.matrix import _normalize_matrix_bang_command
# set_home (underscore) is NOT a registered command/alias, but
# set-home (hyphen) is. The normalizer must emit the resolvable form.
assert _normalize_matrix_bang_command("!set_home") == "/set-home"
# The hyphen alias passes through unchanged.
assert _normalize_matrix_bang_command("!set-home") == "/set-home"
# The canonical command resolves directly.
assert _normalize_matrix_bang_command("!sethome") == "/sethome"
def test_bang_skill_command_normalizes(self):
"""The get_skill_commands() branch normalizes installed skill
commands, not just built-in gateway commands. Skill keys are stored
slash-prefixed (e.g. "/arxiv"), which the resolver must account for."""
import agent.skill_commands as skill_commands_mod
fake_skills = {"/arxiv": {}, "/obsidian": {}}
with patch.object(
skill_commands_mod, "get_skill_commands", return_value=fake_skills
):
from gateway.platforms.matrix import _normalize_matrix_bang_command
# is_gateway_known_command won't know these; the skill branch must.
assert _normalize_matrix_bang_command("!arxiv") == "/arxiv"
assert (
_normalize_matrix_bang_command("!obsidian search foo")
== "/obsidian search foo"
)
# A name in neither registry stays plain text.
assert (
_normalize_matrix_bang_command("!definitelynotacommand")
== "!definitelynotacommand"
)
@pytest.mark.asyncio
async def test_bang_command_in_quoted_reply_normalizes(self):
"""A bang command that follows a Matrix reply-fallback quote is
normalized after the quote is stripped, matching /command behavior."""
captured_event = await self._dispatch_text_reply(
"> <@bob:example.org> earlier message\n\n!model"
)
assert captured_event is not None
assert captured_event.text == "/model"
assert captured_event.message_type == MessageType.COMMAND
assert captured_event.get_command() == "model"
@pytest.mark.asyncio
async def test_slash_command_in_quoted_reply_normalizes(self):
"""Sanity: the slash equivalent already works post-strip — the bang
form above must reach parity with this."""
captured_event = await self._dispatch_text_reply(
"> <@bob:example.org> earlier message\n\n/model"
)
assert captured_event is not None
assert captured_event.text == "/model"
assert captured_event.message_type == MessageType.COMMAND
# ---------------------------------------------------------------------------
# Thread detection
# ---------------------------------------------------------------------------
class TestMatrixThreadDetection:
def test_thread_id_from_m_relates_to(self):
"""m.relates_to with rel_type=m.thread should extract the event_id."""
relates_to = {
"rel_type": "m.thread",
"event_id": "$thread_root_event",
"is_falling_back": True,
"m.in_reply_to": {"event_id": "$some_event"},
}
# Simulate the extraction logic from _on_room_message
thread_id = None
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
assert thread_id == "$thread_root_event"
def test_no_thread_for_reply(self):
"""m.in_reply_to without m.thread should not set thread_id."""
relates_to = {
"m.in_reply_to": {"event_id": "$reply_event"},
}
thread_id = None
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
assert thread_id is None
def test_no_thread_for_edit(self):
"""m.replace relation should not set thread_id."""
relates_to = {
"rel_type": "m.replace",
"event_id": "$edited_event",
}
thread_id = None
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
assert thread_id is None
def test_empty_relates_to(self):
"""Empty m.relates_to should not set thread_id."""
relates_to = {}
thread_id = None
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
assert thread_id is None
# ---------------------------------------------------------------------------
# Format message
# ---------------------------------------------------------------------------
class TestMatrixFormatMessage:
def setup_method(self):
self.adapter = _make_adapter()
def test_image_markdown_stripped(self):
"""![alt](url) should be converted to just the URL."""
result = self.adapter.format_message("![cat](https://img.example.com/cat.png)")
assert result == "https://img.example.com/cat.png"
def test_regular_markdown_preserved(self):
"""Standard markdown should be preserved (Matrix supports it)."""
content = "**bold** and *italic* and `code`"
assert self.adapter.format_message(content) == content
def test_plain_text_unchanged(self):
content = "Hello, world!"
assert self.adapter.format_message(content) == content
def test_multiple_images_stripped(self):
content = "![a](http://a.com/1.png) and ![b](http://b.com/2.png)"
result = self.adapter.format_message(content)
assert "![" not in result
assert "http://a.com/1.png" in result
assert "http://b.com/2.png" in result
# ---------------------------------------------------------------------------
# Rendering payloads
# ---------------------------------------------------------------------------
class TestMatrixRenderingPayloads:
def setup_method(self):
self.adapter = _make_adapter()
self.mock_client = MagicMock()
self.mock_client.send_message_event = AsyncMock(return_value="$evt")
self.adapter._client = self.mock_client
def _sent_contents(self):
return [
call.args[2] if len(call.args) > 2 else call.kwargs["content"]
for call in self.mock_client.send_message_event.await_args_list
]
@pytest.mark.asyncio
async def test_render_plain_and_html_body(self):
result = await self.adapter.send("!room:example.org", "**Bold** and plain")
assert result.success is True
content = self._sent_contents()[0]
assert content["body"] == "**Bold** and plain"
assert content["format"] == "org.matrix.custom.html"
assert "<strong>Bold</strong>" in content["formatted_body"]
@pytest.mark.asyncio
async def test_thread_payload_uses_m_thread_with_reply_fallback(self):
result = await self.adapter.send(
"!room:example.org",
"threaded",
metadata={"thread_id": "$root"},
)
assert result.success is True
relates_to = self._sent_contents()[0]["m.relates_to"]
assert relates_to == {
"rel_type": "m.thread",
"event_id": "$root",
"is_falling_back": True,
"m.in_reply_to": {"event_id": "$root"},
}
@pytest.mark.asyncio
async def test_thread_payload_preserves_explicit_reply_target(self):
result = await self.adapter.send(
"!room:example.org",
"threaded reply",
reply_to="$reply",
metadata={"thread_id": "$root"},
)
assert result.success is True
relates_to = self._sent_contents()[0]["m.relates_to"]
assert relates_to["event_id"] == "$root"
assert relates_to["m.in_reply_to"] == {"event_id": "$reply"}
@pytest.mark.asyncio
async def test_edit_payload_uses_m_replace(self):
result = await self.adapter.edit_message(
"!room:example.org",
"$original",
"edited **body**",
)
assert result.success is True
content = self._sent_contents()[0]
assert content["m.relates_to"] == {
"rel_type": "m.replace",
"event_id": "$original",
}
assert content["m.new_content"]["body"] == "edited **body**"
assert content["body"] == "* edited **body**"
@pytest.mark.asyncio
async def test_long_response_split_preserves_thread_context(self):
long_text = "Intro\n```python\n" + ("print('hello')\n" * 500) + "```\nDone"
result = await self.adapter.send(
"!room:example.org",
long_text,
metadata={"thread_id": "$root"},
)
assert result.success is True
contents = self._sent_contents()
assert len(contents) > 1
for content in contents:
assert content["m.relates_to"]["rel_type"] == "m.thread"
assert content["m.relates_to"]["event_id"] == "$root"
assert content["m.relates_to"]["m.in_reply_to"] == {"event_id": "$root"}
assert content["body"].count("```") % 2 == 0
# ---------------------------------------------------------------------------
# Markdown to HTML conversion
# ---------------------------------------------------------------------------
class TestMatrixMarkdownToHtml:
def setup_method(self):
self.adapter = _make_adapter()
def test_bold_conversion(self):
"""**bold** should produce <strong> tags."""
result = self.adapter._markdown_to_html("**bold**")
assert "<strong>" in result or "<b>" in result
assert "bold" in result
def test_italic_conversion(self):
"""*italic* should produce <em> tags."""
result = self.adapter._markdown_to_html("*italic*")
assert "<em>" in result or "<i>" in result
def test_inline_code(self):
"""`code` should produce <code> tags."""
result = self.adapter._markdown_to_html("`code`")
assert "<code>" in result
def test_plain_text_returns_html(self):
"""Plain text should still be returned (possibly with <br> or <p>)."""
result = self.adapter._markdown_to_html("Hello world")
assert "Hello world" in result
def test_matrix_markdown_strips_script_tag(self):
result = self.adapter._markdown_to_html("Hello <script>alert(1)</script>")
assert "<script" not in result.lower()
assert "alert(1)" not in result
def test_matrix_markdown_strips_inline_event_handlers(self):
result = self.adapter._markdown_to_html('<b onclick="alert(1)">bold</b>')
assert "onclick" not in result.lower()
assert "bold" in result
def test_matrix_markdown_rejects_javascript_links(self):
result = self.adapter._markdown_to_html("[click](javascript:alert(1))")
assert "javascript:" not in result.lower()
assert "<a" in result
def test_matrix_markdown_rejects_file_links(self):
result = self.adapter._markdown_to_html("[click](file:///etc/passwd)")
assert "file:" not in result.lower()
def test_matrix_markdown_rejects_ftp_links(self):
result = self.adapter._markdown_to_html("[click](ftp://example.com/file)")
assert "ftp:" not in result.lower()
def test_matrix_markdown_rejects_blob_links(self):
result = self.adapter._markdown_to_html("[click](blob:https://example.com/id)")
assert "blob:" not in result.lower()
def test_matrix_markdown_rejects_obfuscated_javascript_links(self):
from gateway.platforms.matrix import _sanitize_matrix_html
result = _sanitize_matrix_html('<a href="java\nscript:alert(1)">click</a>')
assert "javascript:" not in result.lower()
assert "href=" not in result.lower()
assert "click" in result
def test_matrix_markdown_preserves_code_fences(self):
result = self.adapter._markdown_to_html("```python\nprint('x')\n```")
assert "<pre>" in result
assert "<code" in result
assert "print" in result
# ---------------------------------------------------------------------------
# Helper: display name extraction
# ---------------------------------------------------------------------------
class TestMatrixDisplayName:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_get_display_name_from_state_store(self):
"""Should get display name from state_store.get_member()."""
mock_member = MagicMock()
mock_member.displayname = "Alice"
mock_state_store = MagicMock()
mock_state_store.get_member = AsyncMock(return_value=mock_member)
mock_client = MagicMock()
mock_client.state_store = mock_state_store
self.adapter._client = mock_client
name = await self.adapter._get_display_name("!room:ex.org", "@alice:ex.org")
assert name == "Alice"
@pytest.mark.asyncio
async def test_get_display_name_fallback_to_localpart(self):
"""Should extract localpart from @user:server format."""
mock_state_store = MagicMock()
mock_state_store.get_member = AsyncMock(return_value=None)
mock_client = MagicMock()
mock_client.state_store = mock_state_store
self.adapter._client = mock_client
name = await self.adapter._get_display_name("!room:ex.org", "@bob:example.org")
assert name == "bob"
@pytest.mark.asyncio
async def test_get_display_name_no_client(self):
"""Should handle None client gracefully."""
self.adapter._client = None
name = await self.adapter._get_display_name("!room:ex.org", "@charlie:ex.org")
assert name == "charlie"
# ---------------------------------------------------------------------------
# Requirements check
# ---------------------------------------------------------------------------
class TestMatrixModuleImport:
def test_module_importable_without_mautrix(self):
"""gateway.platforms.matrix must be importable even when mautrix is
not installed — otherwise the gateway crashes for ALL platforms.
This test uses a subprocess to avoid polluting the current process's
sys.modules (reimporting a module creates a second module object whose
classes don't share globals with the original — breaking patch.object
in subsequent tests).
"""
import subprocess
result = subprocess.run(
[sys.executable, "-c", (
"import sys\n"
"# Block mautrix completely\n"
"class _Blocker:\n"
" def find_module(self, name, path=None):\n"
" if name.startswith('mautrix'): return self\n"
" def load_module(self, name):\n"
" raise ImportError(f'blocked: {name}')\n"
"sys.meta_path.insert(0, _Blocker())\n"
"for k in list(sys.modules):\n"
" if k.startswith('mautrix'): del sys.modules[k]\n"
"from unittest.mock import patch\n"
"from gateway.platforms.matrix import check_matrix_requirements\n"
"with patch('tools.lazy_deps.ensure', side_effect=ImportError('blocked')):\n"
" assert not check_matrix_requirements()\n"
"print('OK')\n"
)],
capture_output=True, text=True, timeout=10,
)
assert result.returncode == 0, (
f"Subprocess failed:\nstdout: {result.stdout}\nstderr: {result.stderr}"
)
class TestMatrixRequirements:
def test_check_requirements_with_token(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_test")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.delenv("MATRIX_ENCRYPTION", raising=False)
from gateway.platforms.matrix import check_matrix_requirements
with patch("tools.lazy_deps.feature_missing", return_value=()):
assert check_matrix_requirements() is True
def test_check_requirements_without_creds(self, monkeypatch):
monkeypatch.delenv("MATRIX_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("MATRIX_PASSWORD", raising=False)
monkeypatch.delenv("MATRIX_HOMESERVER", raising=False)
from gateway.platforms.matrix import check_matrix_requirements
assert check_matrix_requirements() is False
def test_check_requirements_without_homeserver(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_test")
monkeypatch.delenv("MATRIX_HOMESERVER", raising=False)
from gateway.platforms.matrix import check_matrix_requirements
assert check_matrix_requirements() is False
def test_check_requirements_encryption_true_no_e2ee_deps(self, monkeypatch):
"""MATRIX_ENCRYPTION=true should fail if python-olm is not installed."""
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_test")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_ENCRYPTION", "true")
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=False), \
patch("tools.lazy_deps.feature_missing", return_value=()):
assert matrix_mod.check_matrix_requirements() is False
def test_check_requirements_e2ee_optional_no_deps_ok(self, monkeypatch):
"""MATRIX_E2EE_MODE=optional should not block startup without python-olm."""
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_test")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_E2EE_MODE", "optional")
monkeypatch.delenv("MATRIX_ENCRYPTION", raising=False)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=False), \
patch("tools.lazy_deps.feature_missing", return_value=()), \
patch("tools.lazy_deps.ensure_and_bind", return_value=True):
assert matrix_mod.check_matrix_requirements() is True
def test_check_requirements_encryption_false_no_e2ee_deps_ok(self, monkeypatch):
"""Without encryption, missing E2EE deps should not block startup."""
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_test")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.delenv("MATRIX_ENCRYPTION", raising=False)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=False), \
patch("tools.lazy_deps.feature_missing", return_value=()):
assert matrix_mod.check_matrix_requirements() is True
def test_check_requirements_encryption_true_with_e2ee_deps(self, monkeypatch):
"""MATRIX_ENCRYPTION=true should pass if E2EE deps are available."""
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_test")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_ENCRYPTION", "true")
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True), \
patch("tools.lazy_deps.feature_missing", return_value=()):
assert matrix_mod.check_matrix_requirements() is True
def test_check_e2ee_deps_requires_asyncpg(self, monkeypatch):
"""E2EE deps check must reject when asyncpg is missing — even if olm is present.
Regression for #31116: ``mautrix[encryption]`` extra installs python-olm
but NOT asyncpg/aiosqlite, which are required by mautrix's crypto store
at connect time. ``_check_e2ee_deps`` previously only tested
``OlmMachine`` import and returned True, so the failure manifested as
a confusing ``No module named 'asyncpg'`` deep in
``MatrixAdapter.connect()``.
"""
from gateway.platforms.matrix import _check_e2ee_deps
import builtins
real_import = builtins.__import__
def _blocking_import(name, *args, **kwargs):
if name == "asyncpg" or name.startswith("asyncpg."):
raise ImportError("blocked for test")
return real_import(name, *args, **kwargs)
with patch.object(builtins, "__import__", _blocking_import):
assert _check_e2ee_deps() is False
def test_check_e2ee_deps_requires_aiosqlite(self):
"""E2EE deps check must reject when aiosqlite is missing.
Mautrix's ``Database.create("sqlite:///...")`` driver lookup imports
aiosqlite lazily — without it, connect fails at ``crypto_db.start()``.
"""
from gateway.platforms.matrix import _check_e2ee_deps
import builtins
real_import = builtins.__import__
def _blocking_import(name, *args, **kwargs):
if name == "aiosqlite" or name.startswith("aiosqlite."):
raise ImportError("blocked for test")
return real_import(name, *args, **kwargs)
with patch.object(builtins, "__import__", _blocking_import):
assert _check_e2ee_deps() is False
def test_check_requirements_runs_lazy_install_when_partial(self, monkeypatch):
"""When mautrix is installed but asyncpg/aiosqlite are missing,
check_matrix_requirements must still run the lazy installer.
Regression for #31116: the previous ``try: import mautrix`` gate
short-circuited the install of the OTHER 4 platform.matrix packages,
so a partial install (mautrix only) was treated as fully installed.
"""
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_test")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.delenv("MATRIX_ENCRYPTION", raising=False)
from gateway.platforms import matrix as matrix_mod
# Simulate "mautrix installed, asyncpg missing" → feature_missing
# returns a non-empty tuple → ensure_and_bind MUST be called.
called = {"ensure_and_bind": False}
def _fake_ensure_and_bind(feature, importer, target_globals, **kwargs):
called["ensure_and_bind"] = True
assert feature == "platform.matrix"
return True # Pretend install succeeded.
with patch("tools.lazy_deps.feature_missing", return_value=("asyncpg==0.31.0",)), \
patch("tools.lazy_deps.ensure_and_bind", side_effect=_fake_ensure_and_bind):
matrix_mod.check_matrix_requirements()
assert called["ensure_and_bind"], (
"check_matrix_requirements must call ensure_and_bind whenever ANY "
"platform.matrix dep is missing, not just when mautrix itself is "
"missing (#31116)"
)
# ---------------------------------------------------------------------------
# Access-token auth / E2EE bootstrap
# ---------------------------------------------------------------------------
class TestMatrixAccessTokenAuth:
@pytest.mark.asyncio
async def test_connect_with_access_token_and_encryption(self):
"""connect() should call whoami, set user_id/device_id, set up crypto."""
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_access_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": True,
},
)
adapter = MatrixAdapter(config)
class FakeWhoamiResponse:
def __init__(self, user_id, device_id):
self.user_id = user_id
self.device_id = device_id
fake_mautrix_mods = _make_fake_mautrix()
# Create a mock client that returns from the mautrix.client.Client constructor
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.state_store = MagicMock()
mock_client.sync_store = MagicMock()
mock_client.crypto = None
mock_client.whoami = AsyncMock(return_value=FakeWhoamiResponse("@bot:example.org", "DEV123"))
mock_client.sync = AsyncMock(return_value={"rooms": {"join": {"!room:server": {}}}})
mock_client.add_event_handler = MagicMock()
mock_client.handle_sync = MagicMock(return_value=[])
mock_client.query_keys = AsyncMock(return_value={
"device_keys": {"@bot:example.org": {"DEV123": {
"keys": {"ed25519:DEV123": "fake_ed25519_key"},
}}},
})
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_access_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
# Mock the crypto setup
mock_olm = MagicMock()
mock_olm.load = AsyncMock()
mock_olm.share_keys = AsyncMock()
mock_olm.share_keys_min_trust = None
mock_olm.send_keys_min_trust = None
mock_olm.account = MagicMock()
mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"}
# Patch Client constructor to return our mock
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(return_value=mock_olm)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True):
with patch.dict("sys.modules", fake_mautrix_mods):
with patch.object(adapter, "_refresh_dm_cache", AsyncMock()):
with patch.object(adapter, "_sync_loop", AsyncMock(return_value=None)):
assert await adapter.connect() is True
mock_client.whoami.assert_awaited_once()
assert adapter._user_id == "@bot:example.org"
await adapter.disconnect()
class TestDeviceKeyReVerification:
@pytest.mark.asyncio
async def test_verify_fails_when_server_keys_mismatch_after_upload(self):
"""share_keys() succeeds but server still has old keys -> should return False."""
adapter = _make_adapter()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = "TESTDEVICE"
# First query: keys missing -> triggers share_keys
# Second query: keys still don't match -> should fail
mock_keys_missing = MagicMock()
mock_keys_missing.device_keys = {"@bot:example.org": {}}
mock_keys_mismatch = MagicMock()
mock_device = MagicMock()
mock_device.keys = {"ed25519:TESTDEVICE": "server_old_key"}
mock_keys_mismatch.device_keys = {"@bot:example.org": {"TESTDEVICE": mock_device}}
mock_client.query_keys = AsyncMock(side_effect=[mock_keys_missing, mock_keys_mismatch])
mock_olm = MagicMock()
mock_olm.account = MagicMock()
mock_olm.account.shared = False
mock_olm.account.identity_keys = {"ed25519": "local_new_key"}
mock_olm.share_keys = AsyncMock()
result = await adapter._verify_device_keys_on_server(mock_client, mock_olm)
assert result is False
mock_olm.share_keys.assert_awaited_once()
class TestMatrixE2EEHardFail:
"""connect() must refuse to start when E2EE is requested but deps are missing."""
@pytest.mark.asyncio
async def test_connect_fails_when_encryption_true_but_no_e2ee_deps(self):
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_access_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": True,
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_client = MagicMock()
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123"))
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_access_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.crypto = None
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=False):
with patch.dict("sys.modules", fake_mautrix_mods):
with patch.object(adapter, "_sync_loop", AsyncMock(return_value=None)):
result = await adapter.connect()
assert result is False
@pytest.mark.asyncio
async def test_connect_continues_when_e2ee_optional_but_no_deps(self):
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_access_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"e2ee_mode": "optional",
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_sync_store = MagicMock()
mock_sync_store.get_next_batch = AsyncMock(return_value=None)
mock_sync_store.put_next_batch = AsyncMock()
mock_client = MagicMock()
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123"))
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_access_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.crypto = None
mock_client.sync_store = mock_sync_store
mock_client.sync = AsyncMock(return_value={"rooms": {"join": {}}, "next_batch": "s1"})
mock_client.get_account_data = AsyncMock(return_value=MagicMock(content={}))
mock_client.add_dispatcher = MagicMock()
mock_client.add_event_handler = MagicMock()
mock_client.handle_sync = MagicMock(return_value=[])
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=False):
with patch.dict("sys.modules", fake_mautrix_mods):
with patch.object(matrix_mod, "_create_matrix_session", return_value=MagicMock()):
with patch.object(adapter, "_sync_loop", AsyncMock(return_value=None)):
result = await adapter.connect()
assert result is True
assert adapter._encryption is False
await adapter.disconnect()
@pytest.mark.asyncio
async def test_connect_fails_when_crypto_setup_raises(self):
"""Even if _check_e2ee_deps passes, if OlmMachine raises, hard-fail."""
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_access_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": True,
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_client = MagicMock()
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123"))
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_access_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.crypto = None
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(side_effect=Exception("olm init failed"))
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True):
with patch.dict("sys.modules", fake_mautrix_mods):
result = await adapter.connect()
assert result is False
class TestMatrixDeviceId:
"""MATRIX_DEVICE_ID should be used for stable device identity."""
def test_device_id_from_config_extra(self):
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test",
extra={
"homeserver": "https://matrix.example.org",
"device_id": "HERMES_BOT_STABLE",
},
)
adapter = MatrixAdapter(config)
assert adapter._device_id == "HERMES_BOT_STABLE"
def test_device_id_from_env(self, monkeypatch):
monkeypatch.setenv("MATRIX_DEVICE_ID", "FROM_ENV")
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test",
extra={
"homeserver": "https://matrix.example.org",
},
)
adapter = MatrixAdapter(config)
assert adapter._device_id == "FROM_ENV"
def test_device_id_config_takes_precedence_over_env(self, monkeypatch):
monkeypatch.setenv("MATRIX_DEVICE_ID", "FROM_ENV")
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test",
extra={
"homeserver": "https://matrix.example.org",
"device_id": "FROM_CONFIG",
},
)
adapter = MatrixAdapter(config)
assert adapter._device_id == "FROM_CONFIG"
@pytest.mark.asyncio
async def test_connect_uses_configured_device_id_over_whoami(self):
"""When MATRIX_DEVICE_ID is set, it should be used instead of whoami device_id."""
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_access_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": True,
"device_id": "MY_STABLE_DEVICE",
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.state_store = MagicMock()
mock_client.sync_store = MagicMock()
mock_client.crypto = None
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="WHOAMI_DEV"))
mock_client.sync = AsyncMock(return_value={"rooms": {"join": {"!room:server": {}}}})
mock_client.add_event_handler = MagicMock()
mock_client.handle_sync = MagicMock(return_value=[])
mock_client.query_keys = AsyncMock(return_value={
"device_keys": {"@bot:example.org": {"MY_STABLE_DEVICE": {
"keys": {"ed25519:MY_STABLE_DEVICE": "fake_ed25519_key"},
}}},
})
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_access_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
mock_olm = MagicMock()
mock_olm.load = AsyncMock()
mock_olm.share_keys = AsyncMock()
mock_olm.share_keys_min_trust = None
mock_olm.send_keys_min_trust = None
mock_olm.account = MagicMock()
mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"}
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(return_value=mock_olm)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True):
with patch.dict("sys.modules", fake_mautrix_mods):
with patch.object(adapter, "_refresh_dm_cache", AsyncMock()):
with patch.object(adapter, "_sync_loop", AsyncMock(return_value=None)):
assert await adapter.connect() is True
# The configured device_id should override the whoami device_id.
# In mautrix, the adapter sets client.device_id directly.
assert adapter._device_id == "MY_STABLE_DEVICE"
await adapter.disconnect()
class TestMatrixPasswordLoginDeviceId:
"""MATRIX_DEVICE_ID should be passed to mautrix Client even with password login."""
@pytest.mark.asyncio
async def test_password_login_uses_device_id(self):
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"password": "secret",
"device_id": "STABLE_PW_DEVICE",
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.state_store = MagicMock()
mock_client.sync_store = MagicMock()
mock_client.crypto = None
mock_client.login = AsyncMock(return_value=MagicMock(device_id="STABLE_PW_DEVICE", access_token="tok"))
mock_client.sync = AsyncMock(return_value={"rooms": {"join": {}}})
mock_client.add_event_handler = MagicMock()
mock_client.api = MagicMock()
mock_client.api.token = ""
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
with patch.dict("sys.modules", fake_mautrix_mods):
with patch.object(adapter, "_refresh_dm_cache", AsyncMock()):
with patch.object(adapter, "_sync_loop", AsyncMock(return_value=None)):
assert await adapter.connect() is True
mock_client.login.assert_awaited_once()
assert adapter._device_id == "STABLE_PW_DEVICE"
await adapter.disconnect()
class TestMatrixDeviceIdConfig:
"""MATRIX_DEVICE_ID should be plumbed through gateway config."""
def test_device_id_in_config_extra(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_DEVICE_ID", "HERMES_BOT")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
mc = config.platforms[Platform.MATRIX]
assert mc.extra.get("device_id") == "HERMES_BOT"
def test_device_id_not_set_when_env_empty(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.delenv("MATRIX_DEVICE_ID", raising=False)
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
mc = config.platforms[Platform.MATRIX]
assert "device_id" not in mc.extra
class TestMatrixSyncLoop:
@pytest.mark.asyncio
async def test_sync_loop_dispatches_events_and_stores_token(self):
"""_sync_loop should call handle_sync() and persist next_batch."""
adapter = _make_adapter()
adapter._encryption = True
adapter._closing = False
call_count = 0
async def _sync_once(**kwargs):
nonlocal call_count
call_count += 1
if call_count >= 1:
adapter._closing = True
return {"rooms": {"join": {"!room:example.org": {}}}, "next_batch": "s1234"}
mock_crypto = MagicMock()
mock_sync_store = MagicMock()
mock_sync_store.get_next_batch = AsyncMock(return_value=None)
mock_sync_store.put_next_batch = AsyncMock()
fake_client = MagicMock()
fake_client.sync = AsyncMock(side_effect=_sync_once)
fake_client.crypto = mock_crypto
fake_client.sync_store = mock_sync_store
fake_client.handle_sync = MagicMock(return_value=[])
adapter._client = fake_client
await adapter._sync_loop()
fake_client.sync.assert_awaited_once()
fake_client.handle_sync.assert_called_once()
mock_sync_store.put_next_batch.assert_awaited_once_with("s1234")
@pytest.mark.asyncio
async def test_sync_loop_reconciles_pending_invites(self):
"""Pending rooms.invite entries should be joined if callbacks were missed."""
adapter = _make_adapter()
adapter._closing = False
async def _sync_once(**kwargs):
adapter._closing = True
return {
"rooms": {
"join": {"!joined:example.org": {}},
"invite": {"!invited:example.org": {}},
},
"next_batch": "s1234",
}
mock_sync_store = MagicMock()
mock_sync_store.get_next_batch = AsyncMock(return_value=None)
mock_sync_store.put_next_batch = AsyncMock()
fake_client = MagicMock()
fake_client.sync = AsyncMock(side_effect=_sync_once)
fake_client.join_room = AsyncMock()
fake_client.sync_store = mock_sync_store
fake_client.handle_sync = MagicMock(return_value=[])
adapter._client = fake_client
with patch.object(adapter, "_refresh_dm_cache", AsyncMock()):
await adapter._sync_loop()
fake_client.join_room.assert_awaited_once()
assert "!joined:example.org" in adapter._joined_rooms
assert "!invited:example.org" in adapter._joined_rooms
@pytest.mark.asyncio
async def test_dispatch_sync_accepts_async_handle_sync(self):
"""Some fake clients expose handle_sync as an async dispatcher."""
adapter = _make_adapter()
called = False
async def handle_sync(sync_data):
nonlocal called
called = sync_data["next_batch"] == "s1"
return []
adapter._client = types.SimpleNamespace(handle_sync=handle_sync)
await adapter._dispatch_sync({"next_batch": "s1"})
assert called is True
@pytest.mark.asyncio
async def test_sync_loop_dispatches_registered_room_message_handler(self):
"""Inbound sync data should flow through handle_sync into message handling."""
adapter = _make_adapter()
adapter._closing = False
adapter._user_id = "@bot:example.org"
adapter._startup_ts = time.time() - 10
adapter._dm_rooms = {"!dm:example.org": True}
adapter._text_batch_delay_seconds = 0
adapter._background_read_receipt = MagicMock()
captured = []
async def capture(event):
captured.append(event)
adapter.handle_message = capture
event = types.SimpleNamespace(
sender="@alice:example.org",
event_id="$dm1",
room_id="!dm:example.org",
timestamp=int(time.time() * 1000),
content={"msgtype": "m.text", "body": "hello"},
)
async def _sync_once(**kwargs):
adapter._closing = True
return {"rooms": {"join": {"!dm:example.org": {}}}, "next_batch": "s1234"}
mock_sync_store = MagicMock()
mock_sync_store.get_next_batch = AsyncMock(return_value=None)
mock_sync_store.put_next_batch = AsyncMock()
fake_client = MagicMock()
fake_client.sync = AsyncMock(side_effect=_sync_once)
fake_client.sync_store = mock_sync_store
fake_client.get_state_event = AsyncMock(side_effect=Exception("no state"))
fake_client.state_store = MagicMock()
fake_client.state_store.get_members = AsyncMock(return_value=["@bot:example.org", "@alice:example.org"])
fake_client.state_store.get_member = AsyncMock(return_value=None)
def handle_sync(sync_data):
return [asyncio.create_task(adapter._on_room_message(event))]
fake_client.handle_sync = MagicMock(side_effect=handle_sync)
adapter._client = fake_client
await adapter._sync_loop()
assert len(captured) == 1
assert captured[0].text == "hello"
assert captured[0].source.chat_type == "dm"
@pytest.mark.asyncio
async def test_connect_receives_dm_from_initial_sync_dispatch(self):
"""A DM delivered by initial sync should reach the message handler after connect."""
from gateway.platforms.matrix import MatrixAdapter
adapter = MatrixAdapter(
PlatformConfig(
enabled=True,
token="syt_test_access_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": False,
},
)
)
adapter._text_batch_delay_seconds = 0
adapter._background_read_receipt = MagicMock()
captured = []
async def capture(event):
captured.append(event)
adapter.handle_message = capture
fake_mautrix_mods = _make_fake_mautrix()
mock_sync_store = MagicMock()
mock_sync_store.get_next_batch = AsyncMock(return_value=None)
mock_sync_store.put_next_batch = AsyncMock()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.crypto = None
mock_client.sync_store = mock_sync_store
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123"))
mock_client.sync = AsyncMock(return_value={
"rooms": {"join": {"!dm:example.org": {}}},
"next_batch": "s1",
})
mock_client.get_account_data = AsyncMock(
return_value=MagicMock(content={"@alice:example.org": ["!dm:example.org"]})
)
mock_client.get_state_event = AsyncMock(side_effect=Exception("no state"))
mock_client.state_store = MagicMock()
mock_client.state_store.get_members = AsyncMock(return_value=["@bot:example.org", "@alice:example.org"])
mock_client.state_store.get_member = AsyncMock(return_value=None)
mock_client.add_event_handler = MagicMock()
mock_client.add_dispatcher = MagicMock()
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_access_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
event = types.SimpleNamespace(
sender="@alice:example.org",
event_id="$initial-dm",
room_id="!dm:example.org",
timestamp=int(time.time() * 1000),
content={"msgtype": "m.text", "body": "hello after connect"},
)
def handle_sync(sync_data):
return [asyncio.create_task(adapter._on_room_message(event))]
mock_client.handle_sync = MagicMock(side_effect=handle_sync)
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
from gateway.platforms import matrix as matrix_mod
with patch.dict("sys.modules", fake_mautrix_mods):
with patch.object(matrix_mod, "_create_matrix_session", return_value=MagicMock()):
with patch.object(adapter, "_sync_loop", AsyncMock(return_value=None)):
assert await adapter.connect() is True
assert len(captured) == 1
assert captured[0].text == "hello after connect"
assert captured[0].source.chat_type == "dm"
await adapter.disconnect()
@pytest.mark.asyncio
async def test_room_message_after_invite_join_is_received(self):
"""After invite reconciliation joins a room, later room messages dispatch."""
adapter = _make_adapter()
adapter._closing = False
adapter._user_id = "@bot:example.org"
adapter._startup_ts = time.time() - 10
adapter._require_mention = True
adapter._text_batch_delay_seconds = 0
adapter._background_read_receipt = MagicMock()
captured = []
async def capture(event):
captured.append(event)
adapter.handle_message = capture
sync_count = 0
async def _sync(**kwargs):
nonlocal sync_count
sync_count += 1
if sync_count == 1:
return {
"rooms": {"invite": {"!room:example.org": {}}},
"next_batch": "s1",
}
adapter._closing = True
return {
"rooms": {"join": {"!room:example.org": {}}},
"next_batch": "s2",
}
event = types.SimpleNamespace(
sender="@alice:example.org",
event_id="$room1",
room_id="!room:example.org",
timestamp=int(time.time() * 1000),
content={
"msgtype": "m.text",
"body": "@bot:example.org hello room",
"m.mentions": {"user_ids": ["@bot:example.org"]},
},
)
mock_sync_store = MagicMock()
mock_sync_store.get_next_batch = AsyncMock(return_value=None)
mock_sync_store.put_next_batch = AsyncMock()
fake_client = MagicMock()
fake_client.sync = AsyncMock(side_effect=_sync)
fake_client.join_room = AsyncMock()
fake_client.sync_store = mock_sync_store
fake_client.get_account_data = AsyncMock(return_value=MagicMock(content={}))
fake_client.get_state_event = AsyncMock(side_effect=Exception("no state"))
fake_client.state_store = MagicMock()
fake_client.state_store.get_members = AsyncMock(return_value=["@bot:example.org", "@alice:example.org"])
fake_client.state_store.get_member = AsyncMock(return_value=None)
def handle_sync(sync_data):
if sync_data["next_batch"] == "s2":
return [asyncio.create_task(adapter._on_room_message(event))]
return []
fake_client.handle_sync = MagicMock(side_effect=handle_sync)
adapter._client = fake_client
await adapter._sync_loop()
fake_client.join_room.assert_awaited_once()
assert "!room:example.org" in adapter._joined_rooms
assert len(captured) == 1
assert captured[0].source.chat_type == "group"
@pytest.mark.asyncio
async def test_seconds_timestamp_is_not_treated_as_milliseconds(self):
adapter = _make_adapter()
adapter._user_id = "@bot:example.org"
adapter._startup_ts = time.time() - 10
adapter._dm_rooms = {"!dm:example.org": True}
adapter._text_batch_delay_seconds = 0
adapter._background_read_receipt = MagicMock()
adapter._client = MagicMock()
adapter._client.get_state_event = AsyncMock(side_effect=Exception("no state"))
adapter._client.state_store = MagicMock()
adapter._client.state_store.get_members = AsyncMock(return_value=["@bot:example.org", "@alice:example.org"])
adapter._client.state_store.get_member = AsyncMock(return_value=None)
captured = []
async def capture(event):
captured.append(event)
adapter.handle_message = capture
event = types.SimpleNamespace(
sender="@alice:example.org",
event_id="$seconds",
room_id="!dm:example.org",
timestamp=time.time(),
content={"msgtype": "m.text", "body": "seconds ts"},
)
await adapter._on_room_message(event)
assert len(captured) == 1
class TestMatrixUploadAndSend:
@pytest.mark.asyncio
async def test_upload_unencrypted_room_uses_plain_url(self):
"""Unencrypted rooms should use plain 'url' key."""
adapter = _make_adapter()
adapter._encryption = True
mock_client = MagicMock()
mock_client.crypto = object()
mock_client.state_store = MagicMock()
mock_client.state_store.is_encrypted = AsyncMock(return_value=False)
mock_client.upload_media = AsyncMock(return_value="mxc://example.org/plain")
mock_client.send_message_event = AsyncMock(return_value="$event")
adapter._client = mock_client
result = await adapter._upload_and_send(
"!room:example.org", b"hello", "test.txt", "text/plain", "m.file",
)
assert result.success is True
sent = mock_client.send_message_event.await_args.args[2]
assert sent["url"] == "mxc://example.org/plain"
assert "file" not in sent
@pytest.mark.asyncio
async def test_upload_encrypted_room_uses_file_payload(self):
"""Encrypted rooms should use 'file' key with crypto metadata."""
adapter = _make_adapter()
adapter._encryption = True
mock_client = MagicMock()
mock_client.crypto = object()
mock_client.state_store = MagicMock()
mock_client.state_store.is_encrypted = AsyncMock(return_value=True)
mock_client.upload_media = AsyncMock(return_value="mxc://example.org/enc")
mock_client.send_message_event = AsyncMock(return_value="$event")
adapter._client = mock_client
with patch.dict("sys.modules", _make_fake_mautrix()):
result = await adapter._upload_and_send(
"!room:example.org", b"secret", "secret.txt", "text/plain", "m.file",
)
assert result.success is True
# Should have uploaded ciphertext, not plaintext
uploaded_data = mock_client.upload_media.await_args.args[0]
assert uploaded_data != b"secret"
sent = mock_client.send_message_event.await_args.args[2]
assert "url" not in sent
assert "file" in sent
assert sent["file"]["url"] == "mxc://example.org/enc"
@pytest.mark.asyncio
async def test_upload_rejects_oversized_file(self):
adapter = _make_adapter()
adapter._max_media_bytes = 4
adapter._client = MagicMock()
adapter._client.upload_media = AsyncMock()
result = await adapter._upload_and_send(
"!room:example.org",
b"too large",
"big.txt",
"text/plain",
"m.file",
)
assert result.success is False
assert "exceeds Matrix limit" in result.error
adapter._client.upload_media.assert_not_called()
@pytest.mark.asyncio
async def test_media_preserves_caption_and_thread(self):
adapter = _make_adapter()
mock_client = MagicMock()
mock_client.upload_media = AsyncMock(return_value="mxc://example.org/plain")
mock_client.send_message_event = AsyncMock(return_value="$event")
adapter._client = mock_client
result = await adapter._upload_and_send(
"!room:example.org",
b"image",
"chart.png",
"image/png",
"m.image",
caption="Chart caption",
metadata={"thread_id": "$root"},
)
assert result.success is True
sent = mock_client.send_message_event.await_args.args[2]
assert sent["body"] == "Chart caption"
assert sent["m.relates_to"]["rel_type"] == "m.thread"
assert sent["m.relates_to"]["event_id"] == "$root"
assert sent["m.relates_to"]["m.in_reply_to"] == {"event_id": "$root"}
@pytest.mark.asyncio
async def test_send_multiple_images_preserves_logical_batch_order_and_thread(self, tmp_path):
adapter = _make_adapter()
mock_client = MagicMock()
mock_client.upload_media = AsyncMock(side_effect=[
"mxc://example.org/one",
"mxc://example.org/two",
])
mock_client.send_message_event = AsyncMock(side_effect=["$one", "$two"])
adapter._client = mock_client
first = tmp_path / "one.png"
second = tmp_path / "two.png"
first.write_bytes(b"one")
second.write_bytes(b"two")
await adapter.send_multiple_images(
"!room:example.org",
[(f"file://{first}", "First image"), (f"file://{second}", "Second image")],
metadata={"thread_id": "$root"},
)
assert mock_client.send_message_event.await_count == 2
bodies = [call.args[2]["body"] for call in mock_client.send_message_event.await_args_list]
assert bodies == ["First image (1/2)", "Second image (2/2)"]
for call in mock_client.send_message_event.await_args_list:
sent = call.args[2]
assert sent["msgtype"] == "m.image"
assert sent["m.relates_to"]["event_id"] == "$root"
assert sent["m.relates_to"]["m.in_reply_to"] == {"event_id": "$root"}
class TestMatrixDiagnostics:
def test_diagnostics_redacts_credentials_and_reports_status(self, monkeypatch):
from gateway.platforms import matrix as matrix_mod
monkeypatch.setenv("MATRIX_RECOVERY_KEY", "secret recovery key")
adapter = _make_adapter()
adapter._access_token = "syt_super_secret"
adapter._password = "password"
adapter._user_id = "@bot:example.org"
adapter._device_id = "DEV123"
adapter._joined_rooms = {"!one:example.org", "!two:example.org"}
adapter._last_sync_ts = time.time() - 7
adapter._max_media_bytes = 123
adapter._client = MagicMock()
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True):
diagnostics = adapter.get_diagnostics()
assert diagnostics["auth"]["token_preview"] == "***"
assert "syt_super_secret" not in str(diagnostics)
assert "DEV123" not in str(diagnostics)
assert diagnostics["auth"]["device_id_present"] is True
assert diagnostics["auth"]["device_id_preview"] == "***"
assert diagnostics["sync"]["connected"] is True
assert diagnostics["sync"]["joined_room_count"] == 2
assert diagnostics["sync"]["last_sync_age_seconds"] >= 0
assert diagnostics["e2ee"]["recovery_key_configured"] is True
assert diagnostics["media"]["max_media_bytes"] == 123
def test_matrix_recovery_key_is_never_logged(self, caplog, monkeypatch):
from gateway.platforms.matrix import _handle_generated_matrix_recovery_key
secret = "super-secret-generated-recovery-key"
monkeypatch.delenv("MATRIX_RECOVERY_KEY_OUTPUT_FILE", raising=False)
_handle_generated_matrix_recovery_key("@bot:example.org", secret)
assert secret not in caplog.text
assert "will not be logged" in caplog.text
def test_matrix_recovery_key_output_file_is_0600(self, tmp_path, monkeypatch, caplog):
from gateway.platforms.matrix import _handle_generated_matrix_recovery_key
secret = "super-secret-generated-recovery-key"
output_path = tmp_path / "matrix-recovery-key.txt"
monkeypatch.setenv("MATRIX_RECOVERY_KEY_OUTPUT_FILE", str(output_path))
_handle_generated_matrix_recovery_key("@bot:example.org", secret)
assert output_path.read_text().strip() == secret
assert stat.S_IMODE(output_path.stat().st_mode) == 0o600
assert secret not in caplog.text
@pytest.mark.asyncio
async def test_matrix_recovery_key_bootstrap_skips_without_output_file(
self,
monkeypatch,
caplog,
):
from gateway.platforms.matrix import MatrixAdapter
monkeypatch.delenv("MATRIX_RECOVERY_KEY", raising=False)
monkeypatch.delenv("MATRIX_RECOVERY_KEY_OUTPUT_FILE", raising=False)
config = PlatformConfig(
enabled=True,
token="syt_test_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": True,
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.state_store = MagicMock()
mock_client.sync_store = MagicMock()
mock_client.crypto = None
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123"))
mock_client.sync = AsyncMock(return_value={"rooms": {"join": {}}})
mock_client.add_event_handler = MagicMock()
mock_client.add_dispatcher = MagicMock()
mock_client.handle_sync = MagicMock(return_value=[])
mock_client.query_keys = AsyncMock(return_value={
"device_keys": {"@bot:example.org": {"DEV123": {
"keys": {"ed25519:DEV123": "fake_ed25519_key"},
}}},
})
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
mock_olm = MagicMock()
mock_olm.load = AsyncMock()
mock_olm.share_keys = AsyncMock()
mock_olm.get_own_cross_signing_public_keys = AsyncMock(return_value=None)
mock_olm.generate_recovery_key = AsyncMock(return_value="super-secret-key")
mock_olm.share_keys_min_trust = None
mock_olm.send_keys_min_trust = None
mock_olm.account = MagicMock()
mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"}
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(return_value=mock_olm)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True):
with patch.dict("sys.modules", fake_mautrix_mods):
with patch.object(adapter, "_refresh_dm_cache", AsyncMock()):
with patch.object(adapter, "_sync_loop", AsyncMock(return_value=None)):
assert await adapter.connect() is True
mock_olm.generate_recovery_key.assert_not_called()
assert "MATRIX_RECOVERY_KEY_OUTPUT_FILE is not configured" in caplog.text
assert "super-secret-key" not in caplog.text
await adapter.disconnect()
@pytest.mark.asyncio
async def test_matrix_recovery_key_bootstrap_skips_existing_output_file(
self,
tmp_path,
monkeypatch,
caplog,
):
from gateway.platforms.matrix import MatrixAdapter
output_path = tmp_path / "matrix-recovery-key.txt"
output_path.write_text("existing\n")
monkeypatch.delenv("MATRIX_RECOVERY_KEY", raising=False)
monkeypatch.setenv("MATRIX_RECOVERY_KEY_OUTPUT_FILE", str(output_path))
config = PlatformConfig(
enabled=True,
token="syt_test_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": True,
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.state_store = MagicMock()
mock_client.sync_store = MagicMock()
mock_client.crypto = None
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123"))
mock_client.sync = AsyncMock(return_value={"rooms": {"join": {}}})
mock_client.add_event_handler = MagicMock()
mock_client.add_dispatcher = MagicMock()
mock_client.handle_sync = MagicMock(return_value=[])
mock_client.query_keys = AsyncMock(return_value={
"device_keys": {"@bot:example.org": {"DEV123": {
"keys": {"ed25519:DEV123": "fake_ed25519_key"},
}}},
})
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
mock_olm = MagicMock()
mock_olm.load = AsyncMock()
mock_olm.share_keys = AsyncMock()
mock_olm.get_own_cross_signing_public_keys = AsyncMock(return_value=None)
mock_olm.generate_recovery_key = AsyncMock(return_value="super-secret-key")
mock_olm.share_keys_min_trust = None
mock_olm.send_keys_min_trust = None
mock_olm.account = MagicMock()
mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"}
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(return_value=mock_olm)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True):
with patch.dict("sys.modules", fake_mautrix_mods):
with patch.object(adapter, "_refresh_dm_cache", AsyncMock()):
with patch.object(adapter, "_sync_loop", AsyncMock(return_value=None)):
assert await adapter.connect() is True
mock_olm.generate_recovery_key.assert_not_called()
assert "already exists" in caplog.text
assert "super-secret-key" not in caplog.text
assert output_path.read_text() == "existing\n"
await adapter.disconnect()
def test_matrix_diagnostics_redacts_recovery_key(self, monkeypatch):
monkeypatch.setenv("MATRIX_RECOVERY_KEY", "diagnostic-secret-recovery-key")
adapter = _make_adapter()
diagnostics = adapter.get_diagnostics()
assert diagnostics["e2ee"]["recovery_key_configured"] is True
assert "diagnostic-secret-recovery-key" not in str(diagnostics)
def test_capability_matrix_is_declared_for_docs(self):
from gateway.platforms.matrix import get_matrix_capabilities
capabilities = get_matrix_capabilities()
assert capabilities == {
"text": "yes",
"threads": "yes",
"reactions": "yes",
"approvals": "yes",
"model picker": "yes",
"thinking panes": "yes",
"images": "yes",
"multiple images": "yes",
"files": "yes",
"voice/audio": "yes",
"video": "yes",
"E2EE": "off / optional / required",
"diagnostics": "yes",
}
def test_matrix_capability_claims_match_adapter_surfaces(self):
from gateway.platforms.matrix import MatrixAdapter, get_matrix_capabilities
capabilities = get_matrix_capabilities()
required_methods = {
"text": "send",
"threads": "_apply_relation_metadata",
"reactions": "_send_reaction",
"approvals": "send_exec_approval",
"model picker": "send_model_picker",
"thinking panes": "edit_message",
"images": "send_image",
"multiple images": "send_multiple_images",
"files": "send_document",
"voice/audio": "send_voice",
"video": "send_video",
"diagnostics": "get_diagnostics",
}
for capability, method in required_methods.items():
assert capabilities[capability] == "yes"
assert hasattr(MatrixAdapter, method), f"{capability} needs {method}"
assert capabilities["E2EE"] == "off / optional / required"
def test_matrix_docs_capability_table_matches_declaration(self):
from pathlib import Path
from gateway.platforms.matrix import get_matrix_capabilities
docs = (
Path(__file__).resolve().parents[2]
/ "website"
/ "docs"
/ "user-guide"
/ "messaging"
/ "matrix.md"
).read_text()
for capability, status in get_matrix_capabilities().items():
assert f"| {capability} | {status} |" in docs
class TestMatrixEncryptedSendFallback:
@pytest.mark.asyncio
async def test_send_retries_after_e2ee_error(self):
"""send() should retry with crypto.share_keys() on E2EE errors."""
adapter = _make_adapter()
adapter._encryption = True
fake_client = MagicMock()
fake_client.send_message_event = AsyncMock(side_effect=[
Exception("encryption error"),
"$event123", # mautrix returns EventID string directly
])
mock_crypto = MagicMock()
mock_crypto.share_keys = AsyncMock()
fake_client.crypto = mock_crypto
adapter._client = fake_client
result = await adapter.send("!room:example.org", "hello")
assert result.success is True
assert result.message_id == "$event123"
mock_crypto.share_keys.assert_awaited_once()
assert fake_client.send_message_event.await_count == 2
# ---------------------------------------------------------------------------
# E2EE: _joined_rooms reference preservation for CryptoStateStore
# ---------------------------------------------------------------------------
class TestJoinedRoomsReference:
def test_joined_rooms_reference_preserved_after_reassignment(self):
"""_CryptoStateStore must see updates after initial sync populates rooms."""
from gateway.platforms.matrix import _CryptoStateStore
joined = set()
store = _CryptoStateStore(MagicMock(), joined)
# Simulate what connect() should do: mutate in place, not reassign.
joined.clear()
joined.update(["!room1:example.org", "!room2:example.org"])
import asyncio
rooms = asyncio.get_event_loop().run_until_complete(store.find_shared_rooms("@user:ex"))
assert set(rooms) == {"!room1:example.org", "!room2:example.org"}
# ---------------------------------------------------------------------------
# E2EE: connect registers encrypted event handler
# ---------------------------------------------------------------------------
class TestMatrixEncryptedEventHandler:
@pytest.mark.asyncio
async def test_connect_registers_encrypted_event_handler_when_encryption_on(self):
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": True,
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.state_store = MagicMock()
mock_client.sync_store = MagicMock()
mock_client.crypto = None # Will be set during connect
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123"))
mock_client.sync = AsyncMock(return_value={"rooms": {"join": {"!room:server": {}}}})
mock_client.add_event_handler = MagicMock()
mock_client.handle_sync = MagicMock(return_value=[])
mock_client.query_keys = AsyncMock(return_value={
"device_keys": {"@bot:example.org": {"DEV123": {
"keys": {"ed25519:DEV123": "fake_ed25519_key"},
}}},
})
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
mock_olm = MagicMock()
mock_olm.load = AsyncMock()
mock_olm.share_keys = AsyncMock()
mock_olm.share_keys_min_trust = None
mock_olm.send_keys_min_trust = None
mock_olm.account = MagicMock()
mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"}
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(return_value=mock_olm)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True):
with patch.dict("sys.modules", fake_mautrix_mods):
with patch.object(adapter, "_refresh_dm_cache", AsyncMock()):
with patch.object(adapter, "_sync_loop", AsyncMock(return_value=None)):
assert await adapter.connect() is True
# Verify event handlers were registered.
# In mautrix the order is: add_event_handler(EventType, callback)
handler_calls = mock_client.add_event_handler.call_args_list
registered_types = [call.args[0] for call in handler_calls]
# Should have registered handlers for ROOM_MESSAGE, REACTION, INVITE
assert len(handler_calls) >= 3
await adapter.disconnect()
@pytest.mark.asyncio
async def test_connect_fails_on_stale_otk_conflict(self):
"""connect() must refuse E2EE when OTK upload hits 'already exists'."""
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": True,
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.state_store = MagicMock()
mock_client.sync_store = MagicMock()
mock_client.crypto = None
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123"))
mock_client.add_event_handler = MagicMock()
mock_client.add_dispatcher = MagicMock()
mock_client.query_keys = AsyncMock(return_value={
"device_keys": {"@bot:example.org": {"DEV123": {
"keys": {"ed25519:DEV123": "fake_ed25519_key"},
}}},
})
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
# share_keys succeeds on first call (from _verify_device_keys_on_server),
# then raises "already exists" on the proactive OTK flush in connect().
mock_olm = MagicMock()
mock_olm.load = AsyncMock()
mock_olm.share_keys = AsyncMock(
side_effect=[None, Exception("One time key signed_curve25519:AAAAAQ already exists")]
)
mock_olm.share_keys_min_trust = None
mock_olm.send_keys_min_trust = None
mock_olm.account = MagicMock()
mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"}
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(return_value=mock_olm)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True):
with patch.dict("sys.modules", fake_mautrix_mods):
result = await adapter.connect()
assert result is False
# ---------------------------------------------------------------------------
# Disconnect
# ---------------------------------------------------------------------------
class TestMatrixDisconnect:
@pytest.mark.asyncio
async def test_disconnect_closes_api_session(self):
"""disconnect() should close client.api.session."""
adapter = _make_adapter()
adapter._sync_task = None
mock_session = MagicMock()
mock_session.close = AsyncMock()
mock_api = MagicMock()
mock_api.session = mock_session
fake_client = MagicMock()
fake_client.api = mock_api
adapter._client = fake_client
await adapter.disconnect()
mock_session.close.assert_awaited_once()
assert adapter._client is None
@pytest.mark.asyncio
async def test_disconnect_handles_session_close_failure(self):
"""disconnect() should not raise if session close fails."""
adapter = _make_adapter()
adapter._sync_task = None
mock_session = MagicMock()
mock_session.close = AsyncMock(side_effect=Exception("close failed"))
mock_api = MagicMock()
mock_api.session = mock_session
fake_client = MagicMock()
fake_client.api = mock_api
adapter._client = fake_client
# Should not raise
await adapter.disconnect()
assert adapter._client is None
@pytest.mark.asyncio
async def test_disconnect_without_client(self):
"""disconnect() should handle None client gracefully."""
adapter = _make_adapter()
adapter._sync_task = None
adapter._client = None
await adapter.disconnect()
assert adapter._client is None
# ---------------------------------------------------------------------------
# Markdown to HTML: security tests
# ---------------------------------------------------------------------------
class TestMatrixMarkdownHtmlSecurity:
"""Tests for HTML injection prevention in _markdown_to_html_fallback."""
def setup_method(self):
from gateway.platforms.matrix import MatrixAdapter
self.convert = MatrixAdapter._markdown_to_html_fallback
def test_script_injection_in_header(self):
result = self.convert("# <script>alert(1)</script>")
assert "<script>" not in result
assert "&lt;script&gt;" in result
def test_script_injection_in_plain_text(self):
result = self.convert("Hello <script>alert(1)</script>")
assert "<script>" not in result
def test_img_onerror_in_blockquote(self):
result = self.convert('> <img onerror="alert(1)">')
assert "onerror" not in result or "&lt;img" in result
def test_script_in_list_item(self):
result = self.convert("- <script>alert(1)</script>")
assert "<script>" not in result
def test_script_in_ordered_list(self):
result = self.convert("1. <script>alert(1)</script>")
assert "<script>" not in result
def test_javascript_uri_blocked(self):
result = self.convert("[click](javascript:alert(1))")
assert 'href="javascript:' not in result
def test_data_uri_blocked(self):
result = self.convert("[click](data:text/html,<script>)")
assert 'href="data:' not in result
def test_vbscript_uri_blocked(self):
result = self.convert("[click](vbscript:alert(1))")
assert 'href="vbscript:' not in result
def test_link_text_html_injection(self):
result = self.convert('[<img onerror="x">](http://safe.com)')
assert "<img" not in result or "&lt;img" in result
def test_link_href_attribute_breakout(self):
result = self.convert('[link](http://x" onclick="alert(1))')
assert "onclick" not in result or "&quot;" in result
def test_html_injection_in_bold(self):
result = self.convert("**<img onerror=alert(1)>**")
assert "<img" not in result or "&lt;img" in result
def test_html_injection_in_italic(self):
result = self.convert("*<script>alert(1)</script>*")
assert "<script>" not in result
# ---------------------------------------------------------------------------
# Markdown to HTML: extended formatting tests
# ---------------------------------------------------------------------------
class TestMatrixMarkdownHtmlFormatting:
"""Tests for new formatting capabilities in _markdown_to_html_fallback."""
def setup_method(self):
from gateway.platforms.matrix import MatrixAdapter
self.convert = MatrixAdapter._markdown_to_html_fallback
def test_fenced_code_block(self):
result = self.convert('```python\ndef hello():\n pass\n```')
assert "<pre><code" in result
assert "language-python" in result
def test_fenced_code_block_no_lang(self):
result = self.convert('```\nsome code\n```')
assert "<pre><code>" in result
def test_code_block_html_escaped(self):
result = self.convert('```\n<script>alert(1)</script>\n```')
assert "&lt;script&gt;" in result
assert "<script>" not in result
def test_headers(self):
assert "<h1>" in self.convert("# H1")
assert "<h2>" in self.convert("## H2")
assert "<h3>" in self.convert("### H3")
def test_unordered_list(self):
result = self.convert("- One\n- Two\n- Three")
assert "<ul>" in result
assert result.count("<li>") == 3
def test_ordered_list(self):
result = self.convert("1. First\n2. Second")
assert "<ol>" in result
assert result.count("<li>") == 2
def test_blockquote(self):
result = self.convert("> A quote\n> continued")
assert "<blockquote>" in result
assert "A quote" in result
def test_horizontal_rule(self):
assert "<hr>" in self.convert("---")
assert "<hr>" in self.convert("***")
def test_strikethrough(self):
result = self.convert("~~deleted~~")
assert "<del>deleted</del>" in result
def test_links_preserved(self):
result = self.convert("[text](https://example.com)")
assert '<a href="https://example.com">text</a>' in result
def test_complex_mixed_document(self):
"""A realistic agent response with multiple formatting types."""
text = "## Summary\n\nHere's what I found:\n\n- **Bold item**\n- `code` item\n\n```bash\necho hello\n```\n\n1. Step one\n2. Step two"
result = self.convert(text)
assert "<h2>" in result
assert "<strong>" in result
assert "<code>" in result
assert "<ul>" in result
assert "<ol>" in result
assert "<pre><code" in result
# ---------------------------------------------------------------------------
# Link URL sanitization
# ---------------------------------------------------------------------------
class TestMatrixLinkSanitization:
def test_safe_https_url(self):
from gateway.platforms.matrix import MatrixAdapter
assert MatrixAdapter._sanitize_link_url("https://example.com") == "https://example.com"
def test_javascript_blocked(self):
from gateway.platforms.matrix import MatrixAdapter
assert MatrixAdapter._sanitize_link_url("javascript:alert(1)") == ""
def test_data_blocked(self):
from gateway.platforms.matrix import MatrixAdapter
assert MatrixAdapter._sanitize_link_url("data:text/html,bad") == ""
def test_vbscript_blocked(self):
from gateway.platforms.matrix import MatrixAdapter
assert MatrixAdapter._sanitize_link_url("vbscript:bad") == ""
def test_quotes_escaped(self):
from gateway.platforms.matrix import MatrixAdapter
result = MatrixAdapter._sanitize_link_url('http://x"y')
assert '"' not in result
assert "&quot;" in result
# ---------------------------------------------------------------------------
# Reactions
# ---------------------------------------------------------------------------
class TestMatrixReactions:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_send_reaction(self):
"""_send_reaction should call send_message_event with m.reaction."""
mock_client = MagicMock()
# mautrix send_message_event returns EventID string directly
mock_client.send_message_event = AsyncMock(return_value="$reaction1")
self.adapter._client = mock_client
result = await self.adapter._send_reaction("!room:ex", "$event1", "\U0001f44d")
assert result == "$reaction1"
mock_client.send_message_event.assert_called_once()
call_args = mock_client.send_message_event.call_args
content = call_args.args[2] if len(call_args.args) > 2 else call_args.kwargs.get("content")
assert content["m.relates_to"]["rel_type"] == "m.annotation"
assert content["m.relates_to"]["key"] == "\U0001f44d"
@pytest.mark.asyncio
async def test_send_reaction_no_client(self):
self.adapter._client = None
result = await self.adapter._send_reaction("!room:ex", "$ev", "\U0001f44d")
assert result is None
@pytest.mark.asyncio
async def test_on_processing_start_sends_eyes(self):
"""on_processing_start should send eyes reaction."""
from gateway.platforms.base import MessageEvent, MessageType
self.adapter._reactions_enabled = True
self.adapter._send_reaction = AsyncMock(return_value="$reaction_event_123")
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_start(event)
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "\U0001f440")
assert self.adapter._pending_reactions == {("!room:ex", "$msg1"): "$reaction_event_123"}
@pytest.mark.asyncio
async def test_on_processing_complete_sends_check(self):
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
self.adapter._reactions_enabled = True
self.adapter._reaction_redaction_delay_seconds = 0.01
self.adapter._pending_reactions = {("!room:ex", "$msg1"): "$eyes_reaction_123"}
self.adapter._redact_reaction = AsyncMock(return_value=True)
self.adapter._send_reaction = AsyncMock(return_value="$check_reaction_456")
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
self.adapter._redact_reaction.assert_not_awaited()
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "\u2705")
await asyncio.sleep(0.03)
self.adapter._redact_reaction.assert_awaited_once_with(
"!room:ex",
"$eyes_reaction_123",
"processing complete",
)
@pytest.mark.asyncio
async def test_on_processing_complete_sends_cross_on_failure(self):
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
self.adapter._reactions_enabled = True
self.adapter._reaction_redaction_delay_seconds = 0.01
self.adapter._pending_reactions = {("!room:ex", "$msg1"): "$eyes_reaction_123"}
self.adapter._redact_reaction = AsyncMock(return_value=True)
self.adapter._send_reaction = AsyncMock(return_value="$cross_reaction_456")
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_complete(event, ProcessingOutcome.FAILURE)
self.adapter._redact_reaction.assert_not_awaited()
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "\u274c")
await asyncio.sleep(0.03)
self.adapter._redact_reaction.assert_awaited_once_with(
"!room:ex",
"$eyes_reaction_123",
"processing complete",
)
@pytest.mark.asyncio
async def test_on_processing_complete_cancelled_sends_no_terminal_reaction(self):
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
self.adapter._reactions_enabled = True
self.adapter._send_reaction = AsyncMock(return_value=True)
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
self.adapter._send_reaction.assert_not_called()
@pytest.mark.asyncio
async def test_on_processing_complete_no_pending_reaction(self):
"""on_processing_complete should skip redaction if no eyes reaction was tracked."""
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
self.adapter._reactions_enabled = True
self.adapter._pending_reactions = {}
self.adapter._redact_reaction = AsyncMock()
self.adapter._send_reaction = AsyncMock(return_value="$check_reaction_789")
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
self.adapter._redact_reaction.assert_not_called()
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "\u2705")
@pytest.mark.asyncio
async def test_approval_reaction_cleanup_is_delayed(self):
"""Bot approval reaction redactions should not run inline."""
self.adapter._reaction_redaction_delay_seconds = 0.01
self.adapter._redact_reaction = AsyncMock(return_value=True)
prompt = MagicMock()
prompt.bot_reaction_events = {
"\u2705": "$allow_reaction",
"\u274e": "$deny_reaction",
}
await self.adapter._redact_bot_approval_reactions("!room:ex", prompt)
self.adapter._redact_reaction.assert_not_awaited()
await asyncio.sleep(0.03)
self.adapter._redact_reaction.assert_any_await(
"!room:ex",
"$allow_reaction",
"approval resolved",
)
self.adapter._redact_reaction.assert_any_await(
"!room:ex",
"$deny_reaction",
"approval resolved",
)
@pytest.mark.asyncio
async def test_reactions_disabled(self):
from gateway.platforms.base import MessageEvent, MessageType
self.adapter._reactions_enabled = False
self.adapter._send_reaction = AsyncMock()
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_start(event)
self.adapter._send_reaction.assert_not_called()
# ---------------------------------------------------------------------------
# Read receipts
# ---------------------------------------------------------------------------
class TestMatrixReadReceipts:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_accepted_message_schedules_read_receipt(self):
self.adapter._is_dm_room = AsyncMock(return_value=True)
self.adapter._get_display_name = AsyncMock(return_value="Alice")
self.adapter._background_read_receipt = MagicMock()
ctx = await self.adapter._resolve_message_context(
room_id="!room:ex",
sender="@alice:ex",
event_id="$event1",
body="hello",
source_content={"body": "hello"},
relates_to={},
)
assert ctx is not None
self.adapter._background_read_receipt.assert_called_once_with(
"!room:ex", "$event1"
)
@pytest.mark.asyncio
async def test_send_read_receipt(self):
"""send_read_receipt should call mautrix's real read-marker API."""
mock_client = MagicMock()
mock_client.set_fully_read_marker = AsyncMock(return_value=None)
self.adapter._client = mock_client
result = await self.adapter.send_read_receipt("!room:ex", "$event1")
assert result is True
mock_client.set_fully_read_marker.assert_awaited_once_with(
"!room:ex", "$event1", "$event1"
)
@pytest.mark.asyncio
async def test_send_read_receipt_falls_back_to_receipt_only(self):
"""send_read_receipt should still work with clients lacking read markers."""
mock_client = MagicMock(spec=["send_receipt"])
mock_client.send_receipt = AsyncMock(return_value=None)
self.adapter._client = mock_client
result = await self.adapter.send_read_receipt("!room:ex", "$event1")
assert result is True
mock_client.send_receipt.assert_awaited_once_with("!room:ex", "$event1")
@pytest.mark.asyncio
async def test_read_receipt_no_client(self):
self.adapter._client = None
result = await self.adapter.send_read_receipt("!room:ex", "$event1")
assert result is False
# ---------------------------------------------------------------------------
# Media normalization
# ---------------------------------------------------------------------------
class TestMatrixImageOnlyMediaNormalization:
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._client = MagicMock()
self.adapter._client.download_media = AsyncMock(return_value=None)
self.adapter._is_dm_room = AsyncMock(return_value=True)
self.adapter._get_display_name = AsyncMock(return_value="Alice")
self.adapter._background_read_receipt = MagicMock()
self.adapter._mxc_to_http = (
lambda url: "https://matrix.example.org/_matrix/media/v3/download/example/30.png"
)
@pytest.mark.asyncio
async def test_image_only_filename_body_is_not_forwarded_as_text(self):
captured_event = None
async def capture(msg_event):
nonlocal captured_event
captured_event = msg_event
self.adapter.handle_message = capture
await self.adapter._handle_media_message(
room_id="!room:example.org",
sender="@alice:example.org",
event_id="$image1",
event_ts=0.0,
source_content={
"msgtype": "m.image",
"body": "30.png",
"url": "mxc://example/30.png",
"info": {"mimetype": "image/png"},
},
relates_to={},
msgtype="m.image",
)
assert captured_event is not None
assert captured_event.text == ""
assert captured_event.media_urls == [
"https://matrix.example.org/_matrix/media/v3/download/example/30.png"
]
assert captured_event.message_type == MessageType.PHOTO
@pytest.mark.asyncio
async def test_image_caption_text_is_preserved(self):
captured_event = None
async def capture(msg_event):
nonlocal captured_event
captured_event = msg_event
self.adapter.handle_message = capture
await self.adapter._handle_media_message(
room_id="!room:example.org",
sender="@alice:example.org",
event_id="$image2",
event_ts=0.0,
source_content={
"msgtype": "m.image",
"body": "Please describe this chart",
"url": "mxc://example/30.png",
"info": {"mimetype": "image/png"},
},
relates_to={},
msgtype="m.image",
)
assert captured_event is not None
assert captured_event.text == "Please describe this chart"
@pytest.mark.asyncio
async def test_inbound_oversized_media_is_rejected(self):
captured_event = None
async def capture(msg_event):
nonlocal captured_event
captured_event = msg_event
self.adapter._max_media_bytes = 10
self.adapter.handle_message = capture
await self.adapter._handle_media_message(
room_id="!room:example.org",
sender="@alice:example.org",
event_id="$image-big",
event_ts=0.0,
source_content={
"msgtype": "m.image",
"body": "huge.png",
"url": "mxc://example/huge.png",
"info": {"mimetype": "image/png", "size": 11},
},
relates_to={},
msgtype="m.image",
)
assert captured_event is None
self.adapter._client.download_media.assert_not_called()
@pytest.mark.asyncio
async def test_external_media_download_rejects_oversized_content_length(self, monkeypatch):
import aiohttp
class _Content:
async def iter_chunked(self, _size):
yield b"x"
class _Response:
url = "https://example.com/image.png"
headers = {"Content-Length": "11"}
content_type = "image/png"
content = _Content()
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def raise_for_status(self):
return None
class _Session:
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def get(self, *_args, **_kwargs):
return _Response()
self.adapter._max_media_bytes = 10
monkeypatch.setattr(aiohttp, "ClientSession", lambda **_kwargs: _Session())
with pytest.raises(ValueError, match="exceeds Matrix limit"):
await self.adapter._download_external_media_with_cap(
"https://example.com/image.png"
)
@pytest.mark.asyncio
async def test_external_media_download_rejects_oversized_stream(self, monkeypatch):
import aiohttp
class _Content:
async def iter_chunked(self, _size):
yield b"12345"
yield b"67890"
yield b"!"
class _Response:
url = "https://example.com/image.png"
headers = {}
content_type = "image/png"
content = _Content()
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def raise_for_status(self):
return None
class _Session:
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def get(self, *_args, **_kwargs):
return _Response()
self.adapter._max_media_bytes = 10
monkeypatch.setattr(aiohttp, "ClientSession", lambda **_kwargs: _Session())
with pytest.raises(ValueError, match="exceeds Matrix limit"):
await self.adapter._download_external_media_with_cap(
"https://example.com/image.png"
)
@pytest.mark.asyncio
async def test_external_media_download_rejects_unsafe_redirect(self, monkeypatch):
import aiohttp
class _Content:
async def iter_chunked(self, _size):
yield b"ok"
class _Response:
url = "http://127.0.0.1/private.png"
headers = {}
content_type = "image/png"
content = _Content()
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def raise_for_status(self):
return None
class _Session:
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def get(self, *_args, **_kwargs):
return _Response()
monkeypatch.setattr(aiohttp, "ClientSession", lambda **_kwargs: _Session())
with pytest.raises(ValueError, match="unsafe redirect"):
await self.adapter._download_external_media_with_cap(
"https://example.com/image.png"
)
@pytest.mark.asyncio
async def test_external_media_download_rejects_unsafe_initial_url(self):
with pytest.raises(ValueError, match="unsafe media URL"):
await self.adapter._download_external_media_with_cap(
"file:///etc/passwd"
)
@pytest.mark.asyncio
async def test_external_media_download_rejects_non_image_content_type(self, monkeypatch):
import aiohttp
class _Content:
async def iter_chunked(self, _size):
yield b"<html></html>"
class _Response:
url = "https://example.com/image.png"
headers = {}
content_type = "text/html"
content = _Content()
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def raise_for_status(self):
return None
class _Session:
async def __aenter__(self):
return self
async def __aexit__(self, *_args):
return None
def get(self, *_args, **_kwargs):
return _Response()
monkeypatch.setattr(aiohttp, "ClientSession", lambda **_kwargs: _Session())
with pytest.raises(ValueError, match="not an image"):
await self.adapter._download_external_media_with_cap(
"https://example.com/image.png"
)
@pytest.mark.asyncio
async def test_send_image_failure_log_redacts_signed_url(self, caplog):
from gateway.platforms.base import SendResult
signed_url = "https://example.com/image.png?signature=secret-token#frag"
self.adapter._download_external_media_with_cap = AsyncMock(
side_effect=ValueError("download failed")
)
self.adapter.send = AsyncMock(return_value=SendResult(success=True))
await self.adapter.send_image("!room:example.org", signed_url)
assert "https://example.com/image.png" in caplog.text
assert "secret-token" not in caplog.text
assert "#frag" not in caplog.text
@pytest.mark.asyncio
async def test_send_image_failure_response_does_not_expose_signed_url_query(self):
from gateway.platforms.base import SendResult
signed_url = "https://example.com/image.png?signature=secret-token"
self.adapter._download_external_media_with_cap = AsyncMock(
side_effect=ValueError("download failed")
)
self.adapter.send = AsyncMock(return_value=SendResult(success=True))
await self.adapter.send_image("!room:example.org", signed_url)
sent_text = self.adapter.send.await_args.args[1]
assert "signature=" not in sent_text
assert "secret-token" not in sent_text
assert signed_url not in sent_text
assert "source URL was not shown" in sent_text
@pytest.mark.asyncio
async def test_send_image_failure_response_does_not_expose_signed_url_fragment(self):
from gateway.platforms.base import SendResult
signed_url = "https://example.com/image.png#fragment-secret"
self.adapter._download_external_media_with_cap = AsyncMock(
side_effect=ValueError("download failed")
)
self.adapter.send = AsyncMock(return_value=SendResult(success=True))
await self.adapter.send_image("!room:example.org", signed_url)
sent_text = self.adapter.send.await_args.args[1]
assert "#fragment-secret" not in sent_text
assert "fragment-secret" not in sent_text
assert signed_url not in sent_text
assert "source URL was not shown" in sent_text
@pytest.mark.asyncio
async def test_send_image_failure_response_preserves_caption(self):
from gateway.platforms.base import SendResult
signed_url = "https://example.com/image.png?signature=secret-token#fragment"
self.adapter._download_external_media_with_cap = AsyncMock(
side_effect=ValueError("download failed")
)
self.adapter.send = AsyncMock(return_value=SendResult(success=True))
await self.adapter.send_image(
"!room:example.org",
signed_url,
caption="Here is the image",
)
sent_text = self.adapter.send.await_args.args[1]
assert "Here is the image" in sent_text
assert "signature=" not in sent_text
assert "secret-token" not in sent_text
assert "#fragment" not in sent_text
assert signed_url not in sent_text
@pytest.mark.asyncio
async def test_send_image_failure_log_still_redacts_signed_url(self, caplog):
from gateway.platforms.base import SendResult
signed_url = "https://example.com/image.png?signature=secret-token#fragment"
self.adapter._download_external_media_with_cap = AsyncMock(
side_effect=ValueError("download failed")
)
self.adapter.send = AsyncMock(return_value=SendResult(success=True))
await self.adapter.send_image("!room:example.org", signed_url)
assert "https://example.com/image.png" in caplog.text
assert "signature=" not in caplog.text
assert "secret-token" not in caplog.text
assert "#fragment" not in caplog.text
@pytest.mark.asyncio
async def test_inbound_non_mxc_media_url_is_rejected(self):
captured_event = None
async def capture(msg_event):
nonlocal captured_event
captured_event = msg_event
self.adapter.handle_message = capture
await self.adapter._handle_media_message(
room_id="!room:example.org",
sender="@alice:example.org",
event_id="$image-http",
event_ts=0.0,
source_content={
"msgtype": "m.image",
"body": "remote.png",
"url": "https://evil.example.org/remote.png",
"info": {"mimetype": "image/png", "size": 1},
},
relates_to={},
msgtype="m.image",
)
assert captured_event is None
self.adapter._client.download_media.assert_not_called()
@pytest.mark.asyncio
async def test_inbound_encrypted_non_mxc_media_url_is_rejected(self):
captured_event = None
async def capture(msg_event):
nonlocal captured_event
captured_event = msg_event
self.adapter.handle_message = capture
await self.adapter._handle_media_message(
room_id="!room:example.org",
sender="@alice:example.org",
event_id="$image-enc-http",
event_ts=0.0,
source_content={
"msgtype": "m.image",
"body": "remote.png",
"file": {"url": "https://evil.example.org/remote.png"},
"info": {"mimetype": "image/png", "size": 1},
},
relates_to={},
msgtype="m.image",
)
assert captured_event is None
self.adapter._client.download_media.assert_not_called()
# ---------------------------------------------------------------------------
# Message redaction
# ---------------------------------------------------------------------------
class TestMatrixRedaction:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_redact_message(self):
"""redact_message should call client.redact()."""
mock_client = MagicMock()
# mautrix redact() returns EventID string
mock_client.redact = AsyncMock(return_value="$redact_event")
self.adapter._client = mock_client
result = await self.adapter.redact_message("!room:ex", "$ev1", "oops")
assert result is True
mock_client.redact.assert_called_once()
@pytest.mark.asyncio
async def test_redact_no_client(self):
self.adapter._client = None
result = await self.adapter.redact_message("!room:ex", "$ev1")
assert result is False
# ---------------------------------------------------------------------------
# Room creation & invite
# ---------------------------------------------------------------------------
class TestMatrixRoomManagement:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_create_room(self):
"""create_room should call client.create_room() returning RoomID string."""
mock_client = MagicMock()
# mautrix create_room returns RoomID string directly
mock_client.create_room = AsyncMock(return_value="!new:example.org")
self.adapter._client = mock_client
room_id = await self.adapter.create_room(name="Test Room", topic="A test")
assert room_id == "!new:example.org"
assert "!new:example.org" in self.adapter._joined_rooms
@pytest.mark.asyncio
async def test_invite_user(self):
"""invite_user should call client.invite_user()."""
mock_client = MagicMock()
mock_client.invite_user = AsyncMock(return_value=None)
self.adapter._client = mock_client
result = await self.adapter.invite_user("!room:ex", "@user:ex")
assert result is True
@pytest.mark.asyncio
async def test_create_room_no_client(self):
self.adapter._client = None
result = await self.adapter.create_room()
assert result is None
# ---------------------------------------------------------------------------
# Presence
# ---------------------------------------------------------------------------
class TestMatrixPresence:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_set_presence_valid(self):
mock_client = MagicMock()
mock_client.set_presence = AsyncMock()
self.adapter._client = mock_client
result = await self.adapter.set_presence("online")
assert result is True
@pytest.mark.asyncio
async def test_set_presence_invalid_state(self):
mock_client = MagicMock()
self.adapter._client = mock_client
result = await self.adapter.set_presence("busy")
assert result is False
@pytest.mark.asyncio
async def test_set_presence_no_client(self):
self.adapter._client = None
result = await self.adapter.set_presence("online")
assert result is False
# ---------------------------------------------------------------------------
# Self / bridge / system sender filtering — regression coverage for #15763
# ("Hall of Mirrors": recursive pairing / echo loops triggered by bridge
# or bot-self senders bypassing the early-drop guard in _on_room_message).
# ---------------------------------------------------------------------------
class TestMatrixSelfSenderFilter:
def setup_method(self):
self.adapter = _make_adapter()
def test_exact_match_is_self(self):
self.adapter._user_id = "@bot:example.org"
assert self.adapter._is_self_sender("@bot:example.org") is True
def test_case_insensitive_match_is_self(self):
# Some homeservers canonicalize the localpart differently at
# different API surfaces — a case-sensitive equality check lets
# the bot's own sender through and triggers the pairing / echo
# loop in #15763.
self.adapter._user_id = "@Bot:Example.ORG"
assert self.adapter._is_self_sender("@bot:example.org") is True
assert self.adapter._is_self_sender("@BOT:EXAMPLE.ORG") is True
def test_whitespace_trimmed(self):
self.adapter._user_id = "@bot:example.org"
assert self.adapter._is_self_sender(" @bot:example.org ") is True
def test_different_user_is_not_self(self):
self.adapter._user_id = "@bot:example.org"
assert self.adapter._is_self_sender("@alice:example.org") is False
def test_empty_user_id_is_treated_as_self(self):
# If whoami hasn't resolved yet (or login failed), we cannot
# prove a sender is NOT us. Defensively drop rather than leak
# our own outbound traffic into the agent loop.
self.adapter._user_id = ""
assert self.adapter._is_self_sender("@alice:example.org") is True
assert self.adapter._is_self_sender("") is True
class TestMatrixSystemBridgeFilter:
def setup_method(self):
self.adapter = _make_adapter()
def test_appservice_underscore_prefix_is_bridge(self):
# Conventional appservice namespace puppets
assert self.adapter._is_system_or_bridge_sender(
"@_telegram_12345:bridge.example.org"
) is True
assert self.adapter._is_system_or_bridge_sender(
"@_discord_999:example.org"
) is True
assert self.adapter._is_system_or_bridge_sender(
"@_slackbridge_puppet:example.org"
) is True
def test_empty_localpart_is_system(self):
assert self.adapter._is_system_or_bridge_sender("@:server.example") is True
def test_empty_sender_is_system(self):
assert self.adapter._is_system_or_bridge_sender("") is True
assert self.adapter._is_system_or_bridge_sender(" ") is True
def test_regular_user_is_not_bridge(self):
assert self.adapter._is_system_or_bridge_sender(
"@alice:example.org"
) is False
# A user whose localpart merely CONTAINS an underscore is not a
# bridge — the convention is a LEADING underscore.
assert self.adapter._is_system_or_bridge_sender(
"@alice_smith:example.org"
) is False
def test_bot_account_is_not_bridge(self):
# The Hermes bot itself (no leading underscore) must not be
# classified as a bridge — that filter is a pairing guard, not
# a self-filter.
assert self.adapter._is_system_or_bridge_sender(
"@daemon:nerdworks.casa"
) is False
class TestMatrixOnRoomMessageFilter:
"""End-to-end coverage of _on_room_message drop conditions."""
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._user_id = "@bot:example.org"
self.adapter._startup_ts = 0.0 # accept any event_ts
self.adapter._handle_text_message = AsyncMock()
self.adapter._handle_media_message = AsyncMock()
@staticmethod
def _mk_event(sender, body="hi", msgtype="m.text", event_id=None, ts=None, room_id=None):
import time as _t
ev = MagicMock()
ev.room_id = room_id or "!room:example.org"
ev.sender = sender
ev.event_id = event_id or f"$evt-{sender}-{body}"
ev.timestamp = int((ts or _t.time()) * 1000)
ev.server_timestamp = ev.timestamp
ev.content = {"msgtype": msgtype, "body": body}
return ev
@pytest.mark.asyncio
async def test_own_sender_case_insensitive_dropped(self):
# Simulate whoami returning a differently-cased copy of our MXID.
self.adapter._user_id = "@Bot:Example.ORG"
ev = self._mk_event(sender="@bot:example.org")
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_not_called()
@pytest.mark.asyncio
async def test_bridge_sender_dropped_before_pairing(self):
ev = self._mk_event(sender="@_telegram_12345:bridge.example.org")
await self.adapter._on_room_message(ev)
# Bridge / appservice identities must never flow through to the
# gateway — otherwise they trigger pairing (#15763).
self.adapter._handle_text_message.assert_not_called()
@pytest.mark.asyncio
async def test_empty_sender_dropped(self):
ev = self._mk_event(sender="")
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_not_called()
@pytest.mark.asyncio
async def test_self_with_unresolved_user_id_dropped(self):
# whoami has not resolved yet → user_id empty → drop ALL traffic
# defensively rather than risk echoing our own outbound messages.
self.adapter._user_id = ""
ev = self._mk_event(sender="@alice:example.org")
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_not_called()
@pytest.mark.asyncio
async def test_regular_user_reaches_text_handler(self):
ev = self._mk_event(sender="@alice:example.org", body="hello bot")
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_unauthorized_user_reaches_text_handler(self):
"""MATRIX_ALLOWED_USERS is enforced by gateway authz, not adapter intake."""
self.adapter._allowed_user_ids = {"@alice:example.org"}
ev = self._mk_event(sender="@mallory:example.org", body="hello bot")
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_authorized_user_reaches_text_handler(self):
self.adapter._allowed_user_ids = {"@alice:example.org"}
ev = self._mk_event(sender="@alice:example.org", body="hello bot")
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_unauthorized_room_is_dropped(self):
self.adapter._allowed_room_ids = {"!allowed:example.org"}
self.adapter._is_dm_room = AsyncMock(return_value=False)
ev = self._mk_event(
sender="@alice:example.org",
body="hello bot",
room_id="!other:example.org",
)
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_not_called()
@pytest.mark.asyncio
async def test_dm_room_bypasses_allowed_room_gate(self):
self.adapter._allowed_room_ids = {"!project:example.org"}
self.adapter._is_dm_room = AsyncMock(return_value=True)
ev = self._mk_event(
sender="@alice:example.org",
body="hello bot",
room_id="!dm:example.org",
)
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_configured_bridge_pattern_is_dropped(self):
self.adapter._ignored_user_patterns = [re.compile(r"^@telegram_")]
ev = self._mk_event(sender="@telegram_123:example.org", body="hello bot")
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_not_called()
@pytest.mark.asyncio
async def test_notice_message_is_dropped_by_default(self):
ev = self._mk_event(
sender="@alice:example.org",
body="bot notice",
msgtype="m.notice",
)
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_not_called()
@pytest.mark.asyncio
async def test_notice_message_can_be_enabled(self):
self.adapter._process_notices = True
ev = self._mk_event(
sender="@alice:example.org",
body="human-authored notice",
msgtype="m.notice",
)
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_duplicate_event_id_dropped(self):
ev1 = self._mk_event(sender="@alice:example.org", body="hello bot", event_id="$dup")
ev2 = self._mk_event(sender="@alice:example.org", body="hello again bot", event_id="$dup")
await self.adapter._on_room_message(ev1)
await self.adapter._on_room_message(ev2)
self.adapter._handle_text_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_old_startup_event_dropped(self):
now = time.time()
self.adapter._startup_ts = now
ev = self._mk_event(
sender="@alice:example.org",
body="hello bot",
event_id="$old",
ts=now - 60,
)
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_not_called()
@pytest.mark.asyncio
async def test_seconds_timestamp_reaches_text_handler(self):
now = time.time()
self.adapter._startup_ts = now - 10
ev = self._mk_event(
sender="@alice:example.org",
body="hello bot",
event_id="$seconds-filter",
ts=now,
)
ev.timestamp = now
ev.server_timestamp = now
await self.adapter._on_room_message(ev)
self.adapter._handle_text_message.assert_awaited_once()
class TestMatrixRequireMention:
"""require_mention should honor config.extra like thread_require_mention."""
def test_require_mention_from_config_extra_false(self):
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test",
extra={
"homeserver": "https://matrix.example.org",
"require_mention": False,
},
)
adapter = MatrixAdapter(config)
assert adapter._require_mention is False
def test_require_mention_from_env_when_extra_unset(self, monkeypatch):
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test",
extra={"homeserver": "https://matrix.example.org"},
)
adapter = MatrixAdapter(config)
assert adapter._require_mention is False
def test_require_mention_config_takes_precedence_over_env(self, monkeypatch):
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "true")
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test",
extra={
"homeserver": "https://matrix.example.org",
"require_mention": False,
},
)
adapter = MatrixAdapter(config)
assert adapter._require_mention is False
@pytest.mark.asyncio
async def test_require_mention_false_allows_unmentioned_group_message(self):
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"require_mention": False,
},
)
adapter = MatrixAdapter(config)
adapter._is_dm_room = AsyncMock(return_value=False)
adapter._resolve_room_identity = AsyncMock(
return_value=MagicMock(display_name="Project Room")
)
adapter._get_display_name = AsyncMock(return_value="Alice")
adapter._background_read_receipt = MagicMock()
ctx = await adapter._resolve_message_context(
room_id="!project:example.org",
sender="@alice:example.org",
event_id="$unmentioned",
body="hello there",
source_content={"body": "hello there"},
relates_to={},
)
assert ctx is not None
class TestMatrixFreeResponsePolicy:
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._user_id = "@bot:example.org"
self.adapter._require_mention = True
self.adapter._free_rooms = {"!free:example.org"}
self.adapter._is_dm_room = AsyncMock(return_value=False)
self.adapter._resolve_room_identity = AsyncMock(
return_value=MagicMock(display_name="Free Room")
)
self.adapter._get_display_name = AsyncMock(return_value="Alice")
self.adapter._background_read_receipt = MagicMock()
@pytest.mark.asyncio
async def test_free_response_room_allows_unmentioned_message(self):
ctx = await self.adapter._resolve_message_context(
room_id="!free:example.org",
sender="@alice:example.org",
event_id="$free",
body="hello there",
source_content={"body": "hello there"},
relates_to={},
)
assert ctx is not None
@pytest.mark.asyncio
async def test_non_free_room_requires_mention(self):
ctx = await self.adapter._resolve_message_context(
room_id="!locked:example.org",
sender="@alice:example.org",
event_id="$locked",
body="hello there",
source_content={"body": "hello there"},
relates_to={},
)
assert ctx is None
class TestMatrixClockSkewWarning:
"""Clock-skew detector for #12614.
Reporter's host clock was set ~2 hours ahead of real time. The grace
filter `event_ts < startup_ts - 5` then drops every live event because
server timestamps look "older than startup". When this happens well
after startup (>30s), the adapter logs a one-shot WARNING pointing the
user at NTP instead of failing silently.
"""
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._user_id = "@bot:example.org"
self.adapter._handle_text_message = AsyncMock()
self.adapter._handle_media_message = AsyncMock()
@staticmethod
def _mk_event(sender, ts_ms, event_id=None):
ev = MagicMock()
ev.room_id = "!room:example.org"
ev.sender = sender
ev.event_id = event_id or f"$evt-{sender}-{ts_ms}"
ev.timestamp = ts_ms
ev.server_timestamp = ts_ms
ev.content = {"msgtype": "m.text", "body": "hi"}
return ev
@pytest.mark.asyncio
async def test_late_drops_emit_one_shot_clock_skew_warning(self, caplog):
import logging
import time as _t
# Simulate the reporter's environment: host clock is ~2 hours ahead
# of server time. Startup happened "in the future" relative to the
# real-world events we're now receiving.
now = _t.time()
self.adapter._startup_ts = now - 60 # bot started 60s ago (wall clock)
# Server events are dated 2h before startup_ts (skewed clock).
skewed_event_ts_ms = int((self.adapter._startup_ts - 7200) * 1000)
with caplog.at_level(logging.WARNING, logger="gateway.platforms.matrix"):
for i in range(5):
ev = self._mk_event(
sender=f"@alice{i}:example.org", ts_ms=skewed_event_ts_ms
)
await self.adapter._on_room_message(ev)
# Handler should never be invoked — all events failed the grace check.
self.adapter._handle_text_message.assert_not_called()
# Exactly one WARNING from THIS logger should be emitted. Filter by
# logger name so unrelated stdlib/library warnings can't satisfy the
# assertion.
skew_warnings = [
r for r in caplog.records
if r.name == "gateway.platforms.matrix"
and r.levelname == "WARNING"
and "set-ntp" in r.getMessage()
]
assert len(skew_warnings) == 1, (
f"expected exactly 1 clock-skew warning, got {len(skew_warnings)}"
)
msg = skew_warnings[0].getMessage()
assert "7200" in msg, f"skew value missing from message: {msg!r}"
# Pin the counter so a regression in the gating logic (e.g. warning
# at threshold 1 or 5, or not stopping after warn) is caught.
assert self.adapter._late_grace_drops == 3
assert self.adapter._clock_skew_warned is True
@pytest.mark.asyncio
async def test_initial_sync_drops_do_not_warn(self, caplog):
"""During the first 30s after startup, old events are normal backfill."""
import logging
import time as _t
now = _t.time()
# Startup was 1s ago — we're still in the initial-sync window.
self.adapter._startup_ts = now - 1
old_ts_ms = int((self.adapter._startup_ts - 3600) * 1000)
with caplog.at_level(logging.WARNING, logger="gateway.platforms.matrix"):
for i in range(5):
ev = self._mk_event(
sender=f"@alice{i}:example.org", ts_ms=old_ts_ms
)
await self.adapter._on_room_message(ev)
# Backfill drops are silent — no clock-skew warning fired.
assert self.adapter._clock_skew_warned is False
skew_warnings = [
r for r in caplog.records
if r.name == "gateway.platforms.matrix"
and "set-ntp" in r.getMessage()
]
assert skew_warnings == []
@pytest.mark.asyncio
async def test_fewer_than_three_late_drops_do_not_warn(self, caplog):
"""A single delayed backfill event after 30s shouldn't trigger NTP advice."""
import logging
import time as _t
now = _t.time()
self.adapter._startup_ts = now - 120 # extra slack vs the 30s gate
old_ts_ms = int((self.adapter._startup_ts - 3600) * 1000)
with caplog.at_level(logging.WARNING, logger="gateway.platforms.matrix"):
for i in range(2): # only 2 late drops — under the threshold
ev = self._mk_event(
sender=f"@alice{i}:example.org", ts_ms=old_ts_ms
)
await self.adapter._on_room_message(ev)
assert self.adapter._late_grace_drops == 2
assert self.adapter._clock_skew_warned is False
@pytest.mark.asyncio
async def test_varied_backfill_skews_do_not_warn(self, caplog):
"""Backfill from a freshly-invited room delivers events of varied age.
A genuine clock-skew bug produces drops with a *constant* offset
(every event is ~X seconds older than wall clock). Joining an old
room post-startup delivers events spanning hours-to-days; those
skews vary wildly and must NOT trigger the NTP warning.
"""
import logging
import time as _t
now = _t.time()
self.adapter._startup_ts = now - 120
# Each event has a different age, ranging from 1h to 30d ago.
ages_in_hours = [1, 24, 168, 720, 4] # 1h, 1d, 1w, 30d, 4h
with caplog.at_level(logging.WARNING, logger="gateway.platforms.matrix"):
for i, hrs in enumerate(ages_in_hours):
ts_ms = int((self.adapter._startup_ts - hrs * 3600) * 1000)
ev = self._mk_event(
sender=f"@alice{i}:example.org", ts_ms=ts_ms
)
await self.adapter._on_room_message(ev)
# The varied-skew guard should keep the counter from reaching 3.
assert self.adapter._late_grace_drops < 3
assert self.adapter._clock_skew_warned is False
skew_warnings = [
r for r in caplog.records
if r.name == "gateway.platforms.matrix"
and "set-ntp" in r.getMessage()
]
assert skew_warnings == []
@pytest.mark.asyncio
async def test_state_reset_allows_warning_to_fire_again(self, caplog):
"""After the reset block at top of connect() runs, the warning is rearmed.
Reconnect lifecycle: the user fixes NTP, restarts the bot, and the
new connect() call resets _late_grace_drops / _clock_skew_warned at
the top. This test exercises the rearm path by:
1. Tripping the warning once (state: warned=True).
2. Running the same reset block connect() runs.
3. Tripping the warning a second time — the second warning should
fire because the state was cleared.
"""
import logging
import time as _t
now = _t.time()
self.adapter._startup_ts = now - 60
skewed_ms = int((self.adapter._startup_ts - 7200) * 1000)
with caplog.at_level(logging.WARNING, logger="gateway.platforms.matrix"):
for i in range(3):
ev = self._mk_event(
sender=f"@alice{i}:example.org", ts_ms=skewed_ms,
event_id=f"$first-{i}",
)
await self.adapter._on_room_message(ev)
assert self.adapter._clock_skew_warned is True
# Mirror the reset block in connect() (matrix.py around line 855).
self.adapter._startup_ts = _t.time() - 60
self.adapter._late_grace_drops = 0
self.adapter._late_grace_skew = 0.0
self.adapter._clock_skew_warned = False
# Same skewed-clock scenario should warn AGAIN after reset.
skewed_ms2 = int((self.adapter._startup_ts - 7200) * 1000)
for i in range(3):
ev = self._mk_event(
sender=f"@bob{i}:example.org", ts_ms=skewed_ms2,
event_id=f"$second-{i}",
)
await self.adapter._on_room_message(ev)
skew_warnings = [
r for r in caplog.records
if r.name == "gateway.platforms.matrix"
and "set-ntp" in r.getMessage()
]
assert len(skew_warnings) == 2, (
f"expected 2 warnings (one per connect cycle), got {len(skew_warnings)}"
)
# ---------------------------------------------------------------------------
# DM auto-thread
# ---------------------------------------------------------------------------
class TestMatrixDmAutoThread:
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._is_dm_room = AsyncMock(return_value=True)
self.adapter._get_display_name = AsyncMock(return_value="Alice")
self.adapter._background_read_receipt = MagicMock()
# Disable require_mention so DMs pass gating
self.adapter._require_mention = False
@pytest.mark.asyncio
async def test_dm_auto_thread_enabled_creates_thread(self):
"""When dm_auto_thread is True, DM messages get auto-threaded."""
self.adapter._dm_auto_thread = True
ctx = await self.adapter._resolve_message_context(
room_id="!dm:ex",
sender="@alice:ex",
event_id="$ev1",
body="hello",
source_content={"body": "hello"},
relates_to={},
)
assert ctx is not None
_body, _is_dm, _chat_type, thread_id, _display, _source = ctx
assert thread_id == "$ev1"
@pytest.mark.asyncio
async def test_dm_auto_thread_disabled_no_thread(self):
"""When dm_auto_thread is False (default), DMs have no auto-thread."""
self.adapter._dm_auto_thread = False
ctx = await self.adapter._resolve_message_context(
room_id="!dm:ex",
sender="@alice:ex",
event_id="$ev2",
body="hello",
source_content={"body": "hello"},
relates_to={},
)
assert ctx is not None
_body, _is_dm, _chat_type, thread_id, _display, _source = ctx
assert thread_id is None
# ---------------------------------------------------------------------------
# Proxy configuration
# ---------------------------------------------------------------------------
class TestMatrixProxyConfig:
"""Verify that MatrixAdapter resolves and propagates proxy settings."""
def _make_adapter(self, monkeypatch, proxy_env=None):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_test")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
# Clear generic proxy vars so they don't leak from the host
for key in ("HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY",
"https_proxy", "http_proxy", "all_proxy", "MATRIX_PROXY"):
monkeypatch.delenv(key, raising=False)
if proxy_env:
for k, v in proxy_env.items():
monkeypatch.setenv(k, v)
with patch.dict("sys.modules", _make_fake_mautrix()):
from gateway.platforms.matrix import MatrixAdapter
cfg = PlatformConfig(enabled=True, token="syt_test",
extra={"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org"})
return MatrixAdapter(cfg)
def test_no_proxy_by_default(self, monkeypatch):
adapter = self._make_adapter(monkeypatch)
assert adapter._proxy_url is None
def test_matrix_proxy_env_var(self, monkeypatch):
adapter = self._make_adapter(monkeypatch,
proxy_env={"MATRIX_PROXY": "socks5://proxy:1080"})
assert adapter._proxy_url == "socks5://proxy:1080"
def test_generic_proxy_fallback(self, monkeypatch):
adapter = self._make_adapter(monkeypatch,
proxy_env={"HTTPS_PROXY": "http://corp:8080"})
assert adapter._proxy_url == "http://corp:8080"
def test_matrix_proxy_takes_priority(self, monkeypatch):
adapter = self._make_adapter(monkeypatch,
proxy_env={"MATRIX_PROXY": "socks5://special:1080",
"HTTPS_PROXY": "http://generic:8080"})
assert adapter._proxy_url == "socks5://special:1080"
class TestCreateMatrixSession:
"""Verify _create_matrix_session applies proxy at the session level."""
@pytest.mark.asyncio
async def test_no_proxy_returns_trust_env_session(self):
with patch.dict("sys.modules", _make_fake_mautrix()):
from gateway.platforms.matrix import _create_matrix_session
session = _create_matrix_session(None)
try:
assert session.trust_env is True
finally:
await session.close()
@pytest.mark.asyncio
async def test_http_proxy_sets_default_proxy(self):
with patch.dict("sys.modules", _make_fake_mautrix()):
from gateway.platforms.matrix import _create_matrix_session
session = _create_matrix_session("http://proxy:8080")
try:
assert str(session._default_proxy) == "http://proxy:8080"
finally:
await session.close()
@pytest.mark.asyncio
async def test_socks_proxy_uses_connector(self):
fake_connector = MagicMock()
with patch.dict("sys.modules", _make_fake_mautrix()):
with patch.dict("sys.modules", {
"aiohttp_socks": MagicMock(
ProxyConnector=MagicMock(
from_url=MagicMock(return_value=fake_connector)
)
),
}):
from gateway.platforms.matrix import _create_matrix_session
session = _create_matrix_session("socks5://proxy:1080")
try:
assert session.connector is fake_connector
finally:
await session.close()