hermes-agent/plugins/platforms/ntfy/adapter.py
Teknium 3b096d6f6d ntfy: tighten robustness, dedupe auth/truncation, add docs
Robustness:
- Surface 401/404 stream failures via _set_fatal_error() so the gateway's
  runtime status reflects 'fatal: ntfy_unauthorized' / 'ntfy_topic_not_found'
  instead of staying 'connected' when the reconnect loop halts. Matches
  the pattern in whatsapp / telegram / sms adapters.
- Strip whitespace from auth tokens so pasted tokens with trailing
  newlines don't produce malformed Authorization headers.

Simplicity:
- Extract _build_auth_header() and _truncate_body() to module-level
  helpers, used by both NtfyAdapter and _standalone_send. Removes the
  duplicated auth/truncation logic between the two paths.

Docs:
- website/docs/user-guide/messaging/ntfy.md — full setup guide,
  identity-model warning, self-hosting, cron usage, troubleshooting.
- website/docs/reference/environment-variables.md — all 9 NTFY_* vars.
- website/docs/user-guide/messaging/index.md — platform comparison row.
- website/sidebars.ts — sidebar entry between simplex and open-webui.

Tests: 78/78 (+ 10 new robustness tests covering token hygiene, fatal
error propagation for 401/404, and the _truncate_body helper).
2026-05-23 16:13:01 -07:00

582 lines
22 KiB
Python

