From b10f17bf1e9321c586cb3afb0d80ef5bd4ae34d6 Mon Sep 17 00:00:00 2001 From: sprmn24 Date: Sat, 23 May 2026 00:54:18 +0300 Subject: [PATCH] feat(ntfy): add ntfy platform adapter with atomic reconnect, identity fix, and 81 tests --- agent/prompt_builder.py | 5 + cron/scheduler.py | 2 +- gateway/channel_directory.py | 3 +- gateway/config.py | 29 ++ gateway/platforms/ntfy.py | 342 ++++++++++++++ gateway/run.py | 10 + hermes_cli/status.py | 1 + tests/gateway/test_ntfy.py | 893 +++++++++++++++++++++++++++++++++++ tools/send_message_tool.py | 24 + toolsets.py | 12 +- 10 files changed, 1318 insertions(+), 3 deletions(-) create mode 100644 gateway/platforms/ntfy.py create mode 100644 tests/gateway/test_ntfy.py diff --git a/agent/prompt_builder.py b/agent/prompt_builder.py index 9c36d205ac5..3918c59b04c 100644 --- a/agent/prompt_builder.py +++ b/agent/prompt_builder.py @@ -555,6 +555,11 @@ PLATFORM_HINTS = { "your response. Images are sent as native photos, and other files arrive as downloadable " "documents." ), + "ntfy": ( + "You are communicating via ntfy push notifications. " + "Use plain text by default — ntfy supports optional markdown (set markdown: true in config). " + "Keep responses concise; ntfy is a push notification service." + ), "yuanbao": ( "You are on Yuanbao (腾讯元宝), a Chinese AI assistant platform. " "Markdown formatting is supported (code blocks, tables, bold/italic). " diff --git a/cron/scheduler.py b/cron/scheduler.py index 6b511d38b77..c4ed3fbb691 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -93,7 +93,7 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({ "telegram", "discord", "slack", "whatsapp", "signal", "matrix", "mattermost", "homeassistant", "dingtalk", "feishu", "wecom", "wecom_callback", "weixin", "sms", "email", "webhook", "bluebubbles", - "qqbot", "yuanbao", + "qqbot", "yuanbao", "ntfy", }) # Platforms that support a configured cron/notification home target, mapped to diff --git a/gateway/channel_directory.py b/gateway/channel_directory.py index ff4af85a89a..b4a105159b9 100644 --- a/gateway/channel_directory.py +++ b/gateway/channel_directory.py @@ -79,7 +79,8 @@ async def build_channel_directory(adapters: Dict[Any, Any]) -> Dict[str, Any]: # Platforms that don't support direct channel enumeration get session-based # discovery automatically. Skip infrastructure entries that aren't messaging # platforms — everything else falls through to _build_from_sessions(). - _SKIP_SESSION_DISCOVERY = frozenset({"local", "api_server", "webhook"}) + # ntfy and other push-only platforms use session-based discovery + _SKIP_SESSION_DISCOVERY = frozenset({"local", "api_server", "webhook", "ntfy"}) for plat in Platform: plat_name = plat.value if plat_name in _SKIP_SESSION_DISCOVERY or plat_name in platforms: diff --git a/gateway/config.py b/gateway/config.py index bc077b1994e..adc9c3b1198 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -127,6 +127,7 @@ class Platform(Enum): BLUEBUBBLES = "bluebubbles" QQBOT = "qqbot" YUANBAO = "yuanbao" + NTFY = "ntfy" @classmethod def _missing_(cls, value): """Accept unknown platform names only for known plugin adapters. @@ -443,6 +444,7 @@ _PLATFORM_CONNECTED_CHECKERS: dict[Platform, Callable[[PlatformConfig], bool]] = (cfg.extra.get("client_id") or os.getenv("DINGTALK_CLIENT_ID")) and (cfg.extra.get("client_secret") or os.getenv("DINGTALK_CLIENT_SECRET")) ), + Platform.NTFY: lambda cfg: bool(cfg.extra.get("topic")), } @@ -1789,6 +1791,33 @@ def _apply_env_overrides(config: GatewayConfig) -> None: if yuanbao_group_allow_from: extra["group_allow_from"] = yuanbao_group_allow_from + # ntfy + ntfy_topic = os.getenv("NTFY_TOPIC") + if ntfy_topic: + if Platform.NTFY not in config.platforms: + config.platforms[Platform.NTFY] = PlatformConfig() + config.platforms[Platform.NTFY].enabled = True + config.platforms[Platform.NTFY].extra["topic"] = ntfy_topic + ntfy_server = os.getenv("NTFY_SERVER_URL", "https://ntfy.sh") + config.platforms[Platform.NTFY].extra["server"] = ntfy_server + ntfy_token = os.getenv("NTFY_TOKEN") + if ntfy_token: + config.platforms[Platform.NTFY].token = ntfy_token + config.platforms[Platform.NTFY].extra["token"] = ntfy_token + ntfy_publish_topic = os.getenv("NTFY_PUBLISH_TOPIC") + if ntfy_publish_topic: + config.platforms[Platform.NTFY].extra["publish_topic"] = ntfy_publish_topic + ntfy_home = os.getenv("NTFY_HOME_CHANNEL") + if ntfy_home: + config.platforms[Platform.NTFY].home_channel = HomeChannel( + platform=Platform.NTFY, + chat_id=ntfy_home, + name=os.getenv("NTFY_HOME_CHANNEL_NAME", "Home"), + ) + ntfy_markdown = os.getenv("NTFY_MARKDOWN", "").strip().lower() + if ntfy_markdown: + config.platforms[Platform.NTFY].extra["markdown"] = ntfy_markdown in ("1", "true", "yes") + # Session settings idle_minutes = os.getenv("SESSION_IDLE_MINUTES") if idle_minutes: diff --git a/gateway/platforms/ntfy.py b/gateway/platforms/ntfy.py new file mode 100644 index 00000000000..abf94f967f6 --- /dev/null +++ b/gateway/platforms/ntfy.py @@ -0,0 +1,342 @@ +""" +ntfy platform adapter. + +Uses httpx streaming to receive messages published to a subscribed topic, +and HTTP POST to publish replies. Works with ntfy.sh or any self-hosted +ntfy server. + +Requires: + pip install httpx (already a dependency) + NTFY_TOPIC env var (and optionally NTFY_SERVER_URL, NTFY_TOKEN, + NTFY_PUBLISH_TOPIC) + +Configuration in config.yaml: + platforms: + ntfy: + enabled: true + extra: + server: "https://ntfy.sh" # or self-hosted URL + topic: "hermes-in" # subscribe topic (incoming) + publish_topic: "hermes-out" # optional — defaults to topic + token: "..." # optional Bearer / Basic auth token + markdown: true # optional — enable markdown formatting (default: false) +""" + +import asyncio +import json +import logging +import os +import time +import uuid +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +try: + import httpx + HTTPX_AVAILABLE = True +except ImportError: + HTTPX_AVAILABLE = False + httpx = None # type: ignore[assignment] + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import ( + BasePlatformAdapter, + MessageEvent, + MessageType, + SendResult, +) + +logger = logging.getLogger(__name__) + + +class _FatalStreamError(Exception): + """Raised when a stream error is unrecoverable (e.g. 401, 404).""" + +DEFAULT_SERVER = "https://ntfy.sh" +MAX_MESSAGE_LENGTH = 4096 # ntfy message body limit +DEDUP_WINDOW_SECONDS = 300 +DEDUP_MAX_SIZE = 1000 +RECONNECT_BACKOFF = [2, 5, 10, 30, 60] +STREAM_TIMEOUT_SECONDS = 90 # ntfy keepalive default is 55s; give margin + + +def check_ntfy_requirements() -> bool: + """Check if ntfy adapter dependencies are available and configured.""" + if not HTTPX_AVAILABLE: + return False + # Check env var directly — avoids the full config load (which also + # writes to os.environ) on every adapter pre-check call. + topic = os.getenv("NTFY_TOPIC", "").strip() + return bool(topic) + + +class NtfyAdapter(BasePlatformAdapter): + """ntfy adapter. + + Subscribes to a topic via HTTP streaming (/json endpoint) and publishes + replies via HTTP POST. No external SDK — only httpx is required. + """ + + MAX_MESSAGE_LENGTH = MAX_MESSAGE_LENGTH + + def __init__(self, config: PlatformConfig): + super().__init__(config, Platform.NTFY) + + extra = config.extra or {} + self._server: str = ( + extra.get("server") + or os.getenv("NTFY_SERVER_URL", DEFAULT_SERVER) + ).rstrip("/") + self._topic: str = extra.get("topic") or os.getenv("NTFY_TOPIC", "") + self._publish_topic: str = ( + extra.get("publish_topic") + or os.getenv("NTFY_PUBLISH_TOPIC", "") + or self._topic + ) + self._token: str = extra.get("token") or os.getenv("NTFY_TOKEN", "") + + self._stream_task: Optional[asyncio.Task] = None + self._http_client: Optional["httpx.AsyncClient"] = None + + # Message deduplication: msg_id -> timestamp + self._seen_messages: Dict[str, float] = {} + + # -- Connection lifecycle ----------------------------------------------- + + async def connect(self) -> bool: + """Connect to ntfy by starting the streaming subscription task.""" + if not HTTPX_AVAILABLE: + logger.warning("[%s] httpx not installed. Run: pip install httpx", self.name) + return False + if not self._topic: + logger.warning("[%s] NTFY_TOPIC not configured", self.name) + return False + + try: + self._http_client = httpx.AsyncClient(timeout=None) + self._stream_task = asyncio.create_task(self._run_stream()) + self._mark_connected() + logger.info("[%s] Connected — subscribing to %s/%s", self.name, self._server, self._topic) + return True + except Exception as e: + logger.error("[%s] Failed to connect: %s", self.name, e) + return False + + async def _run_stream(self) -> None: + """Subscribe to the ntfy topic with automatic reconnection.""" + backoff_idx = 0 + stream_start: float = 0.0 + url = f"{self._server}/{self._topic}/json" + headers = self._auth_headers() + + while self._running: + try: + logger.debug("[%s] Opening stream to %s", self.name, url) + stream_start = time.monotonic() + await self._consume_stream(url, headers) + except asyncio.CancelledError: + return + except _FatalStreamError: + self._running = False + return + except Exception as e: + if not self._running: + return + logger.warning("[%s] Stream error: %s", self.name, e) + + if not self._running: + return + + # Reset backoff if stream stayed alive for at least 60s + if time.monotonic() - stream_start >= 60.0: + backoff_idx = 0 + delay = RECONNECT_BACKOFF[min(backoff_idx, len(RECONNECT_BACKOFF) - 1)] + logger.info("[%s] Reconnecting in %ds...", self.name, delay) + await asyncio.sleep(delay) + backoff_idx += 1 + + async def _consume_stream(self, url: str, headers: Dict[str, str]) -> None: + """Open an HTTP streaming connection and dispatch events.""" + # poll=false keeps a persistent streaming connection alive with keepalive events + params = {"poll": "false"} + async with self._http_client.stream( + "GET", + url, + headers=headers, + params=params, + timeout=httpx.Timeout(connect=15.0, read=STREAM_TIMEOUT_SECONDS, write=15.0, pool=15.0), + ) as response: + if response.status_code == 401: + logger.error("[%s] Authentication failed (401) — stopping reconnect loop. Check NTFY_TOKEN.", self.name) + raise _FatalStreamError("401 Unauthorized") + if response.status_code == 404: + logger.error("[%s] Topic not found (404): %s — stopping reconnect loop.", self.name, self._topic) + raise _FatalStreamError("404 Not Found") + response.raise_for_status() + + async for line in response.aiter_lines(): + if not self._running: + return + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + except json.JSONDecodeError: + continue + if event.get("event") == "message": + await self._on_message(event) + + async def disconnect(self) -> None: + """Disconnect from ntfy.""" + self._running = False + self._mark_disconnected() + + if self._stream_task: + self._stream_task.cancel() + try: + await self._stream_task + except asyncio.CancelledError: + pass + self._stream_task = None + + if self._http_client: + await self._http_client.aclose() + self._http_client = None + + self._seen_messages.clear() + logger.info("[%s] Disconnected", self.name) + + # -- Inbound message processing ----------------------------------------- + + async def _on_message(self, event: Dict[str, Any]) -> None: + """Process an incoming ntfy message event.""" + msg_id = event.get("id") or uuid.uuid4().hex + if self._is_duplicate(msg_id): + logger.debug("[%s] Duplicate message %s, skipping", self.name, msg_id) + return + + text = (event.get("message") or "").strip() + if not text: + logger.debug("[%s] Empty message body, skipping", self.name) + return + + topic = event.get("topic") or self._topic + # ntfy has no native authenticated user identity. The title field is + # publisher-controlled and must NOT be used for authorization — any + # publisher who knows the topic can set title to an allowed username. + # Treat ntfy as a single trusted channel; user_id is fixed to the + # topic name. Document that NTFY_ALLOWED_USERS is only a real trust + # boundary when the topic has a read token protecting it. + user_id = topic + user_name = topic + + source = self.build_source( + chat_id=topic, + chat_name=topic, + chat_type="dm", + user_id=user_id, + user_name=user_name, + ) + + # Parse timestamp + unix_ts = event.get("time") + try: + timestamp = datetime.fromtimestamp(int(unix_ts), tz=timezone.utc) if unix_ts else datetime.now(tz=timezone.utc) + except (ValueError, OSError, TypeError): + timestamp = datetime.now(tz=timezone.utc) + + message_event = MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=source, + message_id=msg_id, + raw_message=event, + timestamp=timestamp, + ) + + logger.debug("[%s] Message on topic %s: %s", self.name, topic, text[:80]) + await self.handle_message(message_event) + + # -- Deduplication ------------------------------------------------------ + + def _is_duplicate(self, msg_id: str) -> bool: + """Return True if this message ID was already seen within the dedup window.""" + now = time.time() + if len(self._seen_messages) > DEDUP_MAX_SIZE: + cutoff = now - DEDUP_WINDOW_SECONDS + self._seen_messages = {k: v for k, v in self._seen_messages.items() if v > cutoff} + + if msg_id in self._seen_messages: + return True + self._seen_messages[msg_id] = now + return False + + # -- Outbound messaging ------------------------------------------------- + + async def send( + self, + chat_id: str, + content: str, + reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + """Publish a message to the configured publish topic.""" + metadata = metadata or {} + publish_topic = metadata.get("publish_topic") or self._publish_topic or chat_id + + if not self._http_client: + return SendResult(success=False, error="HTTP client not initialized") + + url = f"{self._server}/{publish_topic}" + markdown_enabled = (self.config.extra or {}).get("markdown", False) + headers = {**self._auth_headers(), "Content-Type": "text/plain; charset=utf-8"} + if markdown_enabled: + headers["X-Markdown"] = "true" + + if len(content) > self.MAX_MESSAGE_LENGTH: + logger.warning( + "[%s] Message truncated from %d to %d chars (ntfy limit)", + self.name, len(content), self.MAX_MESSAGE_LENGTH, + ) + body = content[:self.MAX_MESSAGE_LENGTH] + + try: + resp = await self._http_client.post(url, content=body.encode("utf-8"), headers=headers, timeout=15.0) + if resp.status_code < 300: + try: + data = resp.json() + returned_id = data.get("id") or uuid.uuid4().hex[:12] + except Exception: + returned_id = uuid.uuid4().hex[:12] + return SendResult(success=True, message_id=returned_id) + body_text = resp.text + logger.warning("[%s] Send failed HTTP %d: %s", self.name, resp.status_code, body_text[:200]) + return SendResult(success=False, error=f"HTTP {resp.status_code}: {body_text[:200]}") + except httpx.TimeoutException: + return SendResult(success=False, error="Timeout publishing to ntfy") + except Exception as e: + logger.error("[%s] Send error: %s", self.name, e) + return SendResult(success=False, error=str(e)) + + async def send_typing(self, chat_id: str, metadata=None) -> None: + """ntfy does not support typing indicators.""" + pass + + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: + """Return basic info about an ntfy topic.""" + return {"name": chat_id, "type": "dm"} + + # -- Helpers ------------------------------------------------------------ + + def _auth_headers(self) -> Dict[str, str]: + """Build Authorization header if a token is configured.""" + if not self._token: + return {} + # ntfy supports both Bearer tokens and Base64-encoded Basic auth; + # prefer Bearer for API tokens, Basic for username:password pairs. + if ":" in self._token: + import base64 + encoded = base64.b64encode(self._token.encode()).decode() + return {"Authorization": f"Basic {encoded}"} + return {"Authorization": f"Bearer {self._token}"} diff --git a/gateway/run.py b/gateway/run.py index 9ca87452f97..5288ccf9ea9 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3772,6 +3772,7 @@ class GatewayRunner: "BLUEBUBBLES_ALLOWED_USERS", "QQ_ALLOWED_USERS", "YUANBAO_ALLOWED_USERS", + "NTFY_ALLOWED_USERS", "GATEWAY_ALLOWED_USERS", ) _builtin_allow_all_vars = ( @@ -3787,6 +3788,7 @@ class GatewayRunner: "BLUEBUBBLES_ALLOW_ALL_USERS", "QQ_ALLOW_ALL_USERS", "YUANBAO_ALLOW_ALL_USERS", + "NTFY_ALLOW_ALL_USERS", ) # Also pick up plugin-registered platforms — each entry can declare # its own allowed_users_env / allow_all_env, so the warning stays @@ -6164,6 +6166,12 @@ class GatewayRunner: return None return QQAdapter(config) + elif platform == Platform.NTFY: + from gateway.platforms.ntfy import NtfyAdapter, check_ntfy_requirements + if not check_ntfy_requirements(): + logger.warning("ntfy: dependencies not met") + return None + return NtfyAdapter(config) elif platform == Platform.YUANBAO: from gateway.platforms.yuanbao import YuanbaoAdapter, WEBSOCKETS_AVAILABLE if not WEBSOCKETS_AVAILABLE: @@ -6240,6 +6248,7 @@ class GatewayRunner: Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOWED_USERS", Platform.QQBOT: "QQ_ALLOWED_USERS", Platform.YUANBAO: "YUANBAO_ALLOWED_USERS", + Platform.NTFY: "NTFY_ALLOWED_USERS", } platform_group_user_env_map = { Platform.TELEGRAM: "TELEGRAM_GROUP_ALLOWED_USERS", @@ -6266,6 +6275,7 @@ class GatewayRunner: Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOW_ALL_USERS", Platform.QQBOT: "QQ_ALLOW_ALL_USERS", Platform.YUANBAO: "YUANBAO_ALLOW_ALL_USERS", + Platform.NTFY: "NTFY_ALLOW_ALL_USERS", } # Bots admitted by {PLATFORM}_ALLOW_BOTS bypass the human allowlist (#4466). platform_allow_bots_map = { diff --git a/hermes_cli/status.py b/hermes_cli/status.py index 5629da03fe3..14d9bb4e3c9 100644 --- a/hermes_cli/status.py +++ b/hermes_cli/status.py @@ -423,6 +423,7 @@ def show_status(args): "BlueBubbles": ("BLUEBUBBLES_SERVER_URL", "BLUEBUBBLES_HOME_CHANNEL"), "QQBot": ("QQ_APP_ID", "QQ_HOME_CHANNEL"), "Yuanbao": ("YUANBAO_APP_ID", "YUANBAO_HOME_CHANNEL"), + "ntfy": ("NTFY_TOPIC", "NTFY_HOME_CHANNEL"), } for name, (token_var, home_var) in platforms.items(): diff --git a/tests/gateway/test_ntfy.py b/tests/gateway/test_ntfy.py new file mode 100644 index 00000000000..a510612ab9f --- /dev/null +++ b/tests/gateway/test_ntfy.py @@ -0,0 +1,893 @@ +"""Tests for ntfy platform adapter and integration points.""" + +import asyncio +import os +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.config import Platform, PlatformConfig + + +def _run(coro): + """Run an async coroutine synchronously.""" + return asyncio.get_event_loop().run_until_complete(coro) + + +# --------------------------------------------------------------------------- +# Platform enum +# --------------------------------------------------------------------------- + + +class TestPlatformEnum: + + def test_ntfy_value(self): + assert Platform.NTFY.value == "ntfy" + + def test_ntfy_in_all_platforms(self): + values = [p.value for p in Platform] + assert "ntfy" in values + + +# --------------------------------------------------------------------------- +# Requirements check +# --------------------------------------------------------------------------- + + +class TestNtfyRequirements: + + def test_returns_false_when_httpx_unavailable(self, monkeypatch): + monkeypatch.setenv("NTFY_TOPIC", "hermes-test") + monkeypatch.setattr("gateway.platforms.ntfy.HTTPX_AVAILABLE", False) + from gateway.platforms.ntfy import check_ntfy_requirements + assert check_ntfy_requirements() is False + + def test_returns_false_when_topic_not_set(self, monkeypatch): + monkeypatch.setattr("gateway.platforms.ntfy.HTTPX_AVAILABLE", True) + monkeypatch.delenv("NTFY_TOPIC", raising=False) + from gateway.platforms.ntfy import check_ntfy_requirements + with patch("gateway.config.load_gateway_config") as mock_load: + mock_cfg = MagicMock() + mock_cfg.platforms = {} + mock_load.return_value = mock_cfg + assert check_ntfy_requirements() is False + + def test_returns_true_when_topic_set_via_env(self, monkeypatch): + monkeypatch.setattr("gateway.platforms.ntfy.HTTPX_AVAILABLE", True) + monkeypatch.setenv("NTFY_TOPIC", "hermes-test") + from gateway.platforms.ntfy import check_ntfy_requirements + assert check_ntfy_requirements() is True + + def test_returns_true_when_topic_set_via_env(self, monkeypatch): + monkeypatch.setattr("gateway.platforms.ntfy.HTTPX_AVAILABLE", True) + monkeypatch.setenv("NTFY_TOPIC", "hermes-cfg") + from gateway.platforms.ntfy import check_ntfy_requirements + assert check_ntfy_requirements() is True + + +# --------------------------------------------------------------------------- +# Config loading from env vars +# --------------------------------------------------------------------------- + + +class TestNtfyConfigLoading: + + def test_ntfy_topic_enables_platform(self, monkeypatch): + from gateway.config import load_gateway_config + + monkeypatch.setenv("NTFY_TOPIC", "hermes-in") + config = load_gateway_config() + assert Platform.NTFY in config.platforms + pc = config.platforms[Platform.NTFY] + assert pc.enabled is True + assert pc.extra["topic"] == "hermes-in" + + def test_ntfy_server_url_stored_in_extra(self, monkeypatch): + from gateway.config import load_gateway_config + + monkeypatch.setenv("NTFY_TOPIC", "hermes-in") + monkeypatch.setenv("NTFY_SERVER_URL", "https://ntfy.example.com") + config = load_gateway_config() + pc = config.platforms[Platform.NTFY] + assert pc.extra.get("server") == "https://ntfy.example.com" + + def test_ntfy_token_stored_in_extra(self, monkeypatch): + from gateway.config import load_gateway_config + + monkeypatch.setenv("NTFY_TOPIC", "hermes-in") + monkeypatch.setenv("NTFY_TOKEN", "tk_secret") + config = load_gateway_config() + pc = config.platforms[Platform.NTFY] + assert pc.extra.get("token") == "tk_secret" + + def test_ntfy_publish_topic_stored_in_extra(self, monkeypatch): + from gateway.config import load_gateway_config + + monkeypatch.setenv("NTFY_TOPIC", "hermes-in") + monkeypatch.setenv("NTFY_PUBLISH_TOPIC", "hermes-out") + config = load_gateway_config() + pc = config.platforms[Platform.NTFY] + assert pc.extra.get("publish_topic") == "hermes-out" + + def test_ntfy_home_channel_set(self, monkeypatch): + from gateway.config import load_gateway_config + + monkeypatch.setenv("NTFY_TOPIC", "hermes-in") + monkeypatch.setenv("NTFY_HOME_CHANNEL", "hermes-home") + config = load_gateway_config() + pc = config.platforms[Platform.NTFY] + assert pc.home_channel is not None + assert pc.home_channel.chat_id == "hermes-home" + assert pc.home_channel.platform == Platform.NTFY + + def test_ntfy_home_channel_name_default(self, monkeypatch): + from gateway.config import load_gateway_config + + monkeypatch.setenv("NTFY_TOPIC", "hermes-in") + monkeypatch.setenv("NTFY_HOME_CHANNEL", "hermes-home") + monkeypatch.delenv("NTFY_HOME_CHANNEL_NAME", raising=False) + config = load_gateway_config() + pc = config.platforms[Platform.NTFY] + assert pc.home_channel.name == "Home" + + def test_ntfy_not_enabled_when_topic_absent(self, monkeypatch): + from gateway.config import load_gateway_config + + monkeypatch.delenv("NTFY_TOPIC", raising=False) + config = load_gateway_config() + pc = config.platforms.get(Platform.NTFY) + if pc is not None: + assert not pc.enabled or pc.extra.get("topic", "") == "" + + def test_ntfy_in_connected_platforms_when_topic_set(self, monkeypatch): + from gateway.config import load_gateway_config + + monkeypatch.setenv("NTFY_TOPIC", "hermes-in") + config = load_gateway_config() + connected = config.get_connected_platforms() + assert Platform.NTFY in connected + + +# --------------------------------------------------------------------------- +# Adapter construction +# --------------------------------------------------------------------------- + + +class TestNtfyAdapterInit: + + def test_default_server_url(self, monkeypatch): + from gateway.platforms.ntfy import NtfyAdapter, DEFAULT_SERVER + + monkeypatch.delenv("NTFY_SERVER_URL", raising=False) + config = PlatformConfig(enabled=True, extra={"topic": "hermes-in"}) + adapter = NtfyAdapter(config) + assert adapter._server == DEFAULT_SERVER.rstrip("/") + + def test_topic_read_from_extra(self): + from gateway.platforms.ntfy import NtfyAdapter + + config = PlatformConfig(enabled=True, extra={"topic": "my-topic"}) + adapter = NtfyAdapter(config) + assert adapter._topic == "my-topic" + + def test_topic_read_from_env(self, monkeypatch): + from gateway.platforms.ntfy import NtfyAdapter + + monkeypatch.setenv("NTFY_TOPIC", "env-topic") + config = PlatformConfig(enabled=True, extra={}) + adapter = NtfyAdapter(config) + assert adapter._topic == "env-topic" + + def test_publish_topic_falls_back_to_topic(self, monkeypatch): + from gateway.platforms.ntfy import NtfyAdapter + + monkeypatch.delenv("NTFY_PUBLISH_TOPIC", raising=False) + config = PlatformConfig(enabled=True, extra={"topic": "hermes-in"}) + adapter = NtfyAdapter(config) + assert adapter._publish_topic == "hermes-in" + + def test_publish_topic_uses_extra_value(self): + from gateway.platforms.ntfy import NtfyAdapter + + config = PlatformConfig( + enabled=True, + extra={"topic": "hermes-in", "publish_topic": "hermes-out"}, + ) + adapter = NtfyAdapter(config) + assert adapter._publish_topic == "hermes-out" + + def test_token_read_from_extra(self): + from gateway.platforms.ntfy import NtfyAdapter + + config = PlatformConfig(enabled=True, extra={"topic": "t", "token": "tok-123"}) + adapter = NtfyAdapter(config) + assert adapter._token == "tok-123" + + def test_token_read_from_env(self, monkeypatch): + from gateway.platforms.ntfy import NtfyAdapter + + monkeypatch.setenv("NTFY_TOKEN", "env-token") + config = PlatformConfig(enabled=True, extra={"topic": "t"}) + adapter = NtfyAdapter(config) + assert adapter._token == "env-token" + + def test_server_trailing_slash_stripped(self): + from gateway.platforms.ntfy import NtfyAdapter + + config = PlatformConfig( + enabled=True, + extra={"topic": "t", "server": "https://ntfy.example.com/"}, + ) + adapter = NtfyAdapter(config) + assert not adapter._server.endswith("/") + + def test_name_is_ntfy(self): + from gateway.platforms.ntfy import NtfyAdapter + + config = PlatformConfig(enabled=True, extra={"topic": "t"}) + adapter = NtfyAdapter(config) + assert adapter.name == "Ntfy" + + def test_initial_state(self): + from gateway.platforms.ntfy import NtfyAdapter + + config = PlatformConfig(enabled=True, extra={"topic": "t"}) + adapter = NtfyAdapter(config) + assert adapter._stream_task is None + assert adapter._http_client is None + assert adapter._seen_messages == {} + + +# --------------------------------------------------------------------------- +# Auth headers +# --------------------------------------------------------------------------- + + +class TestAuthHeaders: + + def _make_adapter(self, token=""): + from gateway.platforms.ntfy import NtfyAdapter + + config = PlatformConfig(enabled=True, extra={"topic": "t", "token": token}) + return NtfyAdapter(config) + + def test_no_token_returns_empty_dict(self): + adapter = self._make_adapter(token="") + assert adapter._auth_headers() == {} + + def test_bearer_token_for_plain_token(self): + adapter = self._make_adapter(token="myapitoken") + headers = adapter._auth_headers() + assert "Authorization" in headers + assert headers["Authorization"] == "Bearer myapitoken" + + def test_basic_auth_for_user_colon_password(self): + adapter = self._make_adapter(token="user:pass") + headers = adapter._auth_headers() + assert "Authorization" in headers + assert headers["Authorization"].startswith("Basic ") + expected = "Basic " + __import__("base64").b64encode(b"user:pass").decode() + assert headers["Authorization"] == expected + + def test_bearer_token_used_when_no_colon(self): + adapter = self._make_adapter(token="noColonHere") + headers = adapter._auth_headers() + assert headers["Authorization"] == "Bearer noColonHere" + + def test_auth_header_key_is_authorization(self): + adapter = self._make_adapter(token="tok") + headers = adapter._auth_headers() + assert list(headers.keys()) == ["Authorization"] + + +# --------------------------------------------------------------------------- +# Deduplication +# --------------------------------------------------------------------------- + + +class TestDeduplication: + + def _make_adapter(self): + from gateway.platforms.ntfy import NtfyAdapter + + return NtfyAdapter(PlatformConfig(enabled=True, extra={"topic": "t"})) + + def test_first_message_not_duplicate(self): + adapter = self._make_adapter() + assert adapter._is_duplicate("msg-1") is False + + def test_second_occurrence_is_duplicate(self): + adapter = self._make_adapter() + adapter._is_duplicate("msg-1") + assert adapter._is_duplicate("msg-1") is True + + def test_different_ids_not_duplicate(self): + adapter = self._make_adapter() + adapter._is_duplicate("msg-1") + assert adapter._is_duplicate("msg-2") is False + + def test_many_messages_recorded(self): + adapter = self._make_adapter() + for i in range(50): + adapter._is_duplicate(f"msg-{i}") + assert len(adapter._seen_messages) == 50 + + def test_cache_pruned_on_overflow(self): + from gateway.platforms.ntfy import NtfyAdapter, DEDUP_MAX_SIZE + + adapter = NtfyAdapter(PlatformConfig(enabled=True, extra={"topic": "t"})) + for i in range(DEDUP_MAX_SIZE + 20): + adapter._is_duplicate(f"msg-{i}") + assert len(adapter._seen_messages) <= DEDUP_MAX_SIZE + 20 + + def test_expired_id_can_be_seen_again(self): + import time + from gateway.platforms.ntfy import NtfyAdapter, DEDUP_WINDOW_SECONDS, DEDUP_MAX_SIZE + + adapter = NtfyAdapter(PlatformConfig(enabled=True, extra={"topic": "t"})) + adapter._seen_messages["old-msg"] = time.time() - DEDUP_WINDOW_SECONDS - 1 + for i in range(DEDUP_MAX_SIZE + 1): + adapter._is_duplicate(f"fill-{i}") + assert adapter._is_duplicate("old-msg") is False + + +# --------------------------------------------------------------------------- +# connect() / disconnect() +# --------------------------------------------------------------------------- + + +class TestConnect: + + def test_connect_fails_when_httpx_unavailable(self, monkeypatch): + monkeypatch.setattr("gateway.platforms.ntfy.HTTPX_AVAILABLE", False) + from gateway.platforms.ntfy import NtfyAdapter + + adapter = NtfyAdapter(PlatformConfig(enabled=True, extra={"topic": "t"})) + result = _run(adapter.connect()) + assert result is False + + def test_connect_fails_when_no_topic(self, monkeypatch): + monkeypatch.setattr("gateway.platforms.ntfy.HTTPX_AVAILABLE", True) + monkeypatch.delenv("NTFY_TOPIC", raising=False) + from gateway.platforms.ntfy import NtfyAdapter + + config = PlatformConfig(enabled=True, extra={}) + adapter = NtfyAdapter(config) + result = _run(adapter.connect()) + assert result is False + + def test_connect_starts_stream_task(self, monkeypatch): + monkeypatch.setattr("gateway.platforms.ntfy.HTTPX_AVAILABLE", True) + from gateway.platforms.ntfy import NtfyAdapter + + config = PlatformConfig(enabled=True, extra={"topic": "hermes-test"}) + adapter = NtfyAdapter(config) + + with patch.object(adapter, "_run_stream", new_callable=AsyncMock): + with patch("gateway.platforms.ntfy.httpx") as mock_httpx: + mock_httpx.AsyncClient.return_value = MagicMock() + result = _run(adapter.connect()) + + assert result is True + assert adapter._stream_task is not None + adapter._stream_task.cancel() + try: + _run(adapter._stream_task) + except (asyncio.CancelledError, Exception): + pass + + def test_disconnect_clears_state(self): + from gateway.platforms.ntfy import NtfyAdapter + + adapter = NtfyAdapter(PlatformConfig(enabled=True, extra={"topic": "t"})) + adapter._seen_messages["x"] = 1.0 + adapter._http_client = AsyncMock() + adapter._stream_task = None + adapter._running = True + + _run(adapter.disconnect()) + + assert adapter._seen_messages == {} + assert adapter._http_client is None + assert adapter._running is False + + def test_disconnect_cancels_stream_task(self): + from gateway.platforms.ntfy import NtfyAdapter + + adapter = NtfyAdapter(PlatformConfig(enabled=True, extra={"topic": "t"})) + + async def _hang(): + await asyncio.sleep(9999) + + loop = asyncio.get_event_loop() + adapter._stream_task = loop.create_task(_hang()) + adapter._http_client = AsyncMock() + adapter._running = True + + _run(adapter.disconnect()) + assert adapter._stream_task is None + + +# --------------------------------------------------------------------------- +# send() +# --------------------------------------------------------------------------- + + +class TestSend: + + def _make_adapter(self, topic="hermes-in", publish_topic="", token=""): + from gateway.platforms.ntfy import NtfyAdapter + + extra = {"topic": topic, "token": token} + if publish_topic: + extra["publish_topic"] = publish_topic + return NtfyAdapter(PlatformConfig(enabled=True, extra=extra)) + + def test_send_fails_without_http_client(self): + adapter = self._make_adapter() + result = _run(adapter.send("hermes-in", "hello")) + assert result.success is False + assert "not initialized" in result.error.lower() + + def test_send_posts_to_publish_topic(self): + adapter = self._make_adapter(topic="hermes-in", publish_topic="hermes-out") + + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"id": "abc123"} + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_resp) + adapter._http_client = mock_client + + result = _run(adapter.send("hermes-in", "Hello ntfy!")) + assert result.success is True + assert result.message_id == "abc123" + + call_args = mock_client.post.call_args + posted_url = call_args[0][0] + assert posted_url.endswith("/hermes-out") + + def test_send_falls_back_to_subscribe_topic(self): + adapter = self._make_adapter(topic="hermes-in") + + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {} + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_resp) + adapter._http_client = mock_client + + result = _run(adapter.send("hermes-in", "Hello!")) + assert result.success is True + posted_url = mock_client.post.call_args[0][0] + assert posted_url.endswith("/hermes-in") + + def test_send_uses_metadata_publish_topic(self): + adapter = self._make_adapter(topic="hermes-in") + + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {} + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_resp) + adapter._http_client = mock_client + + result = _run(adapter.send( + "hermes-in", "Hi!", metadata={"publish_topic": "override-out"} + )) + assert result.success is True + posted_url = mock_client.post.call_args[0][0] + assert posted_url.endswith("/override-out") + + def test_send_handles_http_error_status(self): + adapter = self._make_adapter(topic="hermes-in") + + mock_resp = MagicMock() + mock_resp.status_code = 403 + mock_resp.text = "Forbidden" + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_resp) + adapter._http_client = mock_client + + result = _run(adapter.send("hermes-in", "Hello!")) + assert result.success is False + assert "403" in result.error + + def test_send_handles_timeout(self): + import gateway.platforms.ntfy as ntfy_mod + + adapter = self._make_adapter(topic="hermes-in") + + class _FakeTimeout(Exception): + pass + + fake_httpx = MagicMock() + fake_httpx.TimeoutException = _FakeTimeout + + mock_client = AsyncMock() + mock_client.post = AsyncMock(side_effect=_FakeTimeout("timed out")) + adapter._http_client = mock_client + + with patch.object(ntfy_mod, "httpx", fake_httpx): + result = _run(adapter.send("hermes-in", "Hello!")) + + assert result.success is False + assert "timeout" in result.error.lower() + + def test_send_truncates_to_max_length(self): + from gateway.platforms.ntfy import NtfyAdapter, MAX_MESSAGE_LENGTH + + adapter = self._make_adapter(topic="t") + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {} + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_resp) + adapter._http_client = mock_client + + long_msg = "x" * (MAX_MESSAGE_LENGTH + 500) + _run(adapter.send("t", long_msg)) + + posted_body = mock_client.post.call_args[1]["content"] + assert len(posted_body.decode()) <= MAX_MESSAGE_LENGTH + + def test_send_typing_is_noop(self): + from gateway.platforms.ntfy import NtfyAdapter + + adapter = NtfyAdapter(PlatformConfig(enabled=True, extra={"topic": "t"})) + # Should not raise + _run(adapter.send_typing("t")) + + def test_get_chat_info_returns_dict(self): + from gateway.platforms.ntfy import NtfyAdapter + + adapter = NtfyAdapter(PlatformConfig(enabled=True, extra={"topic": "t"})) + info = _run(adapter.get_chat_info("hermes-in")) + assert info["name"] == "hermes-in" + assert info["type"] == "dm" + + def test_send_includes_bearer_auth_header(self): + adapter = self._make_adapter(topic="hermes-in", token="mytoken") + + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {} + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_resp) + adapter._http_client = mock_client + + _run(adapter.send("hermes-in", "secure message")) + + call_headers = mock_client.post.call_args[1]["headers"] + assert call_headers.get("Authorization") == "Bearer mytoken" + + +# --------------------------------------------------------------------------- +# Inbound message processing +# --------------------------------------------------------------------------- + + +class TestOnMessage: + + def _make_adapter(self): + from gateway.platforms.ntfy import NtfyAdapter + + adapter = NtfyAdapter(PlatformConfig(enabled=True, extra={"topic": "hermes-in"})) + return adapter + + def test_message_dispatched_to_handler(self): + adapter = self._make_adapter() + calls = [] + + async def handler(event): + calls.append(event) + + adapter.set_message_handler(handler) + + event = { + "id": "evt-001", + "event": "message", + "topic": "hermes-in", + "message": "Hello from ntfy", + "time": 1700000000, + } + _run(adapter._on_message(event)) + assert len(calls) == 1 + assert calls[0].text == "Hello from ntfy" + + def test_empty_message_skipped(self): + adapter = self._make_adapter() + calls = [] + + async def handler(event): + calls.append(event) + + adapter.set_message_handler(handler) + _run(adapter._on_message({ + "id": "x", "event": "message", "topic": "t", "message": "", "time": None + })) + assert calls == [] + + def test_duplicate_message_skipped(self): + adapter = self._make_adapter() + calls = [] + + async def handler(event): + calls.append(event) + + adapter.set_message_handler(handler) + + event = {"id": "dup-1", "event": "message", "topic": "hermes-in", "message": "hi", "time": None} + _run(adapter._on_message(event)) + _run(adapter._on_message(event)) + assert len(calls) == 1 + + def test_timestamp_parsed_from_event(self): + from datetime import timezone + + adapter = self._make_adapter() + captured = [] + + async def handler(event): + captured.append(event) + + adapter.set_message_handler(handler) + + _run(adapter._on_message({ + "id": "ts-1", + "event": "message", + "topic": "hermes-in", + "message": "ping", + "time": 1700000000, + })) + ts = captured[0].timestamp + assert ts.tzinfo == timezone.utc + + def test_message_id_set_from_event(self): + adapter = self._make_adapter() + captured = [] + + async def handler(event): + captured.append(event) + + adapter.set_message_handler(handler) + _run(adapter._on_message({ + "id": "ntfy-id-42", + "event": "message", + "topic": "hermes-in", + "message": "test", + "time": None, + })) + assert captured[0].message_id == "ntfy-id-42" + + def test_title_not_used_as_user_id(self): + """title field must not be used for identity — it is publisher-controlled + and cannot be trusted as an authentication signal.""" + adapter = self._make_adapter() + captured = [] + + async def handler(event): + captured.append(event) + + adapter.set_message_handler(handler) + _run(adapter._on_message({ + "id": "u-1", + "event": "message", + "topic": "hermes-in", + "message": "hello", + "title": "Alice", + "time": None, + })) + # user_id must be the topic, never the spoofable title field + assert captured[0].source.user_id == "hermes-in" + assert captured[0].source.user_name == "hermes-in" + + def test_unknown_publisher_cannot_impersonate_allowed_user(self): + """An unknown publisher setting title to an allowed username must not + gain the identity of that user — identity is always the topic name.""" + adapter = self._make_adapter() + captured = [] + + async def handler(event): + captured.append(event) + + adapter.set_message_handler(handler) + _run(adapter._on_message({ + "id": "u-2", + "event": "message", + "topic": "hermes-in", + "message": "sensitive command", + "title": "admin", + "time": None, + })) + assert captured[0].source.user_id == "hermes-in" + assert captured[0].source.user_id != "admin" + + def test_source_chat_id_is_topic(self): + adapter = self._make_adapter() + captured = [] + + async def handler(event): + captured.append(event) + + adapter.set_message_handler(handler) + _run(adapter._on_message({ + "id": "s-1", + "event": "message", + "topic": "hermes-in", + "message": "hello", + "time": None, + })) + assert captured[0].source.chat_id == "hermes-in" + + +# --------------------------------------------------------------------------- +# Integration: send_message_tool platform_map (source-level checks) +# --------------------------------------------------------------------------- + + +class TestSendMessageToolIntegration: + + def test_ntfy_in_platform_enum(self): + assert hasattr(Platform, "NTFY") + assert Platform.NTFY.value == "ntfy" + + def test_ntfy_in_platform_map_source(self): + src = open("tools/send_message_tool.py").read() + assert "Platform.NTFY" in src + + def test_send_ntfy_function_in_source(self): + src = open("tools/send_message_tool.py").read() + assert "async def _send_ntfy" in src + + def test_ntfy_branch_in_send_to_platform_source(self): + src = open("tools/send_message_tool.py").read() + assert "Platform.NTFY" in src + assert "_send_ntfy" in src + + def test_send_ntfy_reads_server_from_extra(self): + src = open("tools/send_message_tool.py").read() + assert 'extra.get("server")' in src + assert "NTFY_SERVER_URL" in src + + def test_send_ntfy_reads_topic_from_extra(self): + src = open("tools/send_message_tool.py").read() + assert 'extra.get("topic")' in src + assert "NTFY_TOPIC" in src + + def test_send_ntfy_reads_token_from_extra(self): + src = open("tools/send_message_tool.py").read() + assert 'extra.get("token")' in src + assert "NTFY_TOKEN" in src + + +# --------------------------------------------------------------------------- +# Integration: cron scheduler platform_map +# --------------------------------------------------------------------------- + + +class TestCronSchedulerIntegration: + + def test_ntfy_in_scheduler_platform_map_source(self): + src = open("cron/scheduler.py").read() + # ntfy routing handled via Platform._missing_() dynamic dispatch + assert '"ntfy"' in src or "Platform._missing_" in src or "_missing_" in src + + def test_ntfy_in_cronjob_deliver_description(self): + src = open("cron/scheduler.py").read() + assert "ntfy" in src.lower() + + +# --------------------------------------------------------------------------- +# Integration: gateway/run.py authorization maps +# --------------------------------------------------------------------------- + + +class TestRunAuthorizationMaps: + + def test_ntfy_allowed_users_in_allowlist_check(self): + src = open("gateway/run.py").read() + assert "NTFY_ALLOWED_USERS" in src + + def test_ntfy_allow_all_users_in_allowlist_check(self): + src = open("gateway/run.py").read() + assert "NTFY_ALLOW_ALL_USERS" in src + + def test_ntfy_in_platform_env_map(self): + src = open("gateway/run.py").read() + assert 'Platform.NTFY: "NTFY_ALLOWED_USERS"' in src + + def test_ntfy_in_allow_all_map(self): + src = open("gateway/run.py").read() + assert 'Platform.NTFY: "NTFY_ALLOW_ALL_USERS"' in src + + def test_ntfy_create_adapter_branch(self): + src = open("gateway/run.py").read() + assert "Platform.NTFY" in src + assert "NtfyAdapter" in src + + def test_ntfy_startup_allowlist_includes_ntfy_allowed_users(self): + src = open("gateway/run.py").read() + # Verify both env vars appear in the startup check tuples + assert '"NTFY_ALLOWED_USERS"' in src + assert '"NTFY_ALLOW_ALL_USERS"' in src + + +# --------------------------------------------------------------------------- +# Integration: toolsets +# --------------------------------------------------------------------------- + + +class TestToolsets: + + def test_hermes_ntfy_toolset_exists(self): + from toolsets import get_toolset + + ts = get_toolset("hermes-ntfy") + assert ts is not None + assert "tools" in ts + + def test_hermes_ntfy_in_gateway_includes(self): + from toolsets import get_toolset + + gw = get_toolset("hermes-gateway") + assert "hermes-ntfy" in gw["includes"] + + def test_hermes_ntfy_resolves_tools(self): + from toolsets import resolve_toolset + + tools = resolve_toolset("hermes-ntfy") + assert len(tools) > 0 + + def test_hermes_ntfy_description_mentions_ntfy(self): + from toolsets import get_toolset + + ts = get_toolset("hermes-ntfy") + assert "ntfy" in ts["description"].lower() + + +# --------------------------------------------------------------------------- +# Integration: prompt_builder platform hints +# --------------------------------------------------------------------------- + + +class TestPromptBuilderHints: + + def test_ntfy_hint_exists(self): + from agent.prompt_builder import PLATFORM_HINTS + + assert "ntfy" in PLATFORM_HINTS + + def test_ntfy_hint_mentions_plain_text(self): + from agent.prompt_builder import PLATFORM_HINTS + + hint = PLATFORM_HINTS["ntfy"].lower() + assert "plain text" in hint + + def test_ntfy_hint_mentions_push_or_notifications(self): + from agent.prompt_builder import PLATFORM_HINTS + + hint = PLATFORM_HINTS["ntfy"].lower() + assert "push" in hint or "notification" in hint + + +# --------------------------------------------------------------------------- +# Integration: channel_directory +# --------------------------------------------------------------------------- + + +class TestChannelDirectory: + + def test_ntfy_in_session_based_platforms_source(self): + src = open("gateway/channel_directory.py").read() + assert '"ntfy"' in src + + def test_build_channel_directory_includes_ntfy_key(self): + src = open("gateway/channel_directory.py").read() + assert "ntfy" in src diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 0f83e40c3c9..36f626752a8 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -777,6 +777,8 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, result = await _send_bluebubbles(pconfig.extra, chat_id, chunk) elif platform == Platform.QQBOT: result = await _send_qqbot(pconfig, chat_id, chunk) + elif platform == Platform.NTFY: + result = await _send_ntfy(pconfig, chat_id, chunk) elif platform == Platform.YUANBAO: result = await _send_yuanbao(chat_id, chunk) else: @@ -1770,6 +1772,28 @@ async def _send_qqbot(pconfig, chat_id, message): return _error(f"QQBot send failed: {e}") +async def _send_ntfy(pconfig, chat_id, message): + """Send a message via ntfy HTTP POST.""" + try: + extra = pconfig.extra or {} + server = extra.get("server") or os.getenv("NTFY_SERVER_URL", "https://ntfy.sh").rstrip("/") + topic = chat_id or extra.get("topic") or os.getenv("NTFY_TOPIC", "") + token = extra.get("token") or os.getenv("NTFY_TOKEN", "") + if not topic: + return _error("ntfy topic not configured.") + import httpx + headers = {"Content-Type": "text/plain; charset=utf-8"} + if token: + headers["Authorization"] = f"Bearer {token}" + url = f"{server}/{topic}" + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.post(url, content=message.encode("utf-8"), headers=headers) + resp.raise_for_status() + return {"success": True, "platform": "ntfy", "chat_id": topic} + except Exception as e: + return _error(f"ntfy send failed: {e}") + + async def _send_yuanbao(chat_id, message, media_files=None): """Send via Yuanbao using the running gateway adapter's WebSocket connection. diff --git a/toolsets.py b/toolsets.py index 5de07e4c7a1..9f5e3e13702 100644 --- a/toolsets.py +++ b/toolsets.py @@ -270,6 +270,11 @@ TOOLSETS = { "includes": [], }, + "ntfy": { + "description": "ntfy push notification toolset", + "tools": [], + "includes": ["hermes-ntfy"], + }, "yuanbao": { "description": "Yuanbao platform tools - group info, member queries, DM, stickers", "tools": [ @@ -515,6 +520,11 @@ TOOLSETS = { "includes": [] }, + "hermes-ntfy": { + "description": "ntfy push notification bot toolset", + "tools": _HERMES_CORE_TOOLS, + "includes": [] + }, "hermes-sms": { "description": "SMS bot toolset - interact with Hermes via SMS (Twilio)", "tools": _HERMES_CORE_TOOLS, @@ -530,7 +540,7 @@ TOOLSETS = { "hermes-gateway": { "description": "Gateway toolset - union of all messaging platform tools", "tools": [], - "includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-bluebubbles", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom", "hermes-wecom-callback", "hermes-weixin", "hermes-qqbot", "hermes-webhook", "hermes-yuanbao"] + "includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-bluebubbles", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom", "hermes-wecom-callback", "hermes-weixin", "hermes-qqbot", "hermes-webhook", "hermes-yuanbao", "hermes-ntfy"] } }