diff --git a/gateway/platform_registry.py b/gateway/platform_registry.py index a52f6596927..96bfe1ccadf 100644 --- a/gateway/platform_registry.py +++ b/gateway/platform_registry.py @@ -30,7 +30,7 @@ Usage (gateway side): import logging from dataclasses import dataclass, field -from typing import Any, Callable, Optional +from typing import Any, Awaitable, Callable, Optional logger = logging.getLogger(__name__) @@ -125,6 +125,23 @@ class PlatformEntry: # resolve the default chat/room ID. Empty = no cron home-channel support. cron_deliver_env_var: str = "" + # ── Standalone (out-of-process) sending ── + # Optional: async coroutine that delivers a message without a live + # gateway adapter. Called by ``tools/send_message_tool._send_via_adapter`` + # when ``cron`` runs in a separate process from the gateway and the + # in-process adapter weakref is therefore ``None``. + # + # Signature: + # async (pconfig, chat_id, message, *, thread_id=None, + # media_files=None, force_document=False) -> dict + # + # Returns ``{"success": True, "message_id": ...}`` on success or + # ``{"error": str}`` on failure. Plugin authors typically open an + # ephemeral connection / acquire a fresh OAuth token, send, and close. + # Without this hook, plugin platforms cannot serve as cron ``deliver=`` + # targets when the gateway is not co-resident with the cron process. + standalone_sender_fn: Optional[Callable[..., Awaitable[dict]]] = None + class PlatformRegistry: """Central registry of platform adapters. diff --git a/gateway/platforms/ADDING_A_PLATFORM.md b/gateway/platforms/ADDING_A_PLATFORM.md index 5091c4647c2..80ebd27c5da 100644 --- a/gateway/platforms/ADDING_A_PLATFORM.md +++ b/gateway/platforms/ADDING_A_PLATFORM.md @@ -14,7 +14,7 @@ The plugin system automatically handles: adapter creation, config parsing, user authorization, cron delivery, send_message routing, system prompt hints, status display, gateway setup, and more. -**Three optional hooks cover the edges most adapters need:** +**Optional hooks cover the edges most adapters need:** - `env_enablement_fn: () -> Optional[dict]` — seeds `PlatformConfig.extra` (and an optional `home_channel` dict) from env vars BEFORE the adapter is @@ -24,6 +24,11 @@ status display, gateway setup, and more. - `cron_deliver_env_var: str` — name of the `*_HOME_CHANNEL` env var. When set, `deliver=` cron jobs route to this var without editing `cron/scheduler.py`'s hardcoded sets. +- `standalone_sender_fn: async (...) -> dict`: out-of-process delivery + for cron jobs that run separately from the gateway. Without this, a + `deliver=` job fires correctly but the actual send returns + `No live adapter for platform ''`. Pair with `cron_deliver_env_var` + for end-to-end cron support. See the docsite for the signature. - `plugin.yaml` `requires_env` / `optional_env` rich-dict entries — auto-populate `OPTIONAL_ENV_VARS` in `hermes_cli/config.py` so the setup wizard surfaces proper descriptions, prompts, password flags, and URLs. diff --git a/plugins/platforms/google_chat/adapter.py b/plugins/platforms/google_chat/adapter.py index a4f8d73ec96..e4c1b5dbeea 100644 --- a/plugins/platforms/google_chat/adapter.py +++ b/plugins/platforms/google_chat/adapter.py @@ -3036,6 +3036,165 @@ def interactive_setup() -> None: print_info("Restart the gateway: hermes gateway restart") +# Strict resource-name pattern. ``spaces/`` and ``users/`` must +# only contain Google Chat's documented character set; anything else +# means a tampered chat_id trying to break out of the REST URL path +# (path traversal, ``?`` query injection, ``#`` fragment truncation). +_GCHAT_CHAT_ID_RE = re.compile(r"^(?:spaces|users)/[A-Za-z0-9_-]+$") + + +async def _standalone_send( + pconfig, + chat_id: str, + message: str, + *, + thread_id: Optional[str] = None, + media_files: Optional[List[str]] = None, + force_document: bool = False, +) -> Dict[str, Any]: + """POST a single Google Chat message via the REST API without the SDK. + + Used by ``tools/send_message_tool._send_via_adapter`` when the gateway + runner is not in this process (e.g. ``hermes cron`` running as a + separate process from ``hermes gateway``). Without this hook, + ``deliver=google_chat`` cron jobs fail with ``No live adapter for + platform``. + + Configuration: requires service-account credentials via + ``GOOGLE_CHAT_SERVICE_ACCOUNT_JSON``, ``GOOGLE_APPLICATION_CREDENTIALS``, + or Application Default Credentials, and a space resource name as + ``chat_id`` (e.g. ``spaces/AAAA-BBBB`` or ``users/``). + + Security: ``chat_id`` is validated against the documented Google Chat + resource-name character set before substitution into the REST URL so + a tampered value cannot path-traverse or query-inject. + + ``media_files`` and ``force_document`` are accepted for signature + parity but are not implemented for the standalone path; messages with + attachments send as text-only. The live adapter handles attachments. + """ + if not chat_id: + return {"error": "Google Chat standalone send: chat_id (space resource) is required"} + if not _GCHAT_CHAT_ID_RE.match(chat_id): + return {"error": ( + f"Google Chat standalone send: chat_id {chat_id!r} must match " + f"'spaces/' or 'users/' with only [A-Za-z0-9_-] in the id" + )} + if thread_id is not None and not re.match(r"^spaces/[A-Za-z0-9_-]+/threads/[A-Za-z0-9_-]+$", thread_id): + return {"error": ( + f"Google Chat standalone send: thread_id {thread_id!r} must match " + f"'spaces//threads/'" + )} + + extra = getattr(pconfig, "extra", {}) or {} + sa_value = ( + extra.get("service_account_json") + or os.getenv("GOOGLE_CHAT_SERVICE_ACCOUNT_JSON") + or os.getenv("GOOGLE_APPLICATION_CREDENTIALS") + ) + + if service_account is None: + return {"error": "Google Chat standalone send: google-auth not installed"} + + try: + from google.auth.transport.requests import Request as _GoogleAuthRequest + except Exception as e: + return {"error": f"Google Chat standalone send: google-auth import failed: {e}"} + + try: + if sa_value: + stripped = sa_value.lstrip() + if stripped.startswith("{"): + try: + info = json.loads(sa_value) + except json.JSONDecodeError as exc: + return {"error": f"Google Chat standalone send: inline SA JSON is invalid: {exc}"} + creds = service_account.Credentials.from_service_account_info(info, scopes=_CHAT_SCOPES) + else: + if not os.path.exists(sa_value): + return {"error": f"Google Chat standalone send: SA JSON file not found at {sa_value}"} + try: + with open(sa_value, "r", encoding="utf-8") as fh: + info = json.load(fh) + except json.JSONDecodeError as exc: + return {"error": f"Google Chat standalone send: SA JSON file is invalid: {exc}"} + creds = service_account.Credentials.from_service_account_info(info, scopes=_CHAT_SCOPES) + else: + try: + import google.auth as _google_auth + except ImportError: + return {"error": ( + "Google Chat standalone send: no SA credentials configured " + "and google-auth is not installed for ADC fallback" + )} + try: + creds, _project = _google_auth.default(scopes=_CHAT_SCOPES) + except Exception as exc: + return {"error": ( + f"Google Chat standalone send: no SA credentials configured " + f"and Application Default Credentials are unavailable: {exc}" + )} + except asyncio.CancelledError: + raise + except Exception as e: + return {"error": f"Google Chat standalone send: credential load failed: {e}"} + + # Bound the synchronous urllib3-backed token refresh so a hung Google + # STS endpoint cannot stall the cron scheduler indefinitely. + try: + await asyncio.wait_for( + asyncio.to_thread(creds.refresh, _GoogleAuthRequest()), + timeout=10.0, + ) + except asyncio.TimeoutError: + return {"error": "Google Chat standalone send: token refresh timed out"} + except asyncio.CancelledError: + raise + except Exception as e: + return {"error": f"Google Chat standalone send: token refresh failed: {e}"} + + token = getattr(creds, "token", None) + if not token: + return {"error": "Google Chat standalone send: refreshed credentials have no token"} + + body: Dict[str, Any] = {"text": message} + if thread_id: + body["thread"] = {"name": thread_id} + + url = f"https://chat.googleapis.com/v1/{chat_id}/messages" + try: + import aiohttp as _aiohttp + except ImportError: + return {"error": "Google Chat standalone send: aiohttp not installed"} + + try: + async with _aiohttp.ClientSession(timeout=_aiohttp.ClientTimeout(total=30.0)) as session: + async with session.post( + url, + json=body, + headers={ + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + }, + ) as resp: + if resp.status >= 400: + text = await resp.text() + return {"error": ( + f"Google Chat standalone send: API returned " + f"{resp.status}: {text[:300]}" + )} + payload = await resp.json() + return { + "success": True, + "message_id": payload.get("name"), + } + except asyncio.CancelledError: + raise + except Exception as e: + logger.debug("Google Chat standalone send raised", exc_info=True) + return {"error": f"Google Chat standalone send failed: {e}"} + + def register(ctx) -> None: """Plugin entry point — called by the Hermes plugin system at startup. @@ -3069,6 +3228,10 @@ def register(ctx) -> None: # cron jobs route to the configured home space without editing # cron/scheduler.py's hardcoded sets. cron_deliver_env_var="GOOGLE_CHAT_HOME_CHANNEL", + # Out-of-process cron delivery via the Chat REST API. Without this + # hook, deliver=google_chat cron jobs fail with "No live adapter" + # when cron runs separately from the gateway. + standalone_sender_fn=_standalone_send, # Auth env vars for _is_user_authorized() integration. allowed_users_env="GOOGLE_CHAT_ALLOWED_USERS", allow_all_env="GOOGLE_CHAT_ALLOW_ALL_USERS", diff --git a/plugins/platforms/irc/adapter.py b/plugins/platforms/irc/adapter.py index c3284344353..ff10475d4e1 100644 --- a/plugins/platforms/irc/adapter.py +++ b/plugins/platforms/irc/adapter.py @@ -53,11 +53,6 @@ from gateway.session import SessionSource from gateway.config import PlatformConfig, Platform -def _ensure_imports(): - """No-op — kept for backward compatibility with any call sites.""" - pass - - # --------------------------------------------------------------------------- # IRC protocol helpers # --------------------------------------------------------------------------- @@ -704,8 +699,233 @@ def _env_enablement() -> dict | None: return seed +def _strip_irc_control_chars(text: str) -> str: + """Strip IRC line terminators and the NUL byte from ``text``. + + IRC commands are CRLF-delimited; a bare ``\\r`` or ``\\n`` in user + content lets an attacker inject arbitrary IRC commands (CTCP, JOIN, + KICK). ``\\x00`` is a protocol-illegal byte. Everything else is + valid in PRIVMSG payloads. + """ + return text.replace("\r", " ").replace("\n", " ").replace("\x00", "") + + +def _is_irc_channel(target: str) -> bool: + return bool(target) and target[0] in "#&+!" + + +async def _standalone_send( + pconfig, + chat_id: str, + message: str, + *, + thread_id: Optional[str] = None, + media_files: Optional[List[str]] = None, + force_document: bool = False, +) -> Dict[str, Any]: + """Open an ephemeral IRC connection, send a PRIVMSG, and quit. + + Used by ``tools/send_message_tool._send_via_adapter`` when the gateway + runner is not in this process (e.g. ``hermes cron`` running as a + separate process from ``hermes gateway``). Without this hook, + ``deliver=irc`` cron jobs fail with ``No live adapter for platform``. + + The standalone client uses a distinct nick suffix (``-cron``) so it + does not collide with the long-running gateway adapter that may already + be holding the configured nickname on the same network. When the + target is a channel, the client JOINs it before sending PRIVMSG so + networks with the default ``+n`` (no external messages) channel mode + accept the delivery. + + ``thread_id`` and ``media_files`` are accepted for signature parity but + are not meaningful on IRC: IRC has no native thread or attachment + primitive. + """ + extra = getattr(pconfig, "extra", {}) or {} + server = os.getenv("IRC_SERVER") or extra.get("server", "") + channel = os.getenv("IRC_CHANNEL") or extra.get("channel", "") + if not server or not channel: + return {"error": "IRC standalone send: IRC_SERVER and IRC_CHANNEL must be configured"} + + port_value = os.getenv("IRC_PORT") or extra.get("port", 6697) + try: + port = int(port_value) + except (TypeError, ValueError): + return {"error": f"IRC standalone send: invalid port {port_value!r}"} + + nickname = os.getenv("IRC_NICKNAME") or extra.get("nickname", "hermes-bot") + use_tls_env = os.getenv("IRC_USE_TLS") + if use_tls_env is not None: + use_tls = use_tls_env.lower() in ("1", "true", "yes") + else: + use_tls = bool(extra.get("use_tls", True)) + + server_password = os.getenv("IRC_SERVER_PASSWORD") or extra.get("server_password", "") + nickserv_password = os.getenv("IRC_NICKSERV_PASSWORD") or extra.get("nickserv_password", "") + + # Reject control characters in chat_id to block IRC command injection. + raw_target = chat_id or channel + if any(ch in raw_target for ch in ("\r", "\n", "\x00", " ")): + return {"error": "IRC standalone send: chat_id contains illegal IRC characters"} + target = raw_target + + # Distinct nick prevents NICK collision with a live gateway adapter + # that may already be holding the configured nickname. Cap to 24 chars + # so subsequent collision retries do not overflow the 30-char NICKLEN + # most networks enforce. + nick_base = nickname.rstrip("_0123456789-")[:24] or "hermes-bot" + standalone_nick = f"{nick_base}-cron"[:30] + plain = IRCAdapter._strip_markdown(message) + + ssl_ctx = ssl.create_default_context() if use_tls else None + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(server, port, ssl=ssl_ctx), + timeout=15.0, + ) + except asyncio.CancelledError: + raise + except Exception as e: + return {"error": f"IRC standalone connect failed: {e}"} + + async def _raw(line: str) -> None: + writer.write((line + "\r\n").encode("utf-8")) + await writer.drain() + + nick_attempts = 0 + max_nick_attempts = 5 + try: + if server_password: + await _raw(f"PASS {_strip_irc_control_chars(server_password)}") + await _raw(f"NICK {standalone_nick}") + await _raw(f"USER {standalone_nick} 0 * :Hermes Agent (cron)") + + loop = asyncio.get_running_loop() + deadline = loop.time() + 15.0 + registered = False + while not registered: + remaining = deadline - loop.time() + if remaining <= 0: + return {"error": "IRC standalone send: registration timeout (no RPL_WELCOME)"} + try: + raw_line = await asyncio.wait_for(reader.readuntil(b"\r\n"), timeout=remaining) + except asyncio.TimeoutError: + return {"error": "IRC standalone send: registration timeout (no RPL_WELCOME)"} + except asyncio.IncompleteReadError: + return {"error": "IRC standalone send: server closed connection during registration"} + decoded = raw_line.decode("utf-8", errors="replace").rstrip("\r\n") + msg = _parse_irc_message(decoded) + cmd = msg["command"] + if cmd == "PING": + payload = msg["params"][0] if msg["params"] else "" + await _raw(f"PONG :{payload}") + elif cmd == "001": + registered = True + elif cmd in ("432", "433"): + nick_attempts += 1 + if nick_attempts > max_nick_attempts: + return {"error": "IRC standalone send: too many nick collisions"} + # Build the next nick from the stable base, not the + # mutated value, so the suffix stays bounded. + standalone_nick = f"{nick_base}-cron-{nick_attempts}"[:30] + await _raw(f"NICK {standalone_nick}") + elif cmd in ("464", "465"): + return {"error": f"IRC standalone send: server rejected client ({cmd})"} + + if nickserv_password: + await _raw(f"PRIVMSG NickServ :IDENTIFY {_strip_irc_control_chars(nickserv_password)}") + await asyncio.sleep(2) + + # JOIN before PRIVMSG. IRC channels with the default ``+n`` mode + # (no external messages: Libera, OFTC, EFnet, IRCNet, undernet) + # silently drop PRIVMSG from non-members. Do not JOIN bare nicks + # (DM target) or server queries. + if _is_irc_channel(target): + await _raw(f"JOIN {target}") + join_deadline = loop.time() + 5.0 + joined = False + while not joined: + remaining = join_deadline - loop.time() + if remaining <= 0: + # Timed out waiting for a JOIN ack: proceed anyway, the + # server may still deliver the PRIVMSG depending on mode. + break + try: + raw_line = await asyncio.wait_for(reader.readuntil(b"\r\n"), timeout=remaining) + except (asyncio.TimeoutError, asyncio.IncompleteReadError): + break + decoded = raw_line.decode("utf-8", errors="replace").rstrip("\r\n") + jmsg = _parse_irc_message(decoded) + jcmd = jmsg["command"] + if jcmd == "PING": + payload = jmsg["params"][0] if jmsg["params"] else "" + await _raw(f"PONG :{payload}") + elif jcmd in ("366", "JOIN"): + joined = True + elif jcmd in ("403", "405", "471", "473", "474", "475"): + return {"error": f"IRC standalone send: JOIN {target} rejected ({jcmd})"} + + # Bytes-aware per-line splitting so multi-line plain text never + # exceeds the IRC 510-byte protocol limit. Reuses the same + # algorithm as IRCAdapter._split_message, with control-character + # stripping per line to block CRLF injection from message content. + overhead = len(f"PRIVMSG {target} :".encode("utf-8")) + 2 + max_bytes = 510 - overhead + sent_any = False + for paragraph in plain.split("\n"): + paragraph = _strip_irc_control_chars(paragraph).rstrip() + if not paragraph: + continue + while paragraph: + encoded = paragraph.encode("utf-8") + if len(encoded) <= max_bytes: + await _raw(f"PRIVMSG {target} :{paragraph}") + await asyncio.sleep(0.3) + sent_any = True + break + # Binary search for largest prefix that fits within max_bytes + low, high, best = 1, len(paragraph), 0 + while low <= high: + mid = (low + high) // 2 + if len(paragraph[:mid].encode("utf-8")) <= max_bytes: + best = mid + low = mid + 1 + else: + high = mid - 1 + split_at = best + space = paragraph.rfind(" ", 0, split_at) + if space > split_at // 3: + split_at = space + await _raw(f"PRIVMSG {target} :{paragraph[:split_at].rstrip()}") + await asyncio.sleep(0.3) + sent_any = True + paragraph = paragraph[split_at:].lstrip() + + if not sent_any: + return {"error": "IRC standalone send: empty message after stripping"} + + await _raw("QUIT :delivered") + try: + await asyncio.wait_for(reader.read(1024), timeout=2.0) + except asyncio.TimeoutError: + pass + + return {"success": True, "message_id": str(int(time.time() * 1000))} + except asyncio.CancelledError: + raise + except Exception as e: + logger.debug("IRC standalone send raised", exc_info=True) + return {"error": f"IRC standalone send failed: {e}"} + finally: + try: + writer.close() + await asyncio.wait_for(writer.wait_closed(), timeout=5.0) + except (asyncio.TimeoutError, Exception): + pass + + def register(ctx): - """Plugin entry point — called by the Hermes plugin system.""" + """Plugin entry point: called by the Hermes plugin system.""" ctx.register_platform( name="irc", label="IRC", @@ -716,7 +936,7 @@ def register(ctx): required_env=["IRC_SERVER", "IRC_CHANNEL", "IRC_NICKNAME"], install_hint="No extra packages needed (stdlib only)", setup_fn=interactive_setup, - # Env-driven auto-configuration — seeds PlatformConfig.extra with + # Env-driven auto-configuration: seeds PlatformConfig.extra with # server/channel/port/tls + home_channel so env-only setups show # up in gateway status without instantiating the adapter. env_enablement_fn=_env_enablement, @@ -724,6 +944,10 @@ def register(ctx): # IRC_CHANNEL (see _env_enablement), so cron jobs with # deliver=irc route to the joined channel by default. cron_deliver_env_var="IRC_HOME_CHANNEL", + # Out-of-process cron delivery. Without this hook, deliver=irc + # cron jobs fail with "No live adapter" when cron runs separately + # from the gateway. + standalone_sender_fn=_standalone_send, # Auth env vars for _is_user_authorized() integration allowed_users_env="IRC_ALLOWED_USERS", allow_all_env="IRC_ALLOW_ALL_USERS", diff --git a/plugins/platforms/teams/adapter.py b/plugins/platforms/teams/adapter.py index 5c4a2cf0ce9..34ebeea1755 100644 --- a/plugins/platforms/teams/adapter.py +++ b/plugins/platforms/teams/adapter.py @@ -418,6 +418,9 @@ def _env_enablement() -> dict | None: seed["port"] = int(port) except ValueError: pass + service_url = os.getenv("TEAMS_SERVICE_URL", "").strip() + if service_url: + seed["service_url"] = service_url home = os.getenv("TEAMS_HOME_CHANNEL", "").strip() if home: seed["home_channel"] = { @@ -427,6 +430,173 @@ def _env_enablement() -> dict | None: return seed +# Bot Framework default service URL for the global Teams endpoint. Some +# regional/government tenants need a different host (e.g. +# ``https://smba.infra.gov.teams.microsoft.us/``) which can be supplied via +# ``TEAMS_SERVICE_URL`` or ``extra['service_url']``. +_DEFAULT_TEAMS_SERVICE_URL = "https://smba.trafficmanager.net/teams/" + +# Allowlist of Bot Framework service hosts that may receive a freshly +# minted bearer token. Operator-supplied URLs are matched against this +# allowlist to block SSRF / token-exfiltration via a tampered env var. +_ALLOWED_TEAMS_SERVICE_HOSTS = frozenset({ + "smba.trafficmanager.net", + "smba.infra.gov.teams.microsoft.us", +}) + +# Conservative pattern for Bot Framework conversation IDs. Real values +# combine digits, colons, hyphens, dots, '@', and the ``thread.skype`` / +# ``thread.tacv2`` suffixes; reject anything outside this set so a hostile +# value cannot path-traverse out of ``/v3/conversations//activities``. +import re as _re_teams +_TEAMS_CONV_ID_RE = _re_teams.compile(r"^[A-Za-z0-9:@\-_.]+$") + + +def _validate_teams_service_url(raw: str) -> Optional[str]: + """Return a normalized service URL or ``None`` if it is not allowed. + + Requires ``https://`` and a host in ``_ALLOWED_TEAMS_SERVICE_HOSTS``. + The trailing slash is added if absent so callers can append + ``v3/conversations/...`` without double slashes. + """ + if not raw: + return None + try: + from urllib.parse import urlparse + + parsed = urlparse(raw) + except Exception: + return None + if parsed.scheme != "https": + return None + if parsed.hostname not in _ALLOWED_TEAMS_SERVICE_HOSTS: + return None + normalized = raw if raw.endswith("/") else raw + "/" + return normalized + + +async def _standalone_send( + pconfig, + chat_id: str, + message: str, + *, + thread_id: Optional[str] = None, + media_files: Optional[list] = None, + force_document: bool = False, +) -> Dict[str, Any]: + """Acquire a Bot Framework bearer token and POST a single message activity. + + Used by ``tools/send_message_tool._send_via_adapter`` when the gateway + runner is not in this process (e.g. ``hermes cron`` running as a + separate process from ``hermes gateway``). Without this hook, + ``deliver=teams`` cron jobs fail with ``No live adapter for platform``. + + Configuration: requires ``TEAMS_CLIENT_ID``, ``TEAMS_CLIENT_SECRET``, + ``TEAMS_TENANT_ID``, ``TEAMS_HOME_CHANNEL`` (the conversation ID), and + optionally ``TEAMS_SERVICE_URL`` (Bot Framework service host; must be + a known Bot Framework endpoint, see ``_ALLOWED_TEAMS_SERVICE_HOSTS``). + + Security: ``service_url`` is validated against an allowlist of known + Bot Framework hosts to block SSRF / token-exfiltration via a tampered + env var. ``chat_id`` is validated to match the documented Bot + Framework ID character set so it cannot escape the URL path. + + ``media_files`` and ``force_document`` are accepted for signature + parity but not implemented for the standalone path; messages with + attachments will send as text-only. The live adapter handles + attachments via the SDK. + """ + extra = getattr(pconfig, "extra", {}) or {} + client_id = os.getenv("TEAMS_CLIENT_ID") or extra.get("client_id", "") + client_secret = os.getenv("TEAMS_CLIENT_SECRET") or extra.get("client_secret", "") + tenant_id = os.getenv("TEAMS_TENANT_ID") or extra.get("tenant_id", "") + if not (client_id and client_secret and tenant_id): + return {"error": "Teams standalone send: TEAMS_CLIENT_ID, TEAMS_CLIENT_SECRET, and TEAMS_TENANT_ID are all required"} + + raw_service_url = ( + os.getenv("TEAMS_SERVICE_URL") + or extra.get("service_url", "") + or _DEFAULT_TEAMS_SERVICE_URL + ) + service_url = _validate_teams_service_url(raw_service_url) + if service_url is None: + return {"error": ( + f"Teams standalone send: TEAMS_SERVICE_URL host is not on the " + f"Bot Framework allowlist; expected one of " + f"{sorted(_ALLOWED_TEAMS_SERVICE_HOSTS)}" + )} + + # Bot Framework conversation IDs are restricted to a known character + # set; anything else means a tampered chat_id trying to break out of + # the URL path. + if not chat_id: + return {"error": "Teams standalone send: chat_id (conversation ID) is required"} + if not _TEAMS_CONV_ID_RE.match(chat_id): + return {"error": "Teams standalone send: chat_id contains characters outside the Bot Framework conversation ID set"} + if not _TEAMS_CONV_ID_RE.match(tenant_id): + return {"error": "Teams standalone send: TEAMS_TENANT_ID contains characters outside the expected set"} + + token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" + activities_url = f"{service_url}v3/conversations/{chat_id}/activities" + + if not AIOHTTP_AVAILABLE: + return {"error": "Teams standalone send: aiohttp not installed"} + + try: + import aiohttp as _aiohttp + + # Per-request timeouts so a slow STS endpoint cannot starve the + # subsequent activity POST of its budget. + per_request_timeout = _aiohttp.ClientTimeout(total=15.0) + async with _aiohttp.ClientSession() as session: + async with session.post( + token_url, + data={ + "grant_type": "client_credentials", + "client_id": client_id, + "client_secret": client_secret, + "scope": "https://api.botframework.com/.default", + }, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=per_request_timeout, + ) as token_resp: + if token_resp.status >= 400: + body = await token_resp.text() + return {"error": f"Teams standalone send: token request failed ({token_resp.status}): {body[:300]}"} + token_payload = await token_resp.json() + access_token = token_payload.get("access_token") + if not access_token: + return {"error": "Teams standalone send: token response missing access_token"} + + activity = { + "type": "message", + "text": message, + "textFormat": "markdown", + } + async with session.post( + activities_url, + json=activity, + headers={ + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + }, + timeout=per_request_timeout, + ) as send_resp: + if send_resp.status >= 400: + body = await send_resp.text() + return {"error": f"Teams standalone send: activity post failed ({send_resp.status}): {body[:300]}"} + send_payload = await send_resp.json() + return { + "success": True, + "message_id": send_payload.get("id"), + } + except asyncio.CancelledError: + raise + except Exception as e: + logger.debug("Teams standalone send raised", exc_info=True) + return {"error": f"Teams standalone send failed: {e}"} + + # Keep the old name as an alias so existing test imports don't break. check_teams_requirements = check_requirements @@ -985,6 +1155,10 @@ def register(ctx) -> None: # jobs route to the configured Teams chat/channel without editing # cron/scheduler.py's hardcoded sets. cron_deliver_env_var="TEAMS_HOME_CHANNEL", + # Out-of-process cron delivery via Bot Framework REST. Without + # this hook, deliver=teams cron jobs fail with "No live adapter" + # when cron runs separately from the gateway. + standalone_sender_fn=_standalone_send, # Auth env vars for _is_user_authorized() integration allowed_users_env="TEAMS_ALLOWED_USERS", allow_all_env="TEAMS_ALLOW_ALL_USERS", diff --git a/tests/gateway/test_google_chat.py b/tests/gateway/test_google_chat.py index 248650aaf82..3f093bcea1d 100644 --- a/tests/gateway/test_google_chat.py +++ b/tests/gateway/test_google_chat.py @@ -2696,3 +2696,173 @@ class TestCronSchedulerRegistry: from cron.scheduler import _resolve_home_env_var assert _resolve_home_env_var("google_chat") == "GOOGLE_CHAT_HOME_CHANNEL" + + +# ── _standalone_send (out-of-process cron delivery) ────────────────────── + + +class _FakeAiohttpResponse: + def __init__(self, status: int, payload, text_body: str = ""): + self.status = status + self._payload = payload + self._text = text_body or (str(payload) if payload is not None else "") + + async def json(self): + return self._payload + + async def text(self): + return self._text + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + +class _FakeAiohttpSession: + def __init__(self, scripts): + self._scripts = list(scripts) + self.calls: list[tuple[str, dict]] = [] + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + def post(self, url, **kwargs): + self.calls.append((url, kwargs)) + if not self._scripts: + raise AssertionError(f"No scripted response for POST {url}") + return self._scripts.pop(0) + + +def _install_fake_aiohttp(monkeypatch, session): + fake_aiohttp = types.SimpleNamespace( + ClientSession=lambda timeout=None: session, + ClientTimeout=lambda total=None: None, + ) + monkeypatch.setitem(sys.modules, "aiohttp", fake_aiohttp) + + +def _install_fake_google_auth_transport(monkeypatch): + fake_request_module = types.SimpleNamespace(Request=lambda: object()) + monkeypatch.setitem(sys.modules, "google.auth.transport", types.SimpleNamespace(requests=fake_request_module)) + monkeypatch.setitem(sys.modules, "google.auth.transport.requests", fake_request_module) + + +class TestGoogleChatStandaloneSend: + + @pytest.mark.asyncio + async def test_standalone_send_refreshes_token_and_posts_message( + self, monkeypatch, tmp_path + ): + sa_file = tmp_path / "sa.json" + sa_file.write_text(json.dumps({ + "type": "service_account", + "client_email": "bot@example.iam.gserviceaccount.com", + "private_key": "fake", + "token_uri": "https://example/token", + })) + monkeypatch.setenv("GOOGLE_CHAT_SERVICE_ACCOUNT_JSON", str(sa_file)) + + fake_creds = MagicMock() + fake_creds.token = "the-token" + fake_creds.refresh = MagicMock(return_value=None) + + original = _gc_mod.service_account.Credentials.from_service_account_info + _gc_mod.service_account.Credentials.from_service_account_info = MagicMock( + return_value=fake_creds + ) + try: + _install_fake_google_auth_transport(monkeypatch) + send_resp = _FakeAiohttpResponse(200, {"name": "spaces/AAA/messages/MMM"}) + session = _FakeAiohttpSession([send_resp]) + _install_fake_aiohttp(monkeypatch, session) + + result = await _gc_mod._standalone_send( + PlatformConfig(enabled=True, extra={}), + "spaces/AAAA-BBBB", + "hello cron", + ) + finally: + _gc_mod.service_account.Credentials.from_service_account_info = original + + assert result == { + "success": True, + "message_id": "spaces/AAA/messages/MMM", + } + fake_creds.refresh.assert_called_once() + assert len(session.calls) == 1 + url, kwargs = session.calls[0] + assert url == "https://chat.googleapis.com/v1/spaces/AAAA-BBBB/messages" + assert kwargs["headers"]["Authorization"] == "Bearer the-token" + assert kwargs["json"] == {"text": "hello cron"} + + @pytest.mark.asyncio + async def test_standalone_send_returns_error_on_invalid_chat_id(self, monkeypatch): + monkeypatch.delenv("GOOGLE_CHAT_SERVICE_ACCOUNT_JSON", raising=False) + result = await _gc_mod._standalone_send( + PlatformConfig(enabled=True, extra={}), + "not-a-resource-name", + "hi", + ) + assert "error" in result + assert "spaces/" in result["error"] or "users/" in result["error"] + + @pytest.mark.asyncio + async def test_standalone_send_propagates_api_failure(self, monkeypatch, tmp_path): + sa_file = tmp_path / "sa.json" + sa_file.write_text(json.dumps({ + "type": "service_account", + "client_email": "bot@example.iam.gserviceaccount.com", + "private_key": "fake", + "token_uri": "https://example/token", + })) + monkeypatch.setenv("GOOGLE_CHAT_SERVICE_ACCOUNT_JSON", str(sa_file)) + + fake_creds = MagicMock() + fake_creds.token = "the-token" + fake_creds.refresh = MagicMock(return_value=None) + + original = _gc_mod.service_account.Credentials.from_service_account_info + _gc_mod.service_account.Credentials.from_service_account_info = MagicMock( + return_value=fake_creds + ) + try: + _install_fake_google_auth_transport(monkeypatch) + send_resp = _FakeAiohttpResponse( + 403, + {"error": {"code": 403, "message": "forbidden"}}, + text_body='{"error":{"code":403,"message":"forbidden"}}', + ) + session = _FakeAiohttpSession([send_resp]) + _install_fake_aiohttp(monkeypatch, session) + + result = await _gc_mod._standalone_send( + PlatformConfig(enabled=True, extra={}), + "spaces/AAAA-BBBB", + "hi", + ) + finally: + _gc_mod.service_account.Credentials.from_service_account_info = original + + assert "error" in result + assert "403" in result["error"] + + @pytest.mark.asyncio + async def test_standalone_send_rejects_chat_id_with_path_traversal(self, monkeypatch): + monkeypatch.delenv("GOOGLE_CHAT_SERVICE_ACCOUNT_JSON", raising=False) + + # Attempt to inject extra path segments after the prefix passes the + # startswith check. The strict regex must reject this. + result = await _gc_mod._standalone_send( + PlatformConfig(enabled=True, extra={}), + "spaces/AAAA/messages?messageReplyOption=REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD", + "hi", + ) + + assert "error" in result + # The error names the expected resource shape so plugin authors can self-correct + assert "spaces/" in result["error"] or "users/" in result["error"] diff --git a/tests/gateway/test_irc_adapter.py b/tests/gateway/test_irc_adapter.py index a1718fbdaf2..246dbfdf0ec 100644 --- a/tests/gateway/test_irc_adapter.py +++ b/tests/gateway/test_irc_adapter.py @@ -20,6 +20,7 @@ IRCAdapter = _irc_mod.IRCAdapter check_requirements = _irc_mod.check_requirements validate_config = _irc_mod.validate_config register = _irc_mod.register +_standalone_send = _irc_mod._standalone_send class TestIRCProtocolHelpers: @@ -500,3 +501,224 @@ class TestIRCPluginRegistration: ctx.register_platform.assert_called_once() call_kwargs = ctx.register_platform.call_args assert call_kwargs[1]["name"] == "irc" or call_kwargs[0][0] == "irc" if call_kwargs[0] else call_kwargs[1]["name"] == "irc" + + +# ── _standalone_send (out-of-process cron delivery) ────────────────────── + + +class _FakeIRCConnection: + """A scripted reader/writer pair used to simulate an IRC server. + + Construct with the lines the server should respond with (already + framed by ``\\r\\n``). Captures every line written by the client so + tests can assert NICK/USER/PRIVMSG/QUIT order. + """ + + def __init__(self, scripted_lines): + self.writes: list[bytes] = [] + self._closed = False + self._scripted = list(scripted_lines) + self._buffer = b"" + + # writer side ──────────────────────────────────────────────────── + def write(self, data: bytes) -> None: + self.writes.append(data) + + async def drain(self) -> None: + return None + + def close(self) -> None: + self._closed = True + + async def wait_closed(self) -> None: + return None + + def is_closing(self) -> bool: + return self._closed + + # reader side ──────────────────────────────────────────────────── + async def readuntil(self, separator: bytes = b"\r\n") -> bytes: + if not self._scripted: + raise asyncio.IncompleteReadError(b"", None) + line = self._scripted.pop(0) + if not line.endswith(b"\r\n"): + line = line + b"\r\n" + return line + + async def read(self, n: int = -1) -> bytes: + return b"" + + +class TestIRCStandaloneSend: + + @pytest.mark.asyncio + async def test_standalone_send_completes_handshake_and_sends_privmsg(self, monkeypatch): + from gateway.config import PlatformConfig + + monkeypatch.setenv("IRC_SERVER", "irc.test.net") + monkeypatch.setenv("IRC_CHANNEL", "#cron") + monkeypatch.setenv("IRC_NICKNAME", "hermesbot") + monkeypatch.setenv("IRC_USE_TLS", "false") + + # Server greets us with 001 RPL_WELCOME, then nothing for QUIT drain. + conn = _FakeIRCConnection([b":server 001 hermesbot-cron :Welcome"]) + + async def _fake_open(host, port, **kwargs): + return conn, conn # reader and writer share the same fake + + monkeypatch.setattr(_irc_mod.asyncio, "open_connection", _fake_open) + + result = await _standalone_send( + PlatformConfig(enabled=True, extra={}), + "#cron", + "hello from cron", + ) + + assert result["success"] is True + assert "message_id" in result + + sent_lines = b"".join(conn.writes).decode("utf-8").splitlines() + # NICK uses the cron-suffixed identity to avoid colliding with the + # long-running gateway adapter that may already hold the nickname. + assert any(line.startswith("NICK hermesbot-cron") for line in sent_lines) + assert any(line.startswith("USER hermesbot-cron 0 * :Hermes Agent (cron)") + for line in sent_lines) + assert any(line == "PRIVMSG #cron :hello from cron" for line in sent_lines) + assert any(line.startswith("QUIT ") for line in sent_lines) + + @pytest.mark.asyncio + async def test_standalone_send_returns_error_when_unconfigured(self, monkeypatch): + from gateway.config import PlatformConfig + + for var in ("IRC_SERVER", "IRC_CHANNEL"): + monkeypatch.delenv(var, raising=False) + + result = await _standalone_send( + PlatformConfig(enabled=True, extra={}), + "", + "hi", + ) + + assert "error" in result + assert "IRC_SERVER" in result["error"] or "IRC_CHANNEL" in result["error"] + + @pytest.mark.asyncio + async def test_standalone_send_returns_error_on_registration_timeout(self, monkeypatch): + from gateway.config import PlatformConfig + + monkeypatch.setenv("IRC_SERVER", "irc.test.net") + monkeypatch.setenv("IRC_CHANNEL", "#cron") + monkeypatch.setenv("IRC_NICKNAME", "hermesbot") + monkeypatch.setenv("IRC_USE_TLS", "false") + + # No 001 response: the readuntil call returns IncompleteReadError so + # the registration loop times out via the asyncio wait_for inside. + conn = _FakeIRCConnection([]) + + async def _fake_open(host, port, **kwargs): + return conn, conn + + monkeypatch.setattr(_irc_mod.asyncio, "open_connection", _fake_open) + + # Patch wait_for to raise TimeoutError immediately so the test is fast + async def _fast_timeout(coro, timeout): + try: + return await coro + except asyncio.IncompleteReadError: + raise asyncio.TimeoutError() + + monkeypatch.setattr(_irc_mod.asyncio, "wait_for", _fast_timeout) + + result = await _standalone_send( + PlatformConfig(enabled=True, extra={}), + "#cron", + "hi", + ) + + assert "error" in result + assert "registration" in result["error"].lower() or "timeout" in result["error"].lower() + + @pytest.mark.asyncio + async def test_standalone_send_rejects_crlf_in_chat_id(self, monkeypatch): + from gateway.config import PlatformConfig + + monkeypatch.setenv("IRC_SERVER", "irc.test.net") + monkeypatch.setenv("IRC_CHANNEL", "#cron") + monkeypatch.setenv("IRC_NICKNAME", "hermesbot") + monkeypatch.setenv("IRC_USE_TLS", "false") + + # Attempt to inject a second IRC command via CRLF in chat_id + result = await _standalone_send( + PlatformConfig(enabled=True, extra={}), + "#cron\r\nKICK #cron hermesbot", + "hi", + ) + + assert "error" in result + assert "illegal IRC characters" in result["error"] + + @pytest.mark.asyncio + async def test_standalone_send_strips_crlf_from_message_body(self, monkeypatch): + from gateway.config import PlatformConfig + + monkeypatch.setenv("IRC_SERVER", "irc.test.net") + monkeypatch.setenv("IRC_CHANNEL", "#cron") + monkeypatch.setenv("IRC_NICKNAME", "hermesbot") + monkeypatch.setenv("IRC_USE_TLS", "false") + + conn = _FakeIRCConnection([b":server 001 hermesbot-cron :Welcome"]) + + async def _fake_open(host, port, **kwargs): + return conn, conn + + monkeypatch.setattr(_irc_mod.asyncio, "open_connection", _fake_open) + + # A bare \r in message content tries to inject a NICK command. + # Our control-char stripper must blank \r so the line stays one PRIVMSG. + result = await _standalone_send( + PlatformConfig(enabled=True, extra={}), + "#cron", + "hello\rNICK eviltwin", + ) + + sent_lines = b"".join(conn.writes).decode("utf-8").splitlines() + # No injected NICK command after the legitimate registration NICK + nick_lines = [line for line in sent_lines if line.startswith("NICK ")] + # Only the original registration NICK should be present (no injected one) + assert all(line.startswith("NICK hermesbot-cron") for line in nick_lines) + # The PRIVMSG should contain "hello NICK eviltwin" as one line (with \r blanked) + assert any("PRIVMSG #cron :hello NICK eviltwin" in line for line in sent_lines) + + @pytest.mark.asyncio + async def test_standalone_send_joins_channel_before_privmsg(self, monkeypatch): + from gateway.config import PlatformConfig + + monkeypatch.setenv("IRC_SERVER", "irc.test.net") + monkeypatch.setenv("IRC_CHANNEL", "#cron") + monkeypatch.setenv("IRC_NICKNAME", "hermesbot") + monkeypatch.setenv("IRC_USE_TLS", "false") + + # Register, then accept JOIN with 366 RPL_ENDOFNAMES, then PRIVMSG. + conn = _FakeIRCConnection([ + b":server 001 hermesbot-cron :Welcome", + b":server 366 hermesbot-cron #cron :End of /NAMES list.", + ]) + + async def _fake_open(host, port, **kwargs): + return conn, conn + + monkeypatch.setattr(_irc_mod.asyncio, "open_connection", _fake_open) + + result = await _standalone_send( + PlatformConfig(enabled=True, extra={}), + "#cron", + "hello", + ) + + assert result["success"] is True + sent_lines = b"".join(conn.writes).decode("utf-8").splitlines() + join_idx = next((i for i, line in enumerate(sent_lines) if line.startswith("JOIN #cron")), None) + privmsg_idx = next((i for i, line in enumerate(sent_lines) if line.startswith("PRIVMSG #cron")), None) + assert join_idx is not None, "JOIN must be sent for channel targets" + assert privmsg_idx is not None + assert join_idx < privmsg_idx, "JOIN must precede PRIVMSG" diff --git a/tests/gateway/test_teams.py b/tests/gateway/test_teams.py index c8730f76848..34cd0ca3eed 100644 --- a/tests/gateway/test_teams.py +++ b/tests/gateway/test_teams.py @@ -703,3 +703,177 @@ class TestTeamsMessageHandling: await adapter._on_message(ctx) assert adapter.handle_message.await_count == 1 + + +# ── _standalone_send (out-of-process cron delivery) ────────────────────── + + +class _FakeAiohttpResponse: + def __init__(self, status: int, payload, text_body: str = ""): + self.status = status + self._payload = payload + self._text = text_body or (str(payload) if payload is not None else "") + + async def json(self): + return self._payload + + async def text(self): + return self._text + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + +class _FakeAiohttpSession: + """Scripted aiohttp.ClientSession with a queue of responses so tests + can assert calls in order.""" + + def __init__(self, scripts): + self._scripts = list(scripts) + self.calls: list[tuple[str, dict]] = [] + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + def post(self, url, **kwargs): + self.calls.append((url, kwargs)) + if not self._scripts: + raise AssertionError(f"No scripted response for POST {url}") + return self._scripts.pop(0) + + +def _install_fake_aiohttp(monkeypatch, session): + """Replace ``aiohttp`` in ``sys.modules`` so ``import aiohttp as _aiohttp`` + inside ``_standalone_send`` picks up our fake.""" + fake_aiohttp = types.SimpleNamespace( + ClientSession=lambda timeout=None: session, + ClientTimeout=lambda total=None: None, + ) + monkeypatch.setitem(sys.modules, "aiohttp", fake_aiohttp) + + +class TestTeamsStandaloneSend: + + @pytest.mark.asyncio + async def test_standalone_send_acquires_token_and_posts_activity(self, monkeypatch): + monkeypatch.setenv("TEAMS_CLIENT_ID", "client-id") + monkeypatch.setenv("TEAMS_CLIENT_SECRET", "secret") + monkeypatch.setenv("TEAMS_TENANT_ID", "tenant") + monkeypatch.delenv("TEAMS_SERVICE_URL", raising=False) + + token_resp = _FakeAiohttpResponse(200, {"access_token": "the-token"}) + activity_resp = _FakeAiohttpResponse(200, {"id": "msg-99"}) + session = _FakeAiohttpSession([token_resp, activity_resp]) + _install_fake_aiohttp(monkeypatch, session) + + result = await _teams_mod._standalone_send( + PlatformConfig(enabled=True, extra={}), + "19:abc@thread.skype", + "hello cron", + ) + + assert result == {"success": True, "message_id": "msg-99"} + assert len(session.calls) == 2 + + token_url, token_kwargs = session.calls[0] + assert "login.microsoftonline.com/tenant/oauth2/v2.0/token" in token_url + assert token_kwargs["data"]["client_id"] == "client-id" + assert token_kwargs["data"]["client_secret"] == "secret" + assert token_kwargs["data"]["scope"] == "https://api.botframework.com/.default" + + activity_url, activity_kwargs = session.calls[1] + # Default service URL when TEAMS_SERVICE_URL is unset + assert "smba.trafficmanager.net" in activity_url + assert "/v3/conversations/19:abc@thread.skype/activities" in activity_url + assert activity_kwargs["headers"]["Authorization"] == "Bearer the-token" + assert activity_kwargs["json"]["text"] == "hello cron" + assert activity_kwargs["json"]["type"] == "message" + + @pytest.mark.asyncio + async def test_standalone_send_returns_error_when_unconfigured(self, monkeypatch): + for var in ("TEAMS_CLIENT_ID", "TEAMS_CLIENT_SECRET", "TEAMS_TENANT_ID"): + monkeypatch.delenv(var, raising=False) + + result = await _teams_mod._standalone_send( + PlatformConfig(enabled=True, extra={}), + "19:abc@thread.skype", + "hi", + ) + + assert "error" in result + assert "TEAMS_CLIENT_ID" in result["error"] + + @pytest.mark.asyncio + async def test_standalone_send_propagates_token_failure(self, monkeypatch): + monkeypatch.setenv("TEAMS_CLIENT_ID", "client-id") + monkeypatch.setenv("TEAMS_CLIENT_SECRET", "secret") + monkeypatch.setenv("TEAMS_TENANT_ID", "tenant") + + token_resp = _FakeAiohttpResponse( + 401, + {"error": "unauthorized_client"}, + text_body='{"error":"unauthorized_client"}', + ) + session = _FakeAiohttpSession([token_resp]) + _install_fake_aiohttp(monkeypatch, session) + + result = await _teams_mod._standalone_send( + PlatformConfig(enabled=True, extra={}), + "19:abc@thread.skype", + "hi", + ) + + assert "error" in result + assert "401" in result["error"] + assert "token" in result["error"].lower() + + @pytest.mark.asyncio + async def test_standalone_send_rejects_off_allowlist_service_url(self, monkeypatch): + monkeypatch.setenv("TEAMS_CLIENT_ID", "client-id") + monkeypatch.setenv("TEAMS_CLIENT_SECRET", "secret") + monkeypatch.setenv("TEAMS_TENANT_ID", "tenant") + # SSRF attempt: point us at an attacker-controlled host + monkeypatch.setenv("TEAMS_SERVICE_URL", "https://attacker.example.com/teams/") + + # If the allowlist check fails to fire, the fake session will assert + # because no scripts are queued; a passing test means we returned + # before any HTTP call. + session = _FakeAiohttpSession([]) + _install_fake_aiohttp(monkeypatch, session) + + result = await _teams_mod._standalone_send( + PlatformConfig(enabled=True, extra={}), + "19:abc@thread.skype", + "hi", + ) + + assert "error" in result + assert "allowlist" in result["error"].lower() + assert len(session.calls) == 0, "must not call any HTTP endpoint with a tampered service URL" + + @pytest.mark.asyncio + async def test_standalone_send_rejects_chat_id_with_path_traversal(self, monkeypatch): + monkeypatch.setenv("TEAMS_CLIENT_ID", "client-id") + monkeypatch.setenv("TEAMS_CLIENT_SECRET", "secret") + monkeypatch.setenv("TEAMS_TENANT_ID", "tenant") + monkeypatch.delenv("TEAMS_SERVICE_URL", raising=False) + + session = _FakeAiohttpSession([]) + _install_fake_aiohttp(monkeypatch, session) + + # Attempt to break out of /v3/conversations//activities via a `/` + result = await _teams_mod._standalone_send( + PlatformConfig(enabled=True, extra={}), + "19:abc/activities/19:other@thread.skype", + "hi", + ) + + assert "error" in result + assert "Bot Framework conversation ID" in result["error"] + assert len(session.calls) == 0 diff --git a/tests/tools/test_send_message_tool.py b/tests/tools/test_send_message_tool.py index 48003f410a8..024cf43f948 100644 --- a/tests/tools/test_send_message_tool.py +++ b/tests/tools/test_send_message_tool.py @@ -2052,3 +2052,180 @@ class TestSendSignalChunking: # Only the existing file made it into the RPC params = fake.calls[0]["payload"]["params"] assert len(params["attachments"]) == 1 + + +# ── _send_via_adapter standalone fallback ──────────────────────────────── + + +class _FakePlatform: + """Stand-in for the gateway.config.Platform enum. Holds the .value + attribute consulted by ``_send_via_adapter`` for registry lookups.""" + + def __init__(self, value): + self.value = value + + +class TestSendViaAdapterStandaloneFallback: + """Coverage for the out-of-process plugin-platform send path. + + When the gateway runner is not in this process (e.g. ``hermes cron`` + runs separately from ``hermes gateway``), ``_send_via_adapter`` should + fall through to the plugin's ``standalone_sender_fn`` registered on + its ``PlatformEntry``. Without the hook, the existing error string + is returned (with a more helpful tail). + """ + + @staticmethod + def _make_entry(send_fn): + from gateway.platform_registry import PlatformEntry + + return PlatformEntry( + name="fakeplatform", + label="Fake", + adapter_factory=lambda cfg: None, + check_fn=lambda: True, + standalone_sender_fn=send_fn, + ) + + @pytest.mark.asyncio + async def test_standalone_sender_fn_called_when_no_adapter(self, monkeypatch): + """Registry has hook, runner ref returns None: the hook is awaited.""" + from tools.send_message_tool import _send_via_adapter + from gateway.platform_registry import platform_registry + + recorded = {} + + async def fake_send(pconfig, chat_id, message, **kwargs): + recorded["pconfig"] = pconfig + recorded["chat_id"] = chat_id + recorded["message"] = message + recorded["kwargs"] = kwargs + return {"success": True, "message_id": "msg-42"} + + platform_registry.register(self._make_entry(fake_send)) + try: + monkeypatch.setattr("gateway.run._gateway_runner_ref", lambda: None) + + pconfig = SimpleNamespace(extra={}) + result = await _send_via_adapter( + _FakePlatform("fakeplatform"), + pconfig, + "room/123", + "hello cron", + ) + finally: + platform_registry.unregister("fakeplatform") + + assert result == {"success": True, "message_id": "msg-42"} + assert recorded["chat_id"] == "room/123" + assert recorded["message"] == "hello cron" + assert recorded["pconfig"] is pconfig + + @pytest.mark.asyncio + async def test_standalone_sender_fn_kwargs_forwarded(self, monkeypatch): + """thread_id, media_files, and force_document all reach the hook.""" + from tools.send_message_tool import _send_via_adapter + from gateway.platform_registry import platform_registry + + recorded = {} + + async def fake_send(pconfig, chat_id, message, *, thread_id=None, + media_files=None, force_document=False): + recorded["thread_id"] = thread_id + recorded["media_files"] = media_files + recorded["force_document"] = force_document + return {"success": True, "message_id": "x"} + + platform_registry.register(self._make_entry(fake_send)) + try: + monkeypatch.setattr("gateway.run._gateway_runner_ref", lambda: None) + + await _send_via_adapter( + _FakePlatform("fakeplatform"), + SimpleNamespace(extra={}), + "chat-1", + "hi", + thread_id="thread-7", + media_files=["/tmp/a.png"], + force_document=True, + ) + finally: + platform_registry.unregister("fakeplatform") + + assert recorded["thread_id"] == "thread-7" + assert recorded["media_files"] == ["/tmp/a.png"] + assert recorded["force_document"] is True + + @pytest.mark.asyncio + async def test_standalone_sender_fn_absent_returns_helpful_error(self, monkeypatch): + """Registry entry has no hook: the fall-through error explains both + options (gateway-running and standalone hook).""" + from tools.send_message_tool import _send_via_adapter + from gateway.platform_registry import platform_registry + + platform_registry.register(self._make_entry(None)) + try: + monkeypatch.setattr("gateway.run._gateway_runner_ref", lambda: None) + + result = await _send_via_adapter( + _FakePlatform("fakeplatform"), + SimpleNamespace(extra={}), + "chat-1", + "hi", + ) + finally: + platform_registry.unregister("fakeplatform") + + assert "error" in result + assert "fakeplatform" in result["error"] + assert "standalone_sender_fn" in result["error"] + + @pytest.mark.asyncio + async def test_standalone_sender_fn_raises_is_caught_and_formatted(self, monkeypatch): + """Hook raises: error dict has 'Plugin standalone send failed: ...'""" + from tools.send_message_tool import _send_via_adapter + from gateway.platform_registry import platform_registry + + async def boom(pconfig, chat_id, message, **kwargs): + raise ValueError("boom!") + + platform_registry.register(self._make_entry(boom)) + try: + monkeypatch.setattr("gateway.run._gateway_runner_ref", lambda: None) + + result = await _send_via_adapter( + _FakePlatform("fakeplatform"), + SimpleNamespace(extra={}), + "chat-1", + "hi", + ) + finally: + platform_registry.unregister("fakeplatform") + + assert result == {"error": "Plugin standalone send failed: boom!"} + + @pytest.mark.asyncio + async def test_standalone_sender_fn_return_shape_passed_through(self, monkeypatch): + """Hook returns success dict: passed through unchanged.""" + from tools.send_message_tool import _send_via_adapter + from gateway.platform_registry import platform_registry + + async def fake_send(pconfig, chat_id, message, **kwargs): + return {"success": True, "message_id": "abc-123", "extra_field": "preserved"} + + platform_registry.register(self._make_entry(fake_send)) + try: + monkeypatch.setattr("gateway.run._gateway_runner_ref", lambda: None) + + result = await _send_via_adapter( + _FakePlatform("fakeplatform"), + SimpleNamespace(extra={}), + "chat-1", + "hi", + ) + finally: + platform_registry.unregister("fakeplatform") + + assert result["success"] is True + assert result["message_id"] == "abc-123" + assert result["extra_field"] == "preserved" diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 5199bca67a0..785b42a3d9f 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -423,25 +423,92 @@ def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id: } -async def _send_via_adapter(platform, pconfig, chat_id, chunk): - """Send a message via a live gateway adapter (for plugin platforms). +async def _send_via_adapter( + platform, + pconfig, + chat_id, + chunk, + *, + thread_id=None, + media_files=None, + force_document=False, +): + """Send a message via a live gateway adapter, with a standalone fallback + for out-of-process callers (e.g. cron running separately from the gateway). - Falls back to error if no adapter is connected for this platform. + Order of attempts: + 1. Live in-process adapter via ``_gateway_runner_ref()`` (the path that + existed before this change). + 2. The plugin's ``standalone_sender_fn`` registered on its + ``PlatformEntry`` (used when the gateway is not in this process, so + the runner weakref is ``None``). + 3. A descriptive error explaining both options. """ + runner = None try: from gateway.run import _gateway_runner_ref runner = _gateway_runner_ref() - if runner: + except Exception: + runner = None + + if runner is not None: + try: adapter = runner.adapters.get(platform) - if adapter: - from gateway.platforms.base import SendResult + except Exception: + adapter = None + if adapter is not None: + try: result = await adapter.send(chat_id=chat_id, content=chunk) - if result.success: - return {"success": True, "message_id": result.message_id} - return {"error": f"Adapter send failed: {result.error}"} - except Exception as e: - return {"error": f"Plugin platform send failed: {e}"} - return {"error": f"No live adapter for platform '{platform.value}'. Is the gateway running with this platform connected?"} + except asyncio.CancelledError: + raise + except Exception as e: + return {"error": f"Plugin platform send failed: {e}"} + if result.success: + return {"success": True, "message_id": result.message_id} + return {"error": f"Adapter send failed: {result.error}"} + + platform_name = platform.value if hasattr(platform, "value") else str(platform) + entry = None + try: + from gateway.platform_registry import platform_registry + entry = platform_registry.get(platform_name) + except Exception: + entry = None + + if entry is not None and entry.standalone_sender_fn is not None: + try: + result = await entry.standalone_sender_fn( + pconfig, + chat_id, + chunk, + thread_id=thread_id, + media_files=media_files, + force_document=force_document, + ) + except asyncio.CancelledError: + raise + except Exception as e: + logger.debug("Plugin standalone send for %s raised", platform_name, exc_info=True) + return {"error": f"Plugin standalone send failed: {e}"} + + if isinstance(result, dict) and (result.get("success") or result.get("error")): + return result + return { + "error": ( + f"Plugin standalone send for '{platform_name}' returned an " + f"invalid result: expected a dict with 'success' or 'error' " + f"keys, got {type(result).__name__}" + ) + } + + return { + "error": ( + f"No live adapter for platform '{platform_name}'. Is the gateway " + f"running with this platform connected? For out-of-process delivery " + f"(e.g. cron in a separate process), the platform plugin must " + f"register a standalone_sender_fn on its PlatformEntry." + ) + } async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None, force_document=False): @@ -660,9 +727,17 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, elif platform == Platform.YUANBAO: result = await _send_yuanbao(chat_id, chunk) else: - # Plugin platform — route through the gateway's live adapter - # if available, otherwise report the error. - result = await _send_via_adapter(platform, pconfig, chat_id, chunk) + # Plugin platform: route through the gateway's live adapter if + # available, otherwise the plugin's standalone_sender_fn. + result = await _send_via_adapter( + platform, + pconfig, + chat_id, + chunk, + thread_id=thread_id, + media_files=media_files, + force_document=force_document, + ) if isinstance(result, dict) and result.get("error"): return result diff --git a/website/docs/developer-guide/adding-platform-adapters.md b/website/docs/developer-guide/adding-platform-adapters.md index 763f9e6d1fe..1ba4b9a34cd 100644 --- a/website/docs/developer-guide/adding-platform-adapters.md +++ b/website/docs/developer-guide/adding-platform-adapters.md @@ -253,6 +253,37 @@ ctx.register_platform( The scheduler reads this env var when resolving the home target for `deliver=my_platform` jobs, and also treats the platform as a valid cron target in `_KNOWN_DELIVERY_PLATFORMS`-style checks. If your `env_enablement_fn` seeds a `home_channel` dict (see above), that takes precedence — `cron_deliver_env_var` is the fallback for cron jobs that run before env seeding. +### Out-of-process cron delivery + +`cron_deliver_env_var` makes your platform a recognized `deliver=` target. To make the actual send succeed when the cron job runs in a separate process from the gateway (i.e., `hermes cron run` separate from `hermes gateway`), register a `standalone_sender_fn`: + +```python +async def _standalone_send( + pconfig, + chat_id, + message, + *, + thread_id=None, + media_files=None, + force_document=False, +): + """Open an ephemeral connection / acquire a fresh token, send, and close.""" + # ... open connection, send message, return result ... + return {"success": True, "message_id": "..."} + # or {"error": "..."} + +ctx.register_platform( + name="my_platform", + ... + cron_deliver_env_var="MY_PLATFORM_HOME_CHANNEL", + standalone_sender_fn=_standalone_send, +) +``` + +Why this hook is necessary: built-in platforms (Telegram, Discord, Slack, etc.) ship direct REST helpers in `tools/send_message_tool.py` so cron can deliver without holding the gateway in the same process. Plugin platforms historically depended on `_gateway_runner_ref()`, which returns `None` outside the gateway process, so without `standalone_sender_fn` the cron-side send fails with `No live adapter for platform ''`. + +The function receives the same `pconfig` and `chat_id` that the live adapter would, plus optional `thread_id`, `media_files`, and `force_document` keyword arguments. Returning `{"success": True, "message_id": ...}` is treated as a successful delivery; returning `{"error": "..."}` surfaces the message in cron's `delivery_errors`. Exceptions raised inside the function are caught by the dispatcher and reported as `Plugin standalone send failed: `. Reference implementations live in `plugins/platforms/{irc,teams,google_chat}/adapter.py`. + ## Surfacing Env Vars in `hermes config` `hermes_cli/config.py` scans `plugins/platforms/*/plugin.yaml` at import time and auto-populates `OPTIONAL_ENV_VARS` from `requires_env` and (optional) `optional_env` blocks. Use the rich-dict form to contribute proper descriptions, prompts, password flags, and URLs — the CLI setup UI picks them up for free.