"""ntfy platform adapter (Hermes plugin).
Subscribes to a topic on ntfy.sh or any self-hosted ntfy server via
HTTP streaming (``/json`` endpoint with ``poll=false``) and publishes
replies via HTTP POST. No external SDK — only httpx, which is already
a Hermes dependency.
This adapter ships as a Hermes platform plugin under
``plugins/platforms/ntfy/``. The Hermes plugin loader scans the
directory at startup, calls :func:`register`, and the platform becomes
available to ``gateway/run.py`` and ``tools/send_message_tool`` through
the registry — no edits to core files required.
Configuration in config.yaml::
platforms:
ntfy:
enabled: true
extra:
server: "https://ntfy.sh" # or self-hosted URL
topic: "hermes-in" # subscribe topic (incoming)
publish_topic: "hermes-out" # optional — defaults to topic
token: "..." # optional Bearer / Basic auth token
markdown: true # optional — enable markdown (default: false)
Environment variables (all read at adapter construct time, env wins over
config.yaml ``extra``):
NTFY_TOPIC Topic to subscribe to (required)
NTFY_SERVER_URL Server URL (default: https://ntfy.sh)
NTFY_TOKEN Bearer token or 'user:pass' for Basic auth
NTFY_PUBLISH_TOPIC Reply topic (defaults to NTFY_TOPIC)
NTFY_MARKDOWN "true"/"1"/"yes" enables X-Markdown header
NTFY_ALLOWED_USERS Allowlist (treated by gateway as user IDs;
on ntfy these are topic names)
NTFY_ALLOW_ALL_USERS Allow any topic — dev only
NTFY_HOME_CHANNEL Default topic for cron / notification delivery
NTFY_HOME_CHANNEL_NAME Human label for the home channel
Identity model: ntfy has no native authenticated user identity. The
``title`` field is publisher-controlled and is NOT used for
authorization. Each topic is treated as a single trusted channel —
``user_id`` is fixed to the topic name. Use a private topic protected
by a read token for any real trust boundary.
"""
import asyncio
import json
import logging
import os
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
try:
import httpx
HTTPX_AVAILABLE = True
except ImportError:
HTTPX_AVAILABLE = False
httpx = None # type: ignore[assignment]
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
SendResult,
)
logger = logging.getLogger(__name__)
class _FatalStreamError(Exception):
"""Raised when a stream error is unrecoverable (e.g. 401, 404)."""
DEFAULT_SERVER = "https://ntfy.sh"
MAX_MESSAGE_LENGTH = 4096 # ntfy message body limit
DEDUP_WINDOW_SECONDS = 300
DEDUP_MAX_SIZE = 1000
RECONNECT_BACKOFF = [2, 5, 10, 30, 60]
STREAM_TIMEOUT_SECONDS = 90 # ntfy keepalive default is 55s; give margin
def _build_auth_header(token: str) -> Dict[str, str]:
"""Build an ``Authorization`` header from an ntfy token.
Shared by :class:`NtfyAdapter._auth_headers` and :func:`_standalone_send`
so both paths follow the same auth shape and whitespace-stripping rules.
Tokens are stripped of surrounding whitespace — pasted tokens often
carry trailing newlines that would otherwise render the header
malformed (``Authorization: Bearer foo\\n``). ``user:pass`` tokens
become Basic auth; anything else is treated as a Bearer token.
Returns ``{}`` when no token is configured.
"""
if not token:
return {}
token = token.strip()
if not token:
return {}
if ":" in token:
import base64
encoded = base64.b64encode(token.encode()).decode()
return {"Authorization": f"Basic {encoded}"}
return {"Authorization": f"Bearer {token}"}
def _truncate_body(message: str, *, context: str) -> bytes:
"""Apply the ntfy 4096-char limit, logging a warning on truncation.
``context`` is included in the log message so adapter and standalone
truncations can be told apart in logs.
"""
if len(message) > MAX_MESSAGE_LENGTH:
logger.warning(
"%s: truncating message from %d to %d chars (ntfy limit)",
context, len(message), MAX_MESSAGE_LENGTH,
)
return message[:MAX_MESSAGE_LENGTH].encode("utf-8")
def check_requirements() -> bool:
"""Check whether the ntfy adapter is installable and minimally configured.
Reads ``NTFY_TOPIC`` directly to avoid the cost of a full
``load_gateway_config()`` (which also writes to ``os.environ``) on
every pre-flight check.
"""
if not HTTPX_AVAILABLE:
return False
topic = os.getenv("NTFY_TOPIC", "").strip()
return bool(topic)
def validate_config(config) -> bool:
"""Validate that the configured ntfy platform has a topic set."""
extra = getattr(config, "extra", {}) or {}
topic = extra.get("topic") or os.getenv("NTFY_TOPIC", "")
return bool(topic)
def is_connected(config) -> bool:
"""Check whether ntfy is configured (env or config.yaml)."""
extra = getattr(config, "extra", {}) or {}
topic = os.getenv("NTFY_TOPIC") or extra.get("topic", "")
return bool(topic)
class NtfyAdapter(BasePlatformAdapter):
"""ntfy adapter.
Subscribes to a topic via HTTP streaming (``/json`` endpoint) and
publishes replies via HTTP POST. No external SDK — only httpx.
"""
MAX_MESSAGE_LENGTH = MAX_MESSAGE_LENGTH
def __init__(self, config: PlatformConfig):
platform = Platform("ntfy")
super().__init__(config=config, platform=platform)
extra = config.extra or {}
self._server: str = (
extra.get("server")
or os.getenv("NTFY_SERVER_URL", DEFAULT_SERVER)
).rstrip("/")
self._topic: str = extra.get("topic") or os.getenv("NTFY_TOPIC", "")
self._publish_topic: str = (
extra.get("publish_topic")
or os.getenv("NTFY_PUBLISH_TOPIC", "")
or self._topic
)
self._token: str = extra.get("token") or os.getenv("NTFY_TOKEN", "")
self._stream_task: Optional[asyncio.Task] = None
self._http_client: Optional["httpx.AsyncClient"] = None
# Message deduplication: msg_id -> timestamp
self._seen_messages: Dict[str, float] = {}
# -- Connection lifecycle -----------------------------------------------
async def connect(self) -> bool:
"""Connect to ntfy by starting the streaming subscription task."""
if not HTTPX_AVAILABLE:
logger.warning("[%s] httpx not installed. Run: pip install httpx", self.name)
return False
if not self._topic:
logger.warning("[%s] NTFY_TOPIC not configured", self.name)
return False
try:
self._http_client = httpx.AsyncClient(timeout=None)
self._stream_task = asyncio.create_task(self._run_stream())
self._mark_connected()
logger.info("[%s] Connected — subscribing to %s/%s", self.name, self._server, self._topic)
return True
except Exception as e:
logger.error("[%s] Failed to connect: %s", self.name, e)
return False
async def _run_stream(self) -> None:
"""Subscribe to the ntfy topic with automatic reconnection."""
backoff_idx = 0
stream_start: float = 0.0
url = f"{self._server}/{self._topic}/json"
headers = self._auth_headers()
while self._running:
try:
logger.debug("[%s] Opening stream to %s", self.name, url)
stream_start = time.monotonic()
await self._consume_stream(url, headers)
except asyncio.CancelledError:
return
except _FatalStreamError:
self._running = False
return
except Exception as e:
if not self._running:
return
logger.warning("[%s] Stream error: %s", self.name, e)
if not self._running:
return
# Reset backoff if stream stayed alive for at least 60s
if time.monotonic() - stream_start >= 60.0:
backoff_idx = 0
delay = RECONNECT_BACKOFF[min(backoff_idx, len(RECONNECT_BACKOFF) - 1)]
logger.info("[%s] Reconnecting in %ds...", self.name, delay)
await asyncio.sleep(delay)
backoff_idx += 1
async def _consume_stream(self, url: str, headers: Dict[str, str]) -> None:
"""Open an HTTP streaming connection and dispatch events."""
# poll=false keeps a persistent streaming connection alive with keepalive events
params = {"poll": "false"}
async with self._http_client.stream(
"GET",
url,
headers=headers,
params=params,
timeout=httpx.Timeout(connect=15.0, read=STREAM_TIMEOUT_SECONDS, write=15.0, pool=15.0),
) as response:
if response.status_code == 401:
logger.error(
"[%s] Authentication failed (401) — stopping reconnect loop. Check NTFY_TOKEN.",
self.name,
)
self._set_fatal_error(
"ntfy_unauthorized",
"ntfy server rejected auth (401). Check NTFY_TOKEN.",
retryable=False,
)
raise _FatalStreamError("401 Unauthorized")
if response.status_code == 404:
logger.error(
"[%s] Topic not found (404): %s — stopping reconnect loop.",
self.name, self._topic,
)
self._set_fatal_error(
"ntfy_topic_not_found",
f"ntfy topic '{self._topic}' returned 404. Check NTFY_TOPIC.",
retryable=False,
)
raise _FatalStreamError("404 Not Found")
response.raise_for_status()
async for line in response.aiter_lines():
if not self._running:
return
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
except json.JSONDecodeError:
continue
if event.get("event") == "message":
await self._on_message(event)
async def disconnect(self) -> None:
"""Disconnect from ntfy."""
self._running = False
self._mark_disconnected()
if self._stream_task:
self._stream_task.cancel()
try:
await self._stream_task
except asyncio.CancelledError:
pass
self._stream_task = None
if self._http_client:
await self._http_client.aclose()
self._http_client = None
self._seen_messages.clear()
logger.info("[%s] Disconnected", self.name)
# -- Inbound message processing -----------------------------------------
async def _on_message(self, event: Dict[str, Any]) -> None:
"""Process an incoming ntfy message event."""
msg_id = event.get("id") or uuid.uuid4().hex
if self._is_duplicate(msg_id):
logger.debug("[%s] Duplicate message %s, skipping", self.name, msg_id)
return
text = (event.get("message") or "").strip()
if not text:
logger.debug("[%s] Empty message body, skipping", self.name)
return
topic = event.get("topic") or self._topic
# ntfy has no native authenticated user identity. The title field is
# publisher-controlled and must NOT be used for authorization — any
# publisher who knows the topic can set title to an allowed username.
# Treat ntfy as a single trusted channel; user_id is fixed to the
# topic name. NTFY_ALLOWED_USERS is only a real trust boundary when
# the topic itself is protected by a read token.
user_id = topic
user_name = topic
source = self.build_source(
chat_id=topic,
chat_name=topic,
chat_type="dm",
user_id=user_id,
user_name=user_name,
)
unix_ts = event.get("time")
try:
timestamp = (
datetime.fromtimestamp(int(unix_ts), tz=timezone.utc)
if unix_ts else datetime.now(tz=timezone.utc)
)
except (ValueError, OSError, TypeError):
timestamp = datetime.now(tz=timezone.utc)
message_event = MessageEvent(
text=text,
message_type=MessageType.TEXT,
source=source,
message_id=msg_id,
raw_message=event,
timestamp=timestamp,
)
logger.debug("[%s] Message on topic %s: %s", self.name, topic, text[:80])
await self.handle_message(message_event)
# -- Deduplication ------------------------------------------------------
def _is_duplicate(self, msg_id: str) -> bool:
"""Return True if this message ID was already seen within the dedup window."""
now = time.time()
if len(self._seen_messages) > DEDUP_MAX_SIZE:
cutoff = now - DEDUP_WINDOW_SECONDS
self._seen_messages = {k: v for k, v in self._seen_messages.items() if v > cutoff}
if msg_id in self._seen_messages:
return True
self._seen_messages[msg_id] = now
return False
# -- Outbound messaging -------------------------------------------------
async def send(
self,
chat_id: str,
content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Publish a message to the configured publish topic."""
metadata = metadata or {}
publish_topic = metadata.get("publish_topic") or self._publish_topic or chat_id
if not self._http_client:
return SendResult(success=False, error="HTTP client not initialized")
url = f"{self._server}/{publish_topic}"
markdown_enabled = (self.config.extra or {}).get("markdown", False)
headers = {**self._auth_headers(), "Content-Type": "text/plain; charset=utf-8"}
if markdown_enabled:
headers["X-Markdown"] = "true"
if len(content) > self.MAX_MESSAGE_LENGTH:
logger.warning(
"[%s] Message truncated from %d to %d chars (ntfy limit)",
self.name, len(content), self.MAX_MESSAGE_LENGTH,
)
body = content[:self.MAX_MESSAGE_LENGTH]
try:
resp = await self._http_client.post(
url, content=body.encode("utf-8"), headers=headers, timeout=15.0,
)
if resp.status_code < 300:
try:
data = resp.json()
returned_id = data.get("id") or uuid.uuid4().hex[:12]
except Exception:
returned_id = uuid.uuid4().hex[:12]
return SendResult(success=True, message_id=returned_id)
body_text = resp.text
logger.warning("[%s] Send failed HTTP %d: %s", self.name, resp.status_code, body_text[:200])
return SendResult(success=False, error=f"HTTP {resp.status_code}: {body_text[:200]}")
except httpx.TimeoutException:
return SendResult(success=False, error="Timeout publishing to ntfy")
except Exception as e:
logger.error("[%s] Send error: %s", self.name, e)
return SendResult(success=False, error=str(e))
async def send_typing(self, chat_id: str, metadata=None) -> None:
"""ntfy does not support typing indicators."""
pass
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Return basic info about an ntfy topic."""
return {"name": chat_id, "type": "dm"}
# -- Helpers ------------------------------------------------------------
def _auth_headers(self) -> Dict[str, str]:
"""Build Authorization header if a token is configured."""
return _build_auth_header(self._token)
# ---------------------------------------------------------------------------
# Plugin registration
# ---------------------------------------------------------------------------
def _env_enablement() -> dict | None:
"""Seed ``PlatformConfig.extra`` from env vars during gateway config load.
Called by the platform registry's env-enablement hook BEFORE adapter
construction, so ``gateway status`` and ``get_connected_platforms()``
reflect env-only configuration without instantiating the HTTP client.
Returns ``None`` when ntfy isn't minimally configured; the caller skips
auto-enabling.
The special ``home_channel`` key in the returned dict is handled by the
core hook — it becomes a proper ``HomeChannel`` dataclass on the
``PlatformConfig`` rather than being merged into ``extra``.
"""
topic = os.getenv("NTFY_TOPIC", "").strip()
if not topic:
return None
seed: dict = {
"topic": topic,
"server": os.getenv("NTFY_SERVER_URL", DEFAULT_SERVER).rstrip("/"),
}
publish_topic = os.getenv("NTFY_PUBLISH_TOPIC", "").strip()
if publish_topic:
seed["publish_topic"] = publish_topic
token = os.getenv("NTFY_TOKEN", "").strip()
if token:
seed["token"] = token
markdown = os.getenv("NTFY_MARKDOWN", "").strip().lower()
if markdown:
seed["markdown"] = markdown in ("1", "true", "yes")
home = os.getenv("NTFY_HOME_CHANNEL", "").strip() or topic
if home:
seed["home_channel"] = {
"chat_id": home,
"name": os.getenv("NTFY_HOME_CHANNEL_NAME", home),
}
return seed
async def _standalone_send(
pconfig,
chat_id: str,
message: str,
*,
thread_id: Optional[str] = None,
media_files: Optional[List[str]] = None,
force_document: bool = False,
) -> Dict[str, Any]:
"""Out-of-process publish for cron / send_message_tool fallbacks.
Used by ``tools/send_message_tool._send_via_adapter`` and the cron
scheduler when the gateway runner is not in this process (e.g.
``hermes cron`` running standalone). Without this hook,
``deliver=ntfy`` cron jobs fail with ``No live adapter for platform``.
``thread_id`` and ``media_files`` are accepted for signature parity
only — ntfy has no thread or attachment primitive. Markdown is
honored if ``NTFY_MARKDOWN`` is set OR ``pconfig.extra["markdown"]``
is True.
"""
if not HTTPX_AVAILABLE:
return {"error": "ntfy standalone send: httpx not installed"}
extra = getattr(pconfig, "extra", {}) or {}
server = (
extra.get("server")
or os.getenv("NTFY_SERVER_URL", DEFAULT_SERVER)
).rstrip("/")
publish_topic = (
chat_id
or extra.get("publish_topic")
or os.getenv("NTFY_PUBLISH_TOPIC", "").strip()
or extra.get("topic")
or os.getenv("NTFY_TOPIC", "").strip()
)
if not publish_topic:
return {"error": "ntfy standalone send: NTFY_TOPIC not configured"}
token = extra.get("token") or os.getenv("NTFY_TOKEN", "")
markdown_env = os.getenv("NTFY_MARKDOWN", "").strip().lower()
markdown_enabled = bool(extra.get("markdown")) or markdown_env in ("1", "true", "yes")
headers = {"Content-Type": "text/plain; charset=utf-8", **_build_auth_header(token)}
if markdown_enabled:
headers["X-Markdown"] = "true"
body = _truncate_body(message, context="ntfy standalone")
url = f"{server}/{publish_topic}"
try:
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.post(url, content=body, headers=headers)
if resp.status_code >= 300:
return {"error": f"ntfy HTTP {resp.status_code}: {resp.text[:200]}"}
try:
data = resp.json()
msg_id = data.get("id") or uuid.uuid4().hex[:12]
except Exception:
msg_id = uuid.uuid4().hex[:12]
return {"success": True, "platform": "ntfy", "chat_id": publish_topic, "message_id": msg_id}
except Exception as e:
return {"error": f"ntfy standalone send failed: {e}"}
def register(ctx) -> None:
"""Plugin entry point — called by the Hermes plugin system at startup."""
ctx.register_platform(
name="ntfy",
label="ntfy",
adapter_factory=lambda cfg: NtfyAdapter(cfg),
check_fn=check_requirements,
validate_config=validate_config,
is_connected=is_connected,
required_env=["NTFY_TOPIC"],
install_hint="pip install httpx # already a Hermes dependency",
# Env-driven auto-configuration: seeds PlatformConfig.extra so
# env-only setups show up in `hermes gateway status` without
# instantiating the HTTP client.
env_enablement_fn=_env_enablement,
# Cron home-channel delivery support — `deliver=ntfy` cron jobs
# route to NTFY_HOME_CHANNEL when set.
cron_deliver_env_var="NTFY_HOME_CHANNEL",
# Out-of-process cron delivery. Without this hook, deliver=ntfy
# cron jobs fail with "No live adapter" when cron runs separately
# from the gateway.
standalone_sender_fn=_standalone_send,
# Auth env vars for _is_user_authorized() integration.
allowed_users_env="NTFY_ALLOWED_USERS",
allow_all_env="NTFY_ALLOW_ALL_USERS",
max_message_length=MAX_MESSAGE_LENGTH,
emoji="🔔",
# ntfy publishers have no persistent identity — topic names are
# the only identifier, no phone numbers / emails to redact.
pii_safe=True,
allow_update_command=True,
platform_hint=(
"You are communicating via ntfy push notifications. "
"Use plain text by default — ntfy supports optional markdown "
"(set markdown: true in config or NTFY_MARKDOWN=true). "
"Keep responses concise; ntfy is a push notification service "
"with a 4096-character per-message limit."
),
)