feat(plugins): add standalone_sender_fn for out-of-process cron delivery

Plugin platforms (IRC, Teams, Google Chat) currently fail with
`No live adapter for platform '<name>'` when a `deliver=<plugin>` cron
job runs in a separate process from the gateway, even though the
platforms are eligible cron targets via `cron_deliver_env_var` (added
in #21306). Built-in platforms (Telegram, Discord, Slack, etc.) use
direct REST helpers in `tools/send_message_tool.py` so cron can deliver
without holding the gateway in the same process; plugin platforms
historically depended on `_gateway_runner_ref()` which returns `None`
out of process.

This change adds an optional `standalone_sender_fn` field to
`PlatformEntry` so plugins can register an ephemeral send path that
opens its own connection, sends, and closes without needing the live
adapter. The dispatch site in `_send_via_adapter` falls through to the
hook when the gateway runner is unavailable, with a descriptive error
when neither path applies. The hook is optional, so existing plugins
are unaffected.

Reference migrations land in the same change for IRC, Teams, and
Google Chat, exercising the hook across stdlib (asyncio + IRC protocol),
Bot Framework OAuth client_credentials, and Google service-account
flows respectively.

Security hardening on the new code paths:
* IRC: control-character stripping on chat_id and message body to
  block CRLF command injection; bounded nick-collision retries; JOIN
  before PRIVMSG so channels with the default `+n` mode accept the
  delivery.
* Teams: TEAMS_SERVICE_URL validated against an allowlist of known
  Bot Framework hosts (`smba.trafficmanager.net`,
  `smba.infra.gov.teams.microsoft.us`) to block SSRF; chat_id and
  tenant_id constrained to the documented Bot Framework character set;
  per-request timeouts so a slow STS endpoint cannot starve the
  activity POST.
* Google Chat: chat_id and thread_id validated against strict
  resource-name regexes; service-account refresh wrapped in
  `asyncio.wait_for` so a hung token endpoint cannot stall the
  scheduler.

Test coverage: 20 new tests covering happy path, missing-config errors,
network failure modes, and each defensive validation. Existing tests
unchanged. `bash scripts/run_tests.sh tests/tools/test_send_message_tool.py
tests/gateway/test_irc_adapter.py tests/gateway/test_teams.py
tests/gateway/test_google_chat.py` reports 341 passed, 0 regressions.

Documentation: new "Out-of-process cron delivery" section in
website/docs/developer-guide/adding-platform-adapters.md and an entry
in gateway/platforms/ADDING_A_PLATFORM.md naming the hook.
This commit is contained in:
GodsBoy 2026-05-08 12:23:26 +02:00 committed by kshitij
parent 3801825efd
commit 93e25ceb13
11 changed files with 1456 additions and 24 deletions

View file

@ -423,25 +423,92 @@ def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id:
}
async def _send_via_adapter(platform, pconfig, chat_id, chunk):
"""Send a message via a live gateway adapter (for plugin platforms).
async def _send_via_adapter(
platform,
pconfig,
chat_id,
chunk,
*,
thread_id=None,
media_files=None,
force_document=False,
):
"""Send a message via a live gateway adapter, with a standalone fallback
for out-of-process callers (e.g. cron running separately from the gateway).
Falls back to error if no adapter is connected for this platform.
Order of attempts:
1. Live in-process adapter via ``_gateway_runner_ref()`` (the path that
existed before this change).
2. The plugin's ``standalone_sender_fn`` registered on its
``PlatformEntry`` (used when the gateway is not in this process, so
the runner weakref is ``None``).
3. A descriptive error explaining both options.
"""
runner = None
try:
from gateway.run import _gateway_runner_ref
runner = _gateway_runner_ref()
if runner:
except Exception:
runner = None
if runner is not None:
try:
adapter = runner.adapters.get(platform)
if adapter:
from gateway.platforms.base import SendResult
except Exception:
adapter = None
if adapter is not None:
try:
result = await adapter.send(chat_id=chat_id, content=chunk)
if result.success:
return {"success": True, "message_id": result.message_id}
return {"error": f"Adapter send failed: {result.error}"}
except Exception as e:
return {"error": f"Plugin platform send failed: {e}"}
return {"error": f"No live adapter for platform '{platform.value}'. Is the gateway running with this platform connected?"}
except asyncio.CancelledError:
raise
except Exception as e:
return {"error": f"Plugin platform send failed: {e}"}
if result.success:
return {"success": True, "message_id": result.message_id}
return {"error": f"Adapter send failed: {result.error}"}
platform_name = platform.value if hasattr(platform, "value") else str(platform)
entry = None
try:
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform_name)
except Exception:
entry = None
if entry is not None and entry.standalone_sender_fn is not None:
try:
result = await entry.standalone_sender_fn(
pconfig,
chat_id,
chunk,
thread_id=thread_id,
media_files=media_files,
force_document=force_document,
)
except asyncio.CancelledError:
raise
except Exception as e:
logger.debug("Plugin standalone send for %s raised", platform_name, exc_info=True)
return {"error": f"Plugin standalone send failed: {e}"}
if isinstance(result, dict) and (result.get("success") or result.get("error")):
return result
return {
"error": (
f"Plugin standalone send for '{platform_name}' returned an "
f"invalid result: expected a dict with 'success' or 'error' "
f"keys, got {type(result).__name__}"
)
}
return {
"error": (
f"No live adapter for platform '{platform_name}'. Is the gateway "
f"running with this platform connected? For out-of-process delivery "
f"(e.g. cron in a separate process), the platform plugin must "
f"register a standalone_sender_fn on its PlatformEntry."
)
}
async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None, force_document=False):
@ -660,9 +727,17 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
elif platform == Platform.YUANBAO:
result = await _send_yuanbao(chat_id, chunk)
else:
# Plugin platform — route through the gateway's live adapter
# if available, otherwise report the error.
result = await _send_via_adapter(platform, pconfig, chat_id, chunk)
# Plugin platform: route through the gateway's live adapter if
# available, otherwise the plugin's standalone_sender_fn.
result = await _send_via_adapter(
platform,
pconfig,
chat_id,
chunk,
thread_id=thread_id,
media_files=media_files,
force_document=force_document,
)
if isinstance(result, dict) and result.get("error"):
return result