feat(matrix): Tier 1 enhancement — reactions, read receipts, rich formatting, room management

Cherry-picked from PR #4338 by nepenth, resolved against current main.

Adds:
- Processing lifecycle reactions (eyes/checkmark/cross) via MATRIX_REACTIONS env
- Reaction send/receive with ReactionEvent + UnknownEvent fallback for older nio
- Fire-and-forget read receipts on text and media messages
- Message redaction, room history fetch, room creation, user invite
- Presence status control (online/offline/unavailable)
- Emote (/me) and notice message types with HTML rendering
- XSS-hardened markdown-to-HTML converter (strips raw HTML preprocessor,
  sanitizes link URLs against javascript:/data:/vbscript: schemes)
- Comprehensive regex fallback with full block/inline markdown support
- Markdown>=3.6 added to [matrix] extras in pyproject.toml
- 46 new tests covering all features and security hardening
This commit is contained in:
nepenth 2026-04-05 11:19:27 -07:00 committed by Teknium
parent 20b4060dbf
commit 534511bebb
3 changed files with 989 additions and 19 deletions

View file

@ -10,8 +10,10 @@ Environment variables:
MATRIX_USER_ID Full user ID (@bot:server) required for password login
MATRIX_PASSWORD Password (alternative to access token)
MATRIX_ENCRYPTION Set "true" to enable E2EE
MATRIX_ALLOWED_USERS Comma-separated Matrix user IDs (@user:server)
MATRIX_HOME_ROOM Room ID for cron/notification delivery
MATRIX_ALLOWED_USERS Comma-separated Matrix user IDs (@user:server)
MATRIX_HOME_ROOM Room ID for cron/notification delivery
MATRIX_REACTIONS Set "false" to disable processing lifecycle reactions
(eyes/checkmark/cross). Default: true
MATRIX_REQUIRE_MENTION Require @mention in rooms (default: true)
MATRIX_FREE_RESPONSE_ROOMS Comma-separated room IDs exempt from mention requirement
MATRIX_AUTO_THREAD Auto-create threads for room messages (default: true)
@ -30,6 +32,8 @@ import time
from pathlib import Path
from typing import Any, Dict, Optional, Set
from html import escape as _html_escape
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
@ -130,6 +134,11 @@ class MatrixAdapter(BasePlatformAdapter):
self._bot_participated_threads: set = self._load_participated_threads()
self._MAX_TRACKED_THREADS = 500
# Reactions: configurable via MATRIX_REACTIONS (default: true).
self._reactions_enabled: bool = os.getenv(
"MATRIX_REACTIONS", "true"
).lower() not in ("false", "0", "no")
def _is_duplicate_event(self, event_id) -> bool:
"""Return True if this event was already processed. Tracks the ID otherwise."""
if not event_id:
@ -283,6 +292,13 @@ class MatrixAdapter(BasePlatformAdapter):
client.add_event_callback(self._on_room_message_media, encrypted_media_cls)
client.add_event_callback(self._on_invite, nio.InviteMemberEvent)
# Reaction events (m.reaction).
if hasattr(nio, "ReactionEvent"):
client.add_event_callback(self._on_reaction, nio.ReactionEvent)
else:
# Older matrix-nio versions: use UnknownEvent fallback.
client.add_event_callback(self._on_unknown_event, nio.UnknownEvent)
# If E2EE: handle encrypted events.
if self._encryption and hasattr(client, "olm"):
client.add_event_callback(
@ -1002,6 +1018,9 @@ class MatrixAdapter(BasePlatformAdapter):
if thread_id:
self._track_thread(thread_id)
# Acknowledge receipt so the room shows as read (fire-and-forget).
self._background_read_receipt(room.room_id, event.event_id)
await self.handle_message(msg_event)
async def _on_room_message_media(self, room: Any, event: Any) -> None:
@ -1220,6 +1239,9 @@ class MatrixAdapter(BasePlatformAdapter):
if thread_id:
self._track_thread(thread_id)
# Acknowledge receipt so the room shows as read (fire-and-forget).
self._background_read_receipt(room.room_id, event.event_id)
await self.handle_message(msg_event)
async def _on_invite(self, room: Any, event: Any) -> None:
@ -1255,6 +1277,369 @@ class MatrixAdapter(BasePlatformAdapter):
except Exception as exc:
logger.warning("Matrix: error joining %s: %s", room.room_id, exc)
# ------------------------------------------------------------------
# Reactions (send, receive, processing lifecycle)
# ------------------------------------------------------------------
async def _send_reaction(
self, room_id: str, event_id: str, emoji: str,
) -> bool:
"""Send an emoji reaction to a message in a room."""
import nio
if not self._client:
return False
content = {
"m.relates_to": {
"rel_type": "m.annotation",
"event_id": event_id,
"key": emoji,
}
}
try:
resp = await self._client.room_send(
room_id, "m.reaction", content,
ignore_unverified_devices=True,
)
if isinstance(resp, nio.RoomSendResponse):
logger.debug("Matrix: sent reaction %s to %s", emoji, event_id)
return True
logger.debug("Matrix: reaction send failed: %s", resp)
return False
except Exception as exc:
logger.debug("Matrix: reaction send error: %s", exc)
return False
async def _redact_reaction(
self, room_id: str, reaction_event_id: str, reason: str = "",
) -> bool:
"""Remove a reaction by redacting its event."""
return await self.redact_message(room_id, reaction_event_id, reason)
async def on_processing_start(self, event: MessageEvent) -> None:
"""Add eyes reaction when the agent starts processing a message."""
if not self._reactions_enabled:
return
msg_id = event.message_id
room_id = event.source.chat_id
if msg_id and room_id:
await self._send_reaction(room_id, msg_id, "\U0001f440")
async def on_processing_complete(
self, event: MessageEvent, success: bool,
) -> None:
"""Replace eyes with checkmark (success) or cross (failure)."""
if not self._reactions_enabled:
return
msg_id = event.message_id
room_id = event.source.chat_id
if not msg_id or not room_id:
return
# Note: Matrix doesn't support removing a specific reaction easily
# without tracking the reaction event_id. We send the new reaction;
# the eyes stays (acceptable UX — both are visible).
await self._send_reaction(
room_id, msg_id, "\u2705" if success else "\u274c",
)
async def _on_reaction(self, room: Any, event: Any) -> None:
"""Handle incoming reaction events."""
if event.sender == self._user_id:
return
if self._is_duplicate_event(getattr(event, "event_id", None)):
return
# Log for now; future: trigger agent actions based on emoji.
reacts_to = getattr(event, "reacts_to", "")
key = getattr(event, "key", "")
logger.info(
"Matrix: reaction %s from %s on %s in %s",
key, event.sender, reacts_to, room.room_id,
)
async def _on_unknown_event(self, room: Any, event: Any) -> None:
"""Fallback handler for events not natively parsed by matrix-nio.
Catches m.reaction on older nio versions that lack ReactionEvent.
"""
source = getattr(event, "source", {})
if source.get("type") != "m.reaction":
return
content = source.get("content", {})
relates_to = content.get("m.relates_to", {})
if relates_to.get("rel_type") != "m.annotation":
return
if source.get("sender") == self._user_id:
return
logger.info(
"Matrix: reaction %s from %s on %s in %s",
relates_to.get("key", "?"),
source.get("sender", "?"),
relates_to.get("event_id", "?"),
room.room_id,
)
# ------------------------------------------------------------------
# Read receipts
# ------------------------------------------------------------------
def _background_read_receipt(self, room_id: str, event_id: str) -> None:
"""Fire-and-forget read receipt with error logging."""
async def _send() -> None:
try:
await self.send_read_receipt(room_id, event_id)
except Exception as exc: # pragma: no cover — defensive
logger.debug("Matrix: background read receipt failed: %s", exc)
asyncio.ensure_future(_send())
async def send_read_receipt(self, room_id: str, event_id: str) -> bool:
"""Send a read receipt (m.read) for an event.
Also sets the fully-read marker so the room is marked as read
in all clients.
"""
if not self._client:
return False
try:
if hasattr(self._client, "room_read_markers"):
await self._client.room_read_markers(
room_id,
fully_read_event=event_id,
read_event=event_id,
)
else:
# Fallback for older matrix-nio.
await self._client.room_send(
room_id, "m.receipt", {"event_id": event_id},
)
logger.debug("Matrix: sent read receipt for %s in %s", event_id, room_id)
return True
except Exception as exc:
logger.debug("Matrix: read receipt failed: %s", exc)
return False
# ------------------------------------------------------------------
# Message redaction
# ------------------------------------------------------------------
async def redact_message(
self, room_id: str, event_id: str, reason: str = "",
) -> bool:
"""Redact (delete) a message or event from a room."""
import nio
if not self._client:
return False
try:
resp = await self._client.room_redact(
room_id, event_id, reason=reason,
)
if isinstance(resp, nio.RoomRedactResponse):
logger.info("Matrix: redacted %s in %s", event_id, room_id)
return True
logger.warning("Matrix: redact failed: %s", resp)
return False
except Exception as exc:
logger.warning("Matrix: redact error: %s", exc)
return False
# ------------------------------------------------------------------
# Room history
# ------------------------------------------------------------------
async def fetch_room_history(
self,
room_id: str,
limit: int = 50,
start: str = "",
) -> list:
"""Fetch recent messages from a room.
Returns a list of dicts with keys: event_id, sender, body,
timestamp, type. Uses the ``room_messages()`` API.
"""
import nio
if not self._client:
return []
try:
resp = await self._client.room_messages(
room_id,
start=start or "",
limit=limit,
direction=nio.Api.MessageDirection.back
if hasattr(nio.Api, "MessageDirection")
else "b",
)
except Exception as exc:
logger.warning("Matrix: room_messages failed for %s: %s", room_id, exc)
return []
if not isinstance(resp, nio.RoomMessagesResponse):
logger.warning("Matrix: room_messages returned %s", type(resp).__name__)
return []
messages = []
for event in reversed(resp.chunk):
body = getattr(event, "body", "") or ""
messages.append({
"event_id": getattr(event, "event_id", ""),
"sender": getattr(event, "sender", ""),
"body": body,
"timestamp": getattr(event, "server_timestamp", 0),
"type": type(event).__name__,
})
return messages
# ------------------------------------------------------------------
# Room creation & management
# ------------------------------------------------------------------
async def create_room(
self,
name: str = "",
topic: str = "",
invite: Optional[list] = None,
is_direct: bool = False,
preset: str = "private_chat",
) -> Optional[str]:
"""Create a new Matrix room.
Args:
name: Human-readable room name.
topic: Room topic.
invite: List of user IDs to invite.
is_direct: Mark as a DM room.
preset: One of private_chat, public_chat, trusted_private_chat.
Returns the room_id on success, None on failure.
"""
import nio
if not self._client:
return None
try:
resp = await self._client.room_create(
name=name or None,
topic=topic or None,
invite=invite or [],
is_direct=is_direct,
preset=getattr(
nio.Api.RoomPreset if hasattr(nio.Api, "RoomPreset") else type("", (), {}),
preset, None,
) or preset,
)
if isinstance(resp, nio.RoomCreateResponse):
room_id = resp.room_id
self._joined_rooms.add(room_id)
logger.info("Matrix: created room %s (%s)", room_id, name or "unnamed")
return room_id
logger.warning("Matrix: room_create failed: %s", resp)
return None
except Exception as exc:
logger.warning("Matrix: room_create error: %s", exc)
return None
async def invite_user(self, room_id: str, user_id: str) -> bool:
"""Invite a user to a room."""
import nio
if not self._client:
return False
try:
resp = await self._client.room_invite(room_id, user_id)
if isinstance(resp, nio.RoomInviteResponse):
logger.info("Matrix: invited %s to %s", user_id, room_id)
return True
logger.warning("Matrix: invite failed: %s", resp)
return False
except Exception as exc:
logger.warning("Matrix: invite error: %s", exc)
return False
# ------------------------------------------------------------------
# Presence
# ------------------------------------------------------------------
_VALID_PRESENCE_STATES = frozenset(("online", "offline", "unavailable"))
async def set_presence(self, state: str = "online", status_msg: str = "") -> bool:
"""Set the bot's presence status."""
if not self._client:
return False
if state not in self._VALID_PRESENCE_STATES:
logger.warning("Matrix: invalid presence state %r", state)
return False
try:
if hasattr(self._client, "set_presence"):
await self._client.set_presence(state, status_msg=status_msg or None)
logger.debug("Matrix: presence set to %s", state)
return True
except Exception as exc:
logger.debug("Matrix: set_presence failed: %s", exc)
return False
# ------------------------------------------------------------------
# Emote & notice message types
# ------------------------------------------------------------------
async def send_emote(
self, chat_id: str, text: str, metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send an emote message (/me style action)."""
import nio
if not self._client or not text:
return SendResult(success=False, error="No client or empty text")
msg_content: Dict[str, Any] = {
"msgtype": "m.emote",
"body": text,
}
html = self._markdown_to_html(text)
if html and html != text:
msg_content["format"] = "org.matrix.custom.html"
msg_content["formatted_body"] = html
try:
resp = await self._client.room_send(
chat_id, "m.room.message", msg_content,
ignore_unverified_devices=True,
)
if isinstance(resp, nio.RoomSendResponse):
return SendResult(success=True, message_id=resp.event_id)
return SendResult(success=False, error=str(resp))
except Exception as exc:
return SendResult(success=False, error=str(exc))
async def send_notice(
self, chat_id: str, text: str, metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send a notice message (bot-appropriate, non-alerting)."""
import nio
if not self._client or not text:
return SendResult(success=False, error="No client or empty text")
msg_content: Dict[str, Any] = {
"msgtype": "m.notice",
"body": text,
}
html = self._markdown_to_html(text)
if html and html != text:
msg_content["format"] = "org.matrix.custom.html"
msg_content["formatted_body"] = html
try:
resp = await self._client.room_send(
chat_id, "m.room.message", msg_content,
ignore_unverified_devices=True,
)
if isinstance(resp, nio.RoomSendResponse):
return SendResult(success=True, message_id=resp.event_id)
return SendResult(success=False, error=str(resp))
except Exception as exc:
return SendResult(success=False, error=str(exc))
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
@ -1406,29 +1791,196 @@ class MatrixAdapter(BasePlatformAdapter):
return f"{self._homeserver}/_matrix/client/v1/media/download/{parts}"
def _markdown_to_html(self, text: str) -> str:
"""Convert Markdown to Matrix-compatible HTML.
"""Convert Markdown to Matrix-compatible HTML (org.matrix.custom.html).
Uses a simple conversion for common patterns. For full fidelity
a markdown-it style library could be used, but this covers the
common cases without an extra dependency.
Uses the ``markdown`` library when available (installed with the
``matrix`` extra). Falls back to a comprehensive regex converter
that handles fenced code blocks, inline code, headers, bold,
italic, strikethrough, links, blockquotes, lists, and horizontal
rules everything the Matrix HTML spec allows.
"""
try:
import markdown
html = markdown.markdown(
text,
extensions=["fenced_code", "tables", "nl2br"],
import markdown as _md
md = _md.Markdown(
extensions=["fenced_code", "tables", "nl2br", "sane_lists"],
)
# Strip wrapping <p> tags for single-paragraph messages.
# Remove the raw HTML preprocessor so <script> etc. in the
# source are escaped rather than passed through.
if "html_block" in md.preprocessors:
md.preprocessors.deregister("html_block")
html = md.convert(text)
md.reset()
# Strip wrapping <p> tags for single-paragraph messages so
# clients don't add extra spacing around short replies.
if html.count("<p>") == 1:
html = html.replace("<p>", "").replace("</p>", "")
return html
except ImportError:
pass
# Minimal fallback: just handle bold, italic, code.
html = text
html = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", html)
html = re.sub(r"\*(.+?)\*", r"<em>\1</em>", html)
html = re.sub(r"`([^`]+)`", r"<code>\1</code>", html)
html = re.sub(r"\n", r"<br>", html)
return html
return self._markdown_to_html_fallback(text)
# ------------------------------------------------------------------
# Regex-based Markdown -> HTML (no extra dependencies)
# ------------------------------------------------------------------
@staticmethod
def _sanitize_link_url(url: str) -> str:
"""Sanitize a URL for use in an href attribute.
Rejects dangerous URI schemes (javascript:, data:, vbscript:) and
escapes double-quotes to prevent attribute breakout.
"""
stripped = url.strip()
scheme = stripped.split(":", 1)[0].lower().strip() if ":" in stripped else ""
if scheme in ("javascript", "data", "vbscript"):
return ""
# Escape double quotes to prevent href attribute breakout.
return stripped.replace('"', "&quot;")
@staticmethod
def _markdown_to_html_fallback(text: str) -> str:
"""Comprehensive regex Markdown-to-HTML for Matrix.
Handles fenced code blocks, inline code, headers, bold, italic,
strikethrough, links, blockquotes, ordered/unordered lists, and
horizontal rules. Code regions are extracted first to prevent
inner transformations from mangling them.
Security: all non-code text is HTML-escaped before markdown
transforms to prevent HTML injection via crafted input. Link
URLs are sanitized against dangerous URI schemes.
"""
placeholders: list = []
def _protect_html(html_fragment: str) -> str:
idx = len(placeholders)
placeholders.append(html_fragment)
return f"\x00PROTECTED{idx}\x00"
# Fenced code blocks: ```lang\n...\n```
result = re.sub(
r"```(\w*)\n(.*?)```",
lambda m: _protect_html(
f'<pre><code class="language-{_html_escape(m.group(1))}">'
f"{_html_escape(m.group(2))}</code></pre>"
if m.group(1)
else f"<pre><code>{_html_escape(m.group(2))}</code></pre>"
),
text,
flags=re.DOTALL,
)
# Inline code: `code`
result = re.sub(
r"`([^`\n]+)`",
lambda m: _protect_html(
f"<code>{_html_escape(m.group(1))}</code>"
),
result,
)
# Extract and protect markdown links before escaping.
result = re.sub(
r"\[([^\]]+)\]\(([^)]+)\)",
lambda m: _protect_html(
'<a href="{}">{}</a>'.format(
MatrixAdapter._sanitize_link_url(m.group(2)),
_html_escape(m.group(1)),
)
),
result,
)
# HTML-escape remaining text (neutralises <script>, <img onerror=...>).
parts = re.split(r"(\x00PROTECTED\d+\x00)", result)
for idx, part in enumerate(parts):
if not part.startswith("\x00PROTECTED"):
parts[idx] = _html_escape(part)
result = "".join(parts)
# Block-level transforms (line-oriented).
lines = result.split("\n")
out_lines: list = []
i = 0
while i < len(lines):
line = lines[i]
# Horizontal rule
if re.match(r"^[\s]*([-*_])\s*\1\s*\1[\s\-*_]*$", line):
out_lines.append("<hr>")
i += 1
continue
# Headers
hdr = re.match(r"^(#{1,6})\s+(.+)$", line)
if hdr:
level = len(hdr.group(1))
out_lines.append(f"<h{level}>{hdr.group(2).strip()}</h{level}>")
i += 1
continue
# Blockquote (> may be escaped to &gt; by html.escape)
if line.startswith("&gt; ") or line == "&gt;" or line.startswith("> ") or line == ">":
bq_lines = []
while i < len(lines) and (
lines[i].startswith("&gt; ") or lines[i] == "&gt;"
or lines[i].startswith("> ") or lines[i] == ">"
):
ln = lines[i]
if ln.startswith("&gt; "):
bq_lines.append(ln[5:])
elif ln.startswith("> "):
bq_lines.append(ln[2:])
else:
bq_lines.append("")
i += 1
out_lines.append(f"<blockquote>{'<br>'.join(bq_lines)}</blockquote>")
continue
# Unordered list
ul_match = re.match(r"^[\s]*[-*+]\s+(.+)$", line)
if ul_match:
items = []
while i < len(lines) and re.match(r"^[\s]*[-*+]\s+(.+)$", lines[i]):
items.append(re.match(r"^[\s]*[-*+]\s+(.+)$", lines[i]).group(1))
i += 1
li = "".join(f"<li>{item}</li>" for item in items)
out_lines.append(f"<ul>{li}</ul>")
continue
# Ordered list
ol_match = re.match(r"^[\s]*\d+[.)]\s+(.+)$", line)
if ol_match:
items = []
while i < len(lines) and re.match(r"^[\s]*\d+[.)]\s+(.+)$", lines[i]):
items.append(re.match(r"^[\s]*\d+[.)]\s+(.+)$", lines[i]).group(1))
i += 1
li = "".join(f"<li>{item}</li>" for item in items)
out_lines.append(f"<ol>{li}</ol>")
continue
out_lines.append(line)
i += 1
result = "\n".join(out_lines)
# Inline transforms.
result = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", result, flags=re.DOTALL)
result = re.sub(r"__(.+?)__", r"<strong>\1</strong>", result, flags=re.DOTALL)
result = re.sub(r"\*(.+?)\*", r"<em>\1</em>", result, flags=re.DOTALL)
result = re.sub(r"(?<!\w)_(.+?)_(?!\w)", r"<em>\1</em>", result, flags=re.DOTALL)
result = re.sub(r"~~(.+?)~~", r"<del>\1</del>", result, flags=re.DOTALL)
result = re.sub(r"\n", "<br>\n", result)
# Clean up excessive <br> around block elements.
result = re.sub(r"<br>\n(</?(?:pre|blockquote|h[1-6]|ul|ol|li|hr))", r"\n\1", result)
result = re.sub(r"(</(?:pre|blockquote|h[1-6]|ul|ol|li)>)<br>", r"\1", result)
# Restore protected regions.
for idx, original in enumerate(placeholders):
result = result.replace(f"\x00PROTECTED{idx}\x00", original)
return result

View file

@ -43,7 +43,7 @@ dev = ["debugpy>=1.8.0,<2", "pytest>=9.0.2,<10", "pytest-asyncio>=1.3.0,<2", "py
messaging = ["python-telegram-bot>=22.6,<23", "discord.py[voice]>=2.7.1,<3", "aiohttp>=3.13.3,<4", "slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]
cron = ["croniter>=6.0.0,<7"]
slack = ["slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]
matrix = ["matrix-nio[e2e]>=0.24.0,<1"]
matrix = ["matrix-nio[e2e]>=0.24.0,<1", "Markdown>=3.6,<4"]
cli = ["simple-term-menu>=1.0,<2"]
tts-premium = ["elevenlabs>=1.0,<2"]
voice = [

View file

@ -1348,3 +1348,421 @@ class TestMatrixEncryptedMedia:
msg_event = adapter.handle_message.await_args.args[0]
assert not msg_event.media_urls
assert not msg_event.media_types
# ---------------------------------------------------------------------------
# Markdown to HTML: security tests
# ---------------------------------------------------------------------------
class TestMatrixMarkdownHtmlSecurity:
"""Tests for HTML injection prevention in _markdown_to_html_fallback."""
def setup_method(self):
from gateway.platforms.matrix import MatrixAdapter
self.convert = MatrixAdapter._markdown_to_html_fallback
def test_script_injection_in_header(self):
result = self.convert("# <script>alert(1)</script>")
assert "<script>" not in result
assert "&lt;script&gt;" in result
def test_script_injection_in_plain_text(self):
result = self.convert("Hello <script>alert(1)</script>")
assert "<script>" not in result
def test_img_onerror_in_blockquote(self):
result = self.convert('> <img onerror="alert(1)">')
assert "onerror" not in result or "&lt;img" in result
def test_script_in_list_item(self):
result = self.convert("- <script>alert(1)</script>")
assert "<script>" not in result
def test_script_in_ordered_list(self):
result = self.convert("1. <script>alert(1)</script>")
assert "<script>" not in result
def test_javascript_uri_blocked(self):
result = self.convert("[click](javascript:alert(1))")
assert 'href="javascript:' not in result
def test_data_uri_blocked(self):
result = self.convert("[click](data:text/html,<script>)")
assert 'href="data:' not in result
def test_vbscript_uri_blocked(self):
result = self.convert("[click](vbscript:alert(1))")
assert 'href="vbscript:' not in result
def test_link_text_html_injection(self):
result = self.convert('[<img onerror="x">](http://safe.com)')
assert "<img" not in result or "&lt;img" in result
def test_link_href_attribute_breakout(self):
result = self.convert('[link](http://x" onclick="alert(1))')
assert "onclick" not in result or "&quot;" in result
def test_html_injection_in_bold(self):
result = self.convert("**<img onerror=alert(1)>**")
assert "<img" not in result or "&lt;img" in result
def test_html_injection_in_italic(self):
result = self.convert("*<script>alert(1)</script>*")
assert "<script>" not in result
# ---------------------------------------------------------------------------
# Markdown to HTML: extended formatting tests
# ---------------------------------------------------------------------------
class TestMatrixMarkdownHtmlFormatting:
"""Tests for new formatting capabilities in _markdown_to_html_fallback."""
def setup_method(self):
from gateway.platforms.matrix import MatrixAdapter
self.convert = MatrixAdapter._markdown_to_html_fallback
def test_fenced_code_block(self):
result = self.convert('```python\ndef hello():\n pass\n```')
assert "<pre><code" in result
assert "language-python" in result
def test_fenced_code_block_no_lang(self):
result = self.convert('```\nsome code\n```')
assert "<pre><code>" in result
def test_code_block_html_escaped(self):
result = self.convert('```\n<script>alert(1)</script>\n```')
assert "&lt;script&gt;" in result
assert "<script>" not in result
def test_headers(self):
assert "<h1>" in self.convert("# H1")
assert "<h2>" in self.convert("## H2")
assert "<h3>" in self.convert("### H3")
def test_unordered_list(self):
result = self.convert("- One\n- Two\n- Three")
assert "<ul>" in result
assert result.count("<li>") == 3
def test_ordered_list(self):
result = self.convert("1. First\n2. Second")
assert "<ol>" in result
assert result.count("<li>") == 2
def test_blockquote(self):
result = self.convert("> A quote\n> continued")
assert "<blockquote>" in result
assert "A quote" in result
def test_horizontal_rule(self):
assert "<hr>" in self.convert("---")
assert "<hr>" in self.convert("***")
def test_strikethrough(self):
result = self.convert("~~deleted~~")
assert "<del>deleted</del>" in result
def test_links_preserved(self):
result = self.convert("[text](https://example.com)")
assert '<a href="https://example.com">text</a>' in result
def test_complex_mixed_document(self):
"""A realistic agent response with multiple formatting types."""
text = "## Summary\n\nHere's what I found:\n\n- **Bold item**\n- `code` item\n\n```bash\necho hello\n```\n\n1. Step one\n2. Step two"
result = self.convert(text)
assert "<h2>" in result
assert "<strong>" in result
assert "<code>" in result
assert "<ul>" in result
assert "<ol>" in result
assert "<pre><code" in result
# ---------------------------------------------------------------------------
# Link URL sanitization
# ---------------------------------------------------------------------------
class TestMatrixLinkSanitization:
def test_safe_https_url(self):
from gateway.platforms.matrix import MatrixAdapter
assert MatrixAdapter._sanitize_link_url("https://example.com") == "https://example.com"
def test_javascript_blocked(self):
from gateway.platforms.matrix import MatrixAdapter
assert MatrixAdapter._sanitize_link_url("javascript:alert(1)") == ""
def test_data_blocked(self):
from gateway.platforms.matrix import MatrixAdapter
assert MatrixAdapter._sanitize_link_url("data:text/html,bad") == ""
def test_vbscript_blocked(self):
from gateway.platforms.matrix import MatrixAdapter
assert MatrixAdapter._sanitize_link_url("vbscript:bad") == ""
def test_quotes_escaped(self):
from gateway.platforms.matrix import MatrixAdapter
result = MatrixAdapter._sanitize_link_url('http://x"y')
assert '"' not in result
assert "&quot;" in result
# ---------------------------------------------------------------------------
# Reactions
# ---------------------------------------------------------------------------
class TestMatrixReactions:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_send_reaction(self):
"""_send_reaction should call room_send with m.reaction."""
nio = pytest.importorskip("nio")
mock_client = MagicMock()
mock_client.room_send = AsyncMock(
return_value=MagicMock(spec=nio.RoomSendResponse)
)
self.adapter._client = mock_client
result = await self.adapter._send_reaction("!room:ex", "$event1", "👍")
assert result is True
mock_client.room_send.assert_called_once()
args = mock_client.room_send.call_args
assert args[0][1] == "m.reaction"
content = args[0][2]
assert content["m.relates_to"]["rel_type"] == "m.annotation"
assert content["m.relates_to"]["key"] == "👍"
@pytest.mark.asyncio
async def test_send_reaction_no_client(self):
self.adapter._client = None
result = await self.adapter._send_reaction("!room:ex", "$ev", "👍")
assert result is False
@pytest.mark.asyncio
async def test_on_processing_start_sends_eyes(self):
"""on_processing_start should send 👀 reaction."""
from gateway.platforms.base import MessageEvent, MessageType
self.adapter._reactions_enabled = True
self.adapter._send_reaction = AsyncMock(return_value=True)
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_start(event)
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "👀")
@pytest.mark.asyncio
async def test_on_processing_complete_sends_check(self):
from gateway.platforms.base import MessageEvent, MessageType
self.adapter._reactions_enabled = True
self.adapter._send_reaction = AsyncMock(return_value=True)
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_complete(event, success=True)
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "")
@pytest.mark.asyncio
async def test_reactions_disabled(self):
from gateway.platforms.base import MessageEvent, MessageType
self.adapter._reactions_enabled = False
self.adapter._send_reaction = AsyncMock()
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_start(event)
self.adapter._send_reaction.assert_not_called()
# ---------------------------------------------------------------------------
# Read receipts
# ---------------------------------------------------------------------------
class TestMatrixReadReceipts:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_send_read_receipt(self):
mock_client = MagicMock()
mock_client.room_read_markers = AsyncMock(return_value=MagicMock())
self.adapter._client = mock_client
result = await self.adapter.send_read_receipt("!room:ex", "$event1")
assert result is True
mock_client.room_read_markers.assert_called_once()
@pytest.mark.asyncio
async def test_read_receipt_no_client(self):
self.adapter._client = None
result = await self.adapter.send_read_receipt("!room:ex", "$event1")
assert result is False
# ---------------------------------------------------------------------------
# Message redaction
# ---------------------------------------------------------------------------
class TestMatrixRedaction:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_redact_message(self):
nio = pytest.importorskip("nio")
mock_client = MagicMock()
mock_client.room_redact = AsyncMock(
return_value=MagicMock(spec=nio.RoomRedactResponse)
)
self.adapter._client = mock_client
result = await self.adapter.redact_message("!room:ex", "$ev1", "oops")
assert result is True
mock_client.room_redact.assert_called_once()
@pytest.mark.asyncio
async def test_redact_no_client(self):
self.adapter._client = None
result = await self.adapter.redact_message("!room:ex", "$ev1")
assert result is False
# ---------------------------------------------------------------------------
# Room creation & invite
# ---------------------------------------------------------------------------
class TestMatrixRoomManagement:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_create_room(self):
nio = pytest.importorskip("nio")
mock_resp = MagicMock(spec=nio.RoomCreateResponse)
mock_resp.room_id = "!new:example.org"
mock_client = MagicMock()
mock_client.room_create = AsyncMock(return_value=mock_resp)
self.adapter._client = mock_client
room_id = await self.adapter.create_room(name="Test Room", topic="A test")
assert room_id == "!new:example.org"
assert "!new:example.org" in self.adapter._joined_rooms
@pytest.mark.asyncio
async def test_invite_user(self):
nio = pytest.importorskip("nio")
mock_client = MagicMock()
mock_client.room_invite = AsyncMock(
return_value=MagicMock(spec=nio.RoomInviteResponse)
)
self.adapter._client = mock_client
result = await self.adapter.invite_user("!room:ex", "@user:ex")
assert result is True
@pytest.mark.asyncio
async def test_create_room_no_client(self):
self.adapter._client = None
result = await self.adapter.create_room()
assert result is None
# ---------------------------------------------------------------------------
# Presence
# ---------------------------------------------------------------------------
class TestMatrixPresence:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_set_presence_valid(self):
mock_client = MagicMock()
mock_client.set_presence = AsyncMock()
self.adapter._client = mock_client
result = await self.adapter.set_presence("online")
assert result is True
@pytest.mark.asyncio
async def test_set_presence_invalid_state(self):
mock_client = MagicMock()
self.adapter._client = mock_client
result = await self.adapter.set_presence("busy")
assert result is False
@pytest.mark.asyncio
async def test_set_presence_no_client(self):
self.adapter._client = None
result = await self.adapter.set_presence("online")
assert result is False
# ---------------------------------------------------------------------------
# Emote & notice
# ---------------------------------------------------------------------------
class TestMatrixMessageTypes:
def setup_method(self):
self.adapter = _make_adapter()
@pytest.mark.asyncio
async def test_send_emote(self):
nio = pytest.importorskip("nio")
mock_client = MagicMock()
mock_resp = MagicMock(spec=nio.RoomSendResponse)
mock_resp.event_id = "$emote1"
mock_client.room_send = AsyncMock(return_value=mock_resp)
self.adapter._client = mock_client
result = await self.adapter.send_emote("!room:ex", "waves hello")
assert result.success is True
call_args = mock_client.room_send.call_args[0]
assert call_args[2]["msgtype"] == "m.emote"
@pytest.mark.asyncio
async def test_send_notice(self):
nio = pytest.importorskip("nio")
mock_client = MagicMock()
mock_resp = MagicMock(spec=nio.RoomSendResponse)
mock_resp.event_id = "$notice1"
mock_client.room_send = AsyncMock(return_value=mock_resp)
self.adapter._client = mock_client
result = await self.adapter.send_notice("!room:ex", "System message")
assert result.success is True
call_args = mock_client.room_send.call_args[0]
assert call_args[2]["msgtype"] == "m.notice"
@pytest.mark.asyncio
async def test_send_emote_empty_text(self):
self.adapter._client = MagicMock()
result = await self.adapter.send_emote("!room:ex", "")
assert result.success is False