diff --git a/cron/scheduler.py b/cron/scheduler.py index 8b977f422..5d561066a 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -74,13 +74,28 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]: return None if deliver == "origin": - if not origin: - return None - return { - "platform": origin["platform"], - "chat_id": str(origin["chat_id"]), - "thread_id": origin.get("thread_id"), - } + if origin: + return { + "platform": origin["platform"], + "chat_id": str(origin["chat_id"]), + "thread_id": origin.get("thread_id"), + } + # Origin missing (e.g. job created via API/script) — try each + # platform's home channel as a fallback instead of silently dropping. + for platform_name in ("matrix", "telegram", "discord", "slack"): + chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "") + if chat_id: + logger.info( + "Job '%s' has deliver=origin but no origin; falling back to %s home channel", + job.get("name", job.get("id", "?")), + platform_name, + ) + return { + "platform": platform_name, + "chat_id": chat_id, + "thread_id": None, + } + return None if ":" in deliver: platform_name, rest = deliver.split(":", 1) @@ -130,12 +145,14 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]: } -def _deliver_result(job: dict, content: str) -> None: +def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None: """ Deliver job output to the configured target (origin chat, specific platform, etc.). - Uses the standalone platform send functions from send_message_tool so delivery - works whether or not the gateway is running. + When ``adapters`` and ``loop`` are provided (gateway is running), tries to + use the live adapter first — this supports E2EE rooms (e.g. Matrix) where + the standalone HTTP path cannot encrypt. Falls back to standalone send if + the adapter path fails or is unavailable. """ target = _resolve_delivery_target(job) if not target: @@ -206,7 +223,33 @@ def _deliver_result(job: dict, content: str) -> None: else: delivery_content = content - # Run the async send in a fresh event loop (safe from any thread) + # Prefer the live adapter when the gateway is running — this supports E2EE + # rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt. + runtime_adapter = (adapters or {}).get(platform) + if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)(): + send_metadata = {"thread_id": thread_id} if thread_id else None + try: + future = asyncio.run_coroutine_threadsafe( + runtime_adapter.send(chat_id, delivery_content, metadata=send_metadata), + loop, + ) + send_result = future.result(timeout=60) + if send_result and not getattr(send_result, "success", True): + err = getattr(send_result, "error", "unknown") + logger.warning( + "Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone", + job["id"], platform_name, chat_id, err, + ) + else: + logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id) + return + except Exception as e: + logger.warning( + "Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone", + job["id"], platform_name, chat_id, e, + ) + + # Standalone path: run the async send in a fresh event loop (safe from any thread) coro = _send_to_platform(platform, pconfig, chat_id, delivery_content, thread_id=thread_id) try: result = asyncio.run(coro) @@ -629,7 +672,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e) -def tick(verbose: bool = True) -> int: +def tick(verbose: bool = True, adapters=None, loop=None) -> int: """ Check and run all due jobs. @@ -638,6 +681,8 @@ def tick(verbose: bool = True) -> int: Args: verbose: Whether to print status messages + adapters: Optional dict mapping Platform → live adapter (from gateway) + loop: Optional asyncio event loop (from gateway) for live adapter sends Returns: Number of jobs executed (0 if another tick is already running) @@ -694,7 +739,7 @@ def tick(verbose: bool = True) -> int: if should_deliver: try: - _deliver_result(job, deliver_content) + _deliver_result(job, deliver_content, adapters=adapters, loop=loop) except Exception as de: logger.error("Delivery failed for job %s: %s", job["id"], de) diff --git a/gateway/platforms/matrix.py b/gateway/platforms/matrix.py index 055c6e65f..25f701d95 100644 --- a/gateway/platforms/matrix.py +++ b/gateway/platforms/matrix.py @@ -1056,10 +1056,20 @@ class MatrixAdapter(BasePlatformAdapter): media_type = "application/octet-stream" msg_type = MessageType.DOCUMENT - is_encrypted_image = isinstance(event, getattr(nio, "RoomEncryptedImage", ())) - is_encrypted_audio = isinstance(event, getattr(nio, "RoomEncryptedAudio", ())) - is_encrypted_video = isinstance(event, getattr(nio, "RoomEncryptedVideo", ())) - is_encrypted_file = isinstance(event, getattr(nio, "RoomEncryptedFile", ())) + + # Safely resolve encrypted media classes — they may not exist on older + # nio versions, and in test environments nio may be mocked (MagicMock + # auto-attributes are not valid types for isinstance). + def _safe_isinstance(obj, cls_name): + cls = getattr(nio, cls_name, None) + if cls is None or not isinstance(cls, type): + return False + return isinstance(obj, cls) + + is_encrypted_image = _safe_isinstance(event, "RoomEncryptedImage") + is_encrypted_audio = _safe_isinstance(event, "RoomEncryptedAudio") + is_encrypted_video = _safe_isinstance(event, "RoomEncryptedVideo") + is_encrypted_file = _safe_isinstance(event, "RoomEncryptedFile") is_encrypted_media = any((is_encrypted_image, is_encrypted_audio, is_encrypted_video, is_encrypted_file)) is_voice_message = False diff --git a/gateway/run.py b/gateway/run.py index f14713249..52bc9f7a0 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -6843,13 +6843,16 @@ class GatewayRunner: return response -def _start_cron_ticker(stop_event: threading.Event, adapters=None, interval: int = 60): +def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60): """ Background thread that ticks the cron scheduler at a regular interval. Runs inside the gateway process so cronjobs fire automatically without needing a separate `hermes cron daemon` or system cron entry. + When ``adapters`` and ``loop`` are provided, passes them through to the + cron delivery path so live adapters can be used for E2EE rooms. + Also refreshes the channel directory every 5 minutes and prunes the image/audio/document cache once per hour. """ @@ -6863,7 +6866,7 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, interval: int tick_count = 0 while not stop_event.is_set(): try: - cron_tick(verbose=False) + cron_tick(verbose=False, adapters=adapters, loop=loop) except Exception as e: logger.debug("Cron tick error: %s", e) @@ -7049,12 +7052,13 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = write_pid_file() atexit.register(remove_pid_file) - # Start background cron ticker so scheduled jobs fire automatically + # Start background cron ticker so scheduled jobs fire automatically. + # Pass the event loop so cron delivery can use live adapters (E2EE support). cron_stop = threading.Event() cron_thread = threading.Thread( target=_start_cron_ticker, args=(cron_stop,), - kwargs={"adapters": runner.adapters}, + kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()}, daemon=True, name="cron-ticker", ) diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index d12eed509..32741f08a 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -719,7 +719,11 @@ async def _send_mattermost(token, extra, chat_id, message): async def _send_matrix(token, extra, chat_id, message): - """Send via Matrix Client-Server API.""" + """Send via Matrix Client-Server API. + + Converts markdown to HTML for rich rendering in Matrix clients. + Falls back to plain text if the ``markdown`` library is not installed. + """ try: import aiohttp except ImportError: @@ -729,11 +733,24 @@ async def _send_matrix(token, extra, chat_id, message): token = token or os.getenv("MATRIX_ACCESS_TOKEN", "") if not homeserver or not token: return {"error": "Matrix not configured (MATRIX_HOMESERVER, MATRIX_ACCESS_TOKEN required)"} - txn_id = f"hermes_{int(time.time() * 1000)}" + txn_id = f"hermes_{int(time.time() * 1000)}_{os.urandom(4).hex()}" url = f"{homeserver}/_matrix/client/v3/rooms/{chat_id}/send/m.room.message/{txn_id}" headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + + # Build message payload with optional HTML formatted_body. + payload = {"msgtype": "m.text", "body": message} + try: + import markdown as _md + html = _md.markdown(message, extensions=["fenced_code", "tables"]) + # Convert h1-h6 to bold for Element X compatibility. + html = re.sub(r"(.*?)", r"\1", html) + payload["format"] = "org.matrix.custom.html" + payload["formatted_body"] = html + except ImportError: + pass + async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session: - async with session.put(url, headers=headers, json={"msgtype": "m.text", "body": message}) as resp: + async with session.put(url, headers=headers, json=payload) as resp: if resp.status not in (200, 201): body = await resp.text() return {"error": f"Matrix API error ({resp.status}): {body}"}