From a7fd62d8248611a78034f796aee83c08a5874980 Mon Sep 17 00:00:00 2001 From: liuhao1024 Date: Mon, 15 Jun 2026 08:29:21 +0800 Subject: [PATCH] fix(send_message): reuse live gateway adapter for Matrix media sends MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a live gateway adapter is available (i.e. the tool runs inside a running gateway), reuse the persistent connection instead of creating a new MatrixAdapter per call. This eliminates per-message E2EE re-init storms that exhaust recipient OTKs and silently drop messages. The fix follows the same pattern as _send_to_platform (line 618): gateway_runner_ref → runner.adapters[Platform.MATRIX]. Falls back to the ephemeral connect/disconnect cycle for standalone contexts. Also extracts the shared send logic into _send_via_matrix_adapter() to avoid duplicating the media dispatch code between the two paths. Fixes #46310 --- tests/tools/test_send_message_tool.py | 139 ++++++++++++++++++++++++++ tools/send_message_tool.py | 115 +++++++++++++-------- 2 files changed, 213 insertions(+), 41 deletions(-) diff --git a/tests/tools/test_send_message_tool.py b/tests/tools/test_send_message_tool.py index d9c0b64b7ab..439e3032cc6 100644 --- a/tests/tools/test_send_message_tool.py +++ b/tests/tools/test_send_message_tool.py @@ -913,6 +913,145 @@ class TestSendToPlatformChunking: ] +class TestMatrixMediaLiveAdapterReuse: + """Verify _send_matrix_via_adapter reuses the live gateway adapter + when available, avoiding per-message E2EE re-init storms (#46310).""" + + def test_live_adapter_skips_connect_disconnect(self, tmp_path): + """When a live gateway adapter exists, no connect() or disconnect() + should be called — the persistent E2EE session is reused.""" + img_path = tmp_path / "photo.png" + img_path.write_bytes(b"\x89PNG\r\n") + + calls = [] + + class LiveAdapter: + async def send(self, chat_id, message, metadata=None): + calls.append(("send", chat_id, message)) + return SimpleNamespace(success=True, message_id="$text") + + async def send_image_file(self, chat_id, path, metadata=None): + calls.append(("send_image_file", chat_id, path)) + return SimpleNamespace(success=True, message_id="$img") + + live_adapter = LiveAdapter() + fake_runner = SimpleNamespace( + adapters={Platform.MATRIX: live_adapter} + ) + + with patch( + "gateway.run._gateway_runner_ref", + return_value=fake_runner, + ), patch.dict( + sys.modules, {"gateway.platforms.matrix": SimpleNamespace()} + ): + result = asyncio.run( + _send_matrix_via_adapter( + SimpleNamespace(enabled=True, token="tok", extra={}), + "!room:example.com", + "here is an image", + media_files=[(str(img_path), False)], + ) + ) + + assert result["success"] is True + assert result["message_id"] == "$img" + # Only send + send_image_file; no connect / disconnect + assert calls == [ + ("send", "!room:example.com", "here is an image"), + ("send_image_file", "!room:example.com", str(img_path)), + ] + + def test_live_adapter_not_available_falls_back_to_ephemeral(self, tmp_path): + """When _gateway_runner_ref returns None, the ephemeral adapter + path (connect + disconnect) is used as before.""" + doc_path = tmp_path / "doc.pdf" + doc_path.write_bytes(b"%PDF-1.4") + + calls = [] + + class EphemeralAdapter: + def __init__(self, _config): + pass + + async def connect(self): + calls.append(("connect",)) + return True + + async def send(self, chat_id, message, metadata=None): + calls.append(("send", chat_id, message)) + return SimpleNamespace(success=True, message_id="$txt") + + async def send_document(self, chat_id, path, metadata=None): + calls.append(("send_document", chat_id, path)) + return SimpleNamespace(success=True, message_id="$doc") + + async def disconnect(self): + calls.append(("disconnect",)) + + fake_module = SimpleNamespace(MatrixAdapter=EphemeralAdapter) + + with patch( + "gateway.run._gateway_runner_ref", return_value=None + ), patch.dict(sys.modules, {"gateway.platforms.matrix": fake_module}): + result = asyncio.run( + _send_matrix_via_adapter( + SimpleNamespace(enabled=True, token="tok", extra={}), + "!room:example.com", + "report attached", + media_files=[(str(doc_path), False)], + ) + ) + + assert result["success"] is True + assert calls == [ + ("connect",), + ("send", "!room:example.com", "report attached"), + ("send_document", "!room:example.com", str(doc_path)), + ("disconnect",), + ] + + def test_live_adapter_no_matrix_adapter_falls_back(self): + """When the runner exists but has no Matrix adapter registered, + fall back to ephemeral.""" + calls = [] + + class EphemeralAdapter: + def __init__(self, _config): + pass + + async def connect(self): + calls.append(("connect",)) + return True + + async def send(self, chat_id, message, metadata=None): + calls.append(("send",)) + return SimpleNamespace(success=True, message_id="$txt") + + async def disconnect(self): + calls.append(("disconnect",)) + + # Runner exists but adapters dict has no MATRIX key + fake_runner = SimpleNamespace(adapters={}) + fake_module = SimpleNamespace(MatrixAdapter=EphemeralAdapter) + + with patch( + "gateway.run._gateway_runner_ref", + return_value=fake_runner, + ), patch.dict(sys.modules, {"gateway.platforms.matrix": fake_module}): + result = asyncio.run( + _send_matrix_via_adapter( + SimpleNamespace(enabled=True, token="tok", extra={}), + "!room:example.com", + "hello", + ) + ) + + assert result["success"] is True + assert ("connect",) in calls + assert ("disconnect",) in calls + + # --------------------------------------------------------------------------- # HTML auto-detection in Telegram send # --------------------------------------------------------------------------- diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 054da5290f5..58021fd4ca5 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -1468,56 +1468,50 @@ async def _send_signal(extra, chat_id, message, media_files=None): async def _send_matrix_via_adapter(pconfig, chat_id, message, media_files=None, thread_id=None): - """Send via the Matrix adapter so native Matrix media uploads are preserved.""" + """Send via the Matrix adapter so native Matrix media uploads are preserved. + + When a live gateway adapter is available (i.e. the tool runs inside a + running gateway), the persistent connection is reused — one olm/megolm + session for all sends. This avoids per-message E2EE re-init storms + that exhaust recipient OTKs and silently drop messages (issue #46310). + + Falls back to an ephemeral connect/disconnect cycle only when no gateway + is running (standalone cron, ``hermes send`` CLI). + """ + media_files = media_files or [] + metadata = {"thread_id": thread_id} if thread_id else None + + # --- Try the live gateway adapter first (persistent E2EE session) --- + live_adapter = None + try: + from gateway.config import Platform + from gateway.run import _gateway_runner_ref + + runner = _gateway_runner_ref() + if runner is not None: + live_adapter = runner.adapters.get(Platform.MATRIX) + except Exception: + live_adapter = None + + if live_adapter is not None: + return await _send_via_matrix_adapter( + live_adapter, chat_id, message, media_files, metadata + ) + + # --- Fallback: ephemeral adapter (standalone / cron context) --- try: from plugins.platforms.matrix.adapter import MatrixAdapter except ImportError: return {"error": "Matrix dependencies not installed. Run: pip install 'mautrix[encryption]'"} - media_files = media_files or [] - + adapter = MatrixAdapter(pconfig) try: - adapter = MatrixAdapter(pconfig) connected = await adapter.connect() if not connected: return _error("Matrix connect failed") - - metadata = {"thread_id": thread_id} if thread_id else None - last_result = None - - if message.strip(): - last_result = await adapter.send(chat_id, message, metadata=metadata) - if not last_result.success: - return _error(f"Matrix send failed: {last_result.error}") - - for media_path, is_voice in media_files: - if not os.path.exists(media_path): - return _error(f"Media file not found: {media_path}") - - ext = os.path.splitext(media_path)[1].lower() - if ext in _IMAGE_EXTS: - last_result = await adapter.send_image_file(chat_id, media_path, metadata=metadata) - elif ext in _VIDEO_EXTS: - last_result = await adapter.send_video(chat_id, media_path, metadata=metadata) - elif ext in _VOICE_EXTS and is_voice: - last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) - elif ext in _AUDIO_EXTS: - last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) - else: - last_result = await adapter.send_document(chat_id, media_path, metadata=metadata) - - if not last_result.success: - return _error(f"Matrix media send failed: {last_result.error}") - - if last_result is None: - return {"error": "No deliverable text or media remained after processing MEDIA tags"} - - return { - "success": True, - "platform": "matrix", - "chat_id": chat_id, - "message_id": last_result.message_id, - } + return await _send_via_matrix_adapter( + adapter, chat_id, message, media_files, metadata + ) except Exception as e: return _error(f"Matrix send failed: {e}") finally: @@ -1527,6 +1521,45 @@ async def _send_matrix_via_adapter(pconfig, chat_id, message, media_files=None, pass +async def _send_via_matrix_adapter(adapter, chat_id, message, media_files, metadata): + """Core send logic shared by live and ephemeral Matrix adapters.""" + last_result = None + + if message.strip(): + last_result = await adapter.send(chat_id, message, metadata=metadata) + if not last_result.success: + return _error(f"Matrix send failed: {last_result.error}") + + for media_path, is_voice in media_files: + if not os.path.exists(media_path): + return _error(f"Media file not found: {media_path}") + + ext = os.path.splitext(media_path)[1].lower() + if ext in _IMAGE_EXTS: + last_result = await adapter.send_image_file(chat_id, media_path, metadata=metadata) + elif ext in _VIDEO_EXTS: + last_result = await adapter.send_video(chat_id, media_path, metadata=metadata) + elif ext in _VOICE_EXTS and is_voice: + last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) + elif ext in _AUDIO_EXTS: + last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) + else: + last_result = await adapter.send_document(chat_id, media_path, metadata=metadata) + + if not last_result.success: + return _error(f"Matrix media send failed: {last_result.error}") + + if last_result is None: + return {"error": "No deliverable text or media remained after processing MEDIA tags"} + + return { + "success": True, + "platform": "matrix", + "chat_id": chat_id, + "message_id": last_result.message_id, + } + + # _send_dingtalk moved to plugins/platforms/dingtalk/adapter.py::_standalone_send, # wired via standalone_sender_fn and reached through _registry_standalone_send. #41112.