fix(matrix): E2EE cron delivery via live adapter + HTML formatting + origin fallback

Salvaged from PRs #3767 (chalkers), #5236 (ygd58), #2641 (buntingszn).

Three improvements to Matrix cron delivery:

1. Live adapter path: when the gateway is running, cron delivery now uses
   the connected MatrixAdapter via run_coroutine_threadsafe instead of
   the standalone HTTP PUT. This enables delivery to E2EE rooms where
   the raw HTTP path cannot encrypt. Falls back to standalone on failure.
   Threads adapters + event loop from gateway -> cron ticker -> tick() ->
   _deliver_result(). (from #3767)

2. HTML formatted_body: _send_matrix() now converts markdown to HTML
   using the optional markdown library, with h1-h6 to bold conversion
   for Element X compatibility. Falls back to plain text if markdown
   is not installed. Also adds random bytes to txn_id to prevent
   collisions. (from #5236)

3. Origin fallback: when deliver="origin" but origin is null (jobs
   created via API/scripts), falls back to HOME_CHANNEL env vars
   in order: matrix -> telegram -> discord -> slack. (from #2641)
This commit is contained in:
Teknium 2026-04-05 10:52:29 -07:00 committed by Teknium
parent 36e046e843
commit c100ad874c
4 changed files with 100 additions and 24 deletions

View file

@ -74,13 +74,28 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]:
return None
if deliver == "origin":
if not origin:
return None
return {
"platform": origin["platform"],
"chat_id": str(origin["chat_id"]),
"thread_id": origin.get("thread_id"),
}
if origin:
return {
"platform": origin["platform"],
"chat_id": str(origin["chat_id"]),
"thread_id": origin.get("thread_id"),
}
# Origin missing (e.g. job created via API/script) — try each
# platform's home channel as a fallback instead of silently dropping.
for platform_name in ("matrix", "telegram", "discord", "slack"):
chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "")
if chat_id:
logger.info(
"Job '%s' has deliver=origin but no origin; falling back to %s home channel",
job.get("name", job.get("id", "?")),
platform_name,
)
return {
"platform": platform_name,
"chat_id": chat_id,
"thread_id": None,
}
return None
if ":" in deliver:
platform_name, rest = deliver.split(":", 1)
@ -130,12 +145,14 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]:
}
def _deliver_result(job: dict, content: str) -> None:
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
"""
Deliver job output to the configured target (origin chat, specific platform, etc.).
Uses the standalone platform send functions from send_message_tool so delivery
works whether or not the gateway is running.
When ``adapters`` and ``loop`` are provided (gateway is running), tries to
use the live adapter first this supports E2EE rooms (e.g. Matrix) where
the standalone HTTP path cannot encrypt. Falls back to standalone send if
the adapter path fails or is unavailable.
"""
target = _resolve_delivery_target(job)
if not target:
@ -206,7 +223,33 @@ def _deliver_result(job: dict, content: str) -> None:
else:
delivery_content = content
# Run the async send in a fresh event loop (safe from any thread)
# Prefer the live adapter when the gateway is running — this supports E2EE
# rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt.
runtime_adapter = (adapters or {}).get(platform)
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
send_metadata = {"thread_id": thread_id} if thread_id else None
try:
future = asyncio.run_coroutine_threadsafe(
runtime_adapter.send(chat_id, delivery_content, metadata=send_metadata),
loop,
)
send_result = future.result(timeout=60)
if send_result and not getattr(send_result, "success", True):
err = getattr(send_result, "error", "unknown")
logger.warning(
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
job["id"], platform_name, chat_id, err,
)
else:
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
return
except Exception as e:
logger.warning(
"Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone",
job["id"], platform_name, chat_id, e,
)
# Standalone path: run the async send in a fresh event loop (safe from any thread)
coro = _send_to_platform(platform, pconfig, chat_id, delivery_content, thread_id=thread_id)
try:
result = asyncio.run(coro)
@ -629,7 +672,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
def tick(verbose: bool = True) -> int:
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
"""
Check and run all due jobs.
@ -638,6 +681,8 @@ def tick(verbose: bool = True) -> int:
Args:
verbose: Whether to print status messages
adapters: Optional dict mapping Platform live adapter (from gateway)
loop: Optional asyncio event loop (from gateway) for live adapter sends
Returns:
Number of jobs executed (0 if another tick is already running)
@ -694,7 +739,7 @@ def tick(verbose: bool = True) -> int:
if should_deliver:
try:
_deliver_result(job, deliver_content)
_deliver_result(job, deliver_content, adapters=adapters, loop=loop)
except Exception as de:
logger.error("Delivery failed for job %s: %s", job["id"], de)

View file

@ -1056,10 +1056,20 @@ class MatrixAdapter(BasePlatformAdapter):
media_type = "application/octet-stream"
msg_type = MessageType.DOCUMENT
is_encrypted_image = isinstance(event, getattr(nio, "RoomEncryptedImage", ()))
is_encrypted_audio = isinstance(event, getattr(nio, "RoomEncryptedAudio", ()))
is_encrypted_video = isinstance(event, getattr(nio, "RoomEncryptedVideo", ()))
is_encrypted_file = isinstance(event, getattr(nio, "RoomEncryptedFile", ()))
# Safely resolve encrypted media classes — they may not exist on older
# nio versions, and in test environments nio may be mocked (MagicMock
# auto-attributes are not valid types for isinstance).
def _safe_isinstance(obj, cls_name):
cls = getattr(nio, cls_name, None)
if cls is None or not isinstance(cls, type):
return False
return isinstance(obj, cls)
is_encrypted_image = _safe_isinstance(event, "RoomEncryptedImage")
is_encrypted_audio = _safe_isinstance(event, "RoomEncryptedAudio")
is_encrypted_video = _safe_isinstance(event, "RoomEncryptedVideo")
is_encrypted_file = _safe_isinstance(event, "RoomEncryptedFile")
is_encrypted_media = any((is_encrypted_image, is_encrypted_audio, is_encrypted_video, is_encrypted_file))
is_voice_message = False

View file

@ -6843,13 +6843,16 @@ class GatewayRunner:
return response
def _start_cron_ticker(stop_event: threading.Event, adapters=None, interval: int = 60):
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60):
"""
Background thread that ticks the cron scheduler at a regular interval.
Runs inside the gateway process so cronjobs fire automatically without
needing a separate `hermes cron daemon` or system cron entry.
When ``adapters`` and ``loop`` are provided, passes them through to the
cron delivery path so live adapters can be used for E2EE rooms.
Also refreshes the channel directory every 5 minutes and prunes the
image/audio/document cache once per hour.
"""
@ -6863,7 +6866,7 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, interval: int
tick_count = 0
while not stop_event.is_set():
try:
cron_tick(verbose=False)
cron_tick(verbose=False, adapters=adapters, loop=loop)
except Exception as e:
logger.debug("Cron tick error: %s", e)
@ -7049,12 +7052,13 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
write_pid_file()
atexit.register(remove_pid_file)
# Start background cron ticker so scheduled jobs fire automatically
# Start background cron ticker so scheduled jobs fire automatically.
# Pass the event loop so cron delivery can use live adapters (E2EE support).
cron_stop = threading.Event()
cron_thread = threading.Thread(
target=_start_cron_ticker,
args=(cron_stop,),
kwargs={"adapters": runner.adapters},
kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()},
daemon=True,
name="cron-ticker",
)

View file

@ -719,7 +719,11 @@ async def _send_mattermost(token, extra, chat_id, message):
async def _send_matrix(token, extra, chat_id, message):
"""Send via Matrix Client-Server API."""
"""Send via Matrix Client-Server API.
Converts markdown to HTML for rich rendering in Matrix clients.
Falls back to plain text if the ``markdown`` library is not installed.
"""
try:
import aiohttp
except ImportError:
@ -729,11 +733,24 @@ async def _send_matrix(token, extra, chat_id, message):
token = token or os.getenv("MATRIX_ACCESS_TOKEN", "")
if not homeserver or not token:
return {"error": "Matrix not configured (MATRIX_HOMESERVER, MATRIX_ACCESS_TOKEN required)"}
txn_id = f"hermes_{int(time.time() * 1000)}"
txn_id = f"hermes_{int(time.time() * 1000)}_{os.urandom(4).hex()}"
url = f"{homeserver}/_matrix/client/v3/rooms/{chat_id}/send/m.room.message/{txn_id}"
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
# Build message payload with optional HTML formatted_body.
payload = {"msgtype": "m.text", "body": message}
try:
import markdown as _md
html = _md.markdown(message, extensions=["fenced_code", "tables"])
# Convert h1-h6 to bold for Element X compatibility.
html = re.sub(r"<h[1-6]>(.*?)</h[1-6]>", r"<strong>\1</strong>", html)
payload["format"] = "org.matrix.custom.html"
payload["formatted_body"] = html
except ImportError:
pass
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
async with session.put(url, headers=headers, json={"msgtype": "m.text", "body": message}) as resp:
async with session.put(url, headers=headers, json=payload) as resp:
if resp.status not in (200, 201):
body = await resp.text()
return {"error": f"Matrix API error ({resp.status}): {body}"}