fix(send_message): reuse live gateway adapter for Matrix media sends

When a live gateway adapter is available (i.e. the tool runs inside a
running gateway), reuse the persistent connection instead of creating a
new MatrixAdapter per call. This eliminates per-message E2EE re-init
storms that exhaust recipient OTKs and silently drop messages.

The fix follows the same pattern as _send_to_platform (line 618):
gateway_runner_ref → runner.adapters[Platform.MATRIX]. Falls back to
the ephemeral connect/disconnect cycle for standalone contexts.

Also extracts the shared send logic into _send_via_matrix_adapter()
to avoid duplicating the media dispatch code between the two paths.

Fixes #46310
This commit is contained in:
liuhao1024 2026-06-15 08:29:21 +08:00 committed by kshitij
parent 1466eab4ee
commit a7fd62d824
2 changed files with 213 additions and 41 deletions

View file

@ -913,6 +913,145 @@ class TestSendToPlatformChunking:
]
class TestMatrixMediaLiveAdapterReuse:
"""Verify _send_matrix_via_adapter reuses the live gateway adapter
when available, avoiding per-message E2EE re-init storms (#46310)."""
def test_live_adapter_skips_connect_disconnect(self, tmp_path):
"""When a live gateway adapter exists, no connect() or disconnect()
should be called the persistent E2EE session is reused."""
img_path = tmp_path / "photo.png"
img_path.write_bytes(b"\x89PNG\r\n")
calls = []
class LiveAdapter:
async def send(self, chat_id, message, metadata=None):
calls.append(("send", chat_id, message))
return SimpleNamespace(success=True, message_id="$text")
async def send_image_file(self, chat_id, path, metadata=None):
calls.append(("send_image_file", chat_id, path))
return SimpleNamespace(success=True, message_id="$img")
live_adapter = LiveAdapter()
fake_runner = SimpleNamespace(
adapters={Platform.MATRIX: live_adapter}
)
with patch(
"gateway.run._gateway_runner_ref",
return_value=fake_runner,
), patch.dict(
sys.modules, {"gateway.platforms.matrix": SimpleNamespace()}
):
result = asyncio.run(
_send_matrix_via_adapter(
SimpleNamespace(enabled=True, token="tok", extra={}),
"!room:example.com",
"here is an image",
media_files=[(str(img_path), False)],
)
)
assert result["success"] is True
assert result["message_id"] == "$img"
# Only send + send_image_file; no connect / disconnect
assert calls == [
("send", "!room:example.com", "here is an image"),
("send_image_file", "!room:example.com", str(img_path)),
]
def test_live_adapter_not_available_falls_back_to_ephemeral(self, tmp_path):
"""When _gateway_runner_ref returns None, the ephemeral adapter
path (connect + disconnect) is used as before."""
doc_path = tmp_path / "doc.pdf"
doc_path.write_bytes(b"%PDF-1.4")
calls = []
class EphemeralAdapter:
def __init__(self, _config):
pass
async def connect(self):
calls.append(("connect",))
return True
async def send(self, chat_id, message, metadata=None):
calls.append(("send", chat_id, message))
return SimpleNamespace(success=True, message_id="$txt")
async def send_document(self, chat_id, path, metadata=None):
calls.append(("send_document", chat_id, path))
return SimpleNamespace(success=True, message_id="$doc")
async def disconnect(self):
calls.append(("disconnect",))
fake_module = SimpleNamespace(MatrixAdapter=EphemeralAdapter)
with patch(
"gateway.run._gateway_runner_ref", return_value=None
), patch.dict(sys.modules, {"gateway.platforms.matrix": fake_module}):
result = asyncio.run(
_send_matrix_via_adapter(
SimpleNamespace(enabled=True, token="tok", extra={}),
"!room:example.com",
"report attached",
media_files=[(str(doc_path), False)],
)
)
assert result["success"] is True
assert calls == [
("connect",),
("send", "!room:example.com", "report attached"),
("send_document", "!room:example.com", str(doc_path)),
("disconnect",),
]
def test_live_adapter_no_matrix_adapter_falls_back(self):
"""When the runner exists but has no Matrix adapter registered,
fall back to ephemeral."""
calls = []
class EphemeralAdapter:
def __init__(self, _config):
pass
async def connect(self):
calls.append(("connect",))
return True
async def send(self, chat_id, message, metadata=None):
calls.append(("send",))
return SimpleNamespace(success=True, message_id="$txt")
async def disconnect(self):
calls.append(("disconnect",))
# Runner exists but adapters dict has no MATRIX key
fake_runner = SimpleNamespace(adapters={})
fake_module = SimpleNamespace(MatrixAdapter=EphemeralAdapter)
with patch(
"gateway.run._gateway_runner_ref",
return_value=fake_runner,
), patch.dict(sys.modules, {"gateway.platforms.matrix": fake_module}):
result = asyncio.run(
_send_matrix_via_adapter(
SimpleNamespace(enabled=True, token="tok", extra={}),
"!room:example.com",
"hello",
)
)
assert result["success"] is True
assert ("connect",) in calls
assert ("disconnect",) in calls
# ---------------------------------------------------------------------------
# HTML auto-detection in Telegram send
# ---------------------------------------------------------------------------

View file

@ -1468,56 +1468,50 @@ async def _send_signal(extra, chat_id, message, media_files=None):
async def _send_matrix_via_adapter(pconfig, chat_id, message, media_files=None, thread_id=None):
"""Send via the Matrix adapter so native Matrix media uploads are preserved."""
"""Send via the Matrix adapter so native Matrix media uploads are preserved.
When a live gateway adapter is available (i.e. the tool runs inside a
running gateway), the persistent connection is reused one olm/megolm
session for all sends. This avoids per-message E2EE re-init storms
that exhaust recipient OTKs and silently drop messages (issue #46310).
Falls back to an ephemeral connect/disconnect cycle only when no gateway
is running (standalone cron, ``hermes send`` CLI).
"""
media_files = media_files or []
metadata = {"thread_id": thread_id} if thread_id else None
# --- Try the live gateway adapter first (persistent E2EE session) ---
live_adapter = None
try:
from gateway.config import Platform
from gateway.run import _gateway_runner_ref
runner = _gateway_runner_ref()
if runner is not None:
live_adapter = runner.adapters.get(Platform.MATRIX)
except Exception:
live_adapter = None
if live_adapter is not None:
return await _send_via_matrix_adapter(
live_adapter, chat_id, message, media_files, metadata
)
# --- Fallback: ephemeral adapter (standalone / cron context) ---
try:
from plugins.platforms.matrix.adapter import MatrixAdapter
except ImportError:
return {"error": "Matrix dependencies not installed. Run: pip install 'mautrix[encryption]'"}
media_files = media_files or []
adapter = MatrixAdapter(pconfig)
try:
adapter = MatrixAdapter(pconfig)
connected = await adapter.connect()
if not connected:
return _error("Matrix connect failed")
metadata = {"thread_id": thread_id} if thread_id else None
last_result = None
if message.strip():
last_result = await adapter.send(chat_id, message, metadata=metadata)
if not last_result.success:
return _error(f"Matrix send failed: {last_result.error}")
for media_path, is_voice in media_files:
if not os.path.exists(media_path):
return _error(f"Media file not found: {media_path}")
ext = os.path.splitext(media_path)[1].lower()
if ext in _IMAGE_EXTS:
last_result = await adapter.send_image_file(chat_id, media_path, metadata=metadata)
elif ext in _VIDEO_EXTS:
last_result = await adapter.send_video(chat_id, media_path, metadata=metadata)
elif ext in _VOICE_EXTS and is_voice:
last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata)
elif ext in _AUDIO_EXTS:
last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata)
else:
last_result = await adapter.send_document(chat_id, media_path, metadata=metadata)
if not last_result.success:
return _error(f"Matrix media send failed: {last_result.error}")
if last_result is None:
return {"error": "No deliverable text or media remained after processing MEDIA tags"}
return {
"success": True,
"platform": "matrix",
"chat_id": chat_id,
"message_id": last_result.message_id,
}
return await _send_via_matrix_adapter(
adapter, chat_id, message, media_files, metadata
)
except Exception as e:
return _error(f"Matrix send failed: {e}")
finally:
@ -1527,6 +1521,45 @@ async def _send_matrix_via_adapter(pconfig, chat_id, message, media_files=None,
pass
async def _send_via_matrix_adapter(adapter, chat_id, message, media_files, metadata):
"""Core send logic shared by live and ephemeral Matrix adapters."""
last_result = None
if message.strip():
last_result = await adapter.send(chat_id, message, metadata=metadata)
if not last_result.success:
return _error(f"Matrix send failed: {last_result.error}")
for media_path, is_voice in media_files:
if not os.path.exists(media_path):
return _error(f"Media file not found: {media_path}")
ext = os.path.splitext(media_path)[1].lower()
if ext in _IMAGE_EXTS:
last_result = await adapter.send_image_file(chat_id, media_path, metadata=metadata)
elif ext in _VIDEO_EXTS:
last_result = await adapter.send_video(chat_id, media_path, metadata=metadata)
elif ext in _VOICE_EXTS and is_voice:
last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata)
elif ext in _AUDIO_EXTS:
last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata)
else:
last_result = await adapter.send_document(chat_id, media_path, metadata=metadata)
if not last_result.success:
return _error(f"Matrix media send failed: {last_result.error}")
if last_result is None:
return {"error": "No deliverable text or media remained after processing MEDIA tags"}
return {
"success": True,
"platform": "matrix",
"chat_id": chat_id,
"message_id": last_result.message_id,
}
# _send_dingtalk moved to plugins/platforms/dingtalk/adapter.py::_standalone_send,
# wired via standalone_sender_fn and reached through _registry_standalone_send. #41112.