From 46a6f3902462d7f813741eab56b0abf13c4a777b Mon Sep 17 00:00:00 2001 From: Dilee Date: Thu, 7 May 2026 16:30:38 +0300 Subject: [PATCH] feat(msgraph): add webhook listener platform --- gateway/config.py | 44 +++ gateway/platforms/msgraph_webhook.py | 283 ++++++++++++++++++ gateway/run.py | 10 + tests/gateway/test_msgraph_webhook.py | 187 ++++++++++++ .../test_platform_connected_checkers.py | 7 +- 5 files changed, 530 insertions(+), 1 deletion(-) create mode 100644 gateway/platforms/msgraph_webhook.py create mode 100644 tests/gateway/test_msgraph_webhook.py diff --git a/gateway/config.py b/gateway/config.py index 6df6b5f4a5..7813b16b4a 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -101,6 +101,7 @@ class Platform(Enum): DINGTALK = "dingtalk" API_SERVER = "api_server" WEBHOOK = "webhook" + MSGRAPH_WEBHOOK = "msgraph_webhook" FEISHU = "feishu" WECOM = "wecom" WECOM_CALLBACK = "wecom_callback" @@ -376,6 +377,7 @@ _PLATFORM_CONNECTED_CHECKERS: dict[Platform, Callable[[PlatformConfig], bool]] = Platform.SMS: lambda cfg: bool(os.getenv("TWILIO_ACCOUNT_SID")), Platform.API_SERVER: lambda cfg: True, Platform.WEBHOOK: lambda cfg: True, + Platform.MSGRAPH_WEBHOOK: lambda cfg: True, Platform.FEISHU: lambda cfg: bool(cfg.extra.get("app_id")), Platform.WECOM: lambda cfg: bool(cfg.extra.get("bot_id")), Platform.WECOM_CALLBACK: lambda cfg: bool( @@ -1407,6 +1409,48 @@ def _apply_env_overrides(config: GatewayConfig) -> None: if webhook_secret: config.platforms[Platform.WEBHOOK].extra["secret"] = webhook_secret + # Microsoft Graph webhook platform + msgraph_webhook_enabled = os.getenv("MSGRAPH_WEBHOOK_ENABLED", "").lower() in ( + "true", + "1", + "yes", + ) + msgraph_webhook_port = os.getenv("MSGRAPH_WEBHOOK_PORT") + msgraph_webhook_client_state = os.getenv("MSGRAPH_WEBHOOK_CLIENT_STATE", "") + msgraph_webhook_resources = os.getenv("MSGRAPH_WEBHOOK_ACCEPTED_RESOURCES", "") + if ( + msgraph_webhook_enabled + or Platform.MSGRAPH_WEBHOOK in config.platforms + or msgraph_webhook_port + or msgraph_webhook_client_state + or msgraph_webhook_resources + ): + if Platform.MSGRAPH_WEBHOOK not in config.platforms: + config.platforms[Platform.MSGRAPH_WEBHOOK] = PlatformConfig() + if msgraph_webhook_enabled: + config.platforms[Platform.MSGRAPH_WEBHOOK].enabled = True + if msgraph_webhook_port: + try: + config.platforms[Platform.MSGRAPH_WEBHOOK].extra["port"] = int( + msgraph_webhook_port + ) + except ValueError: + pass + if msgraph_webhook_client_state: + config.platforms[Platform.MSGRAPH_WEBHOOK].extra["client_state"] = ( + msgraph_webhook_client_state + ) + if msgraph_webhook_resources: + resources = [ + resource.strip() + for resource in msgraph_webhook_resources.split(",") + if resource.strip() + ] + if resources: + config.platforms[Platform.MSGRAPH_WEBHOOK].extra[ + "accepted_resources" + ] = resources + # DingTalk dingtalk_client_id = os.getenv("DINGTALK_CLIENT_ID") dingtalk_client_secret = os.getenv("DINGTALK_CLIENT_SECRET") diff --git a/gateway/platforms/msgraph_webhook.py b/gateway/platforms/msgraph_webhook.py new file mode 100644 index 0000000000..9c6feabad8 --- /dev/null +++ b/gateway/platforms/msgraph_webhook.py @@ -0,0 +1,283 @@ +"""Microsoft Graph webhook adapter for change-notification ingress.""" + +from __future__ import annotations + +import asyncio +import json +import logging +from hashlib import sha1 +from typing import Any, Awaitable, Callable, Dict, Optional + +try: + from aiohttp import web + + AIOHTTP_AVAILABLE = True +except ImportError: + AIOHTTP_AVAILABLE = False + web = None # type: ignore[assignment] + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import ( + BasePlatformAdapter, + MessageEvent, + MessageType, + SendResult, +) + +logger = logging.getLogger(__name__) + +DEFAULT_HOST = "0.0.0.0" +DEFAULT_PORT = 8646 +DEFAULT_WEBHOOK_PATH = "/msgraph/webhook" +NotificationScheduler = Callable[[Dict[str, Any], MessageEvent], Awaitable[None] | None] + + +def check_msgraph_webhook_requirements() -> bool: + """Return whether required webhook dependencies are available.""" + return AIOHTTP_AVAILABLE + + +class MSGraphWebhookAdapter(BasePlatformAdapter): + """Receive Microsoft Graph change notifications and surface them internally.""" + + def __init__(self, config: PlatformConfig): + super().__init__(config, Platform.MSGRAPH_WEBHOOK) + extra = config.extra or {} + self._host: str = str(extra.get("host", DEFAULT_HOST)) + self._port: int = int(extra.get("port", DEFAULT_PORT)) + self._webhook_path: str = self._normalize_path( + extra.get("webhook_path", DEFAULT_WEBHOOK_PATH) + ) + self._health_path: str = self._normalize_path(extra.get("health_path", "/health")) + self._accepted_resources: list[str] = [ + str(value).strip() + for value in (extra.get("accepted_resources") or []) + if str(value).strip() + ] + self._client_state: Optional[str] = self._string_or_none(extra.get("client_state")) + self._runner = None + self._notification_scheduler: Optional[NotificationScheduler] = None + self._seen_receipts: set[str] = set() + self._accepted_count = 0 + self._duplicate_count = 0 + + @staticmethod + def _string_or_none(value: Any) -> Optional[str]: + if value is None: + return None + text = str(value).strip() + return text or None + + @staticmethod + def _normalize_path(path: Any) -> str: + raw = str(path or "").strip() or "/" + return raw if raw.startswith("/") else f"/{raw}" + + @staticmethod + def _build_receipt_key(notification: Dict[str, Any]) -> str: + explicit_id = str(notification.get("id") or "").strip() + if explicit_id: + return f"id:{explicit_id}" + payload = "|".join( + [ + str(notification.get("subscriptionId") or ""), + str(notification.get("changeType") or ""), + str(notification.get("resource") or ""), + json.dumps(notification.get("resourceData") or {}, sort_keys=True), + ] + ) + return f"sha1:{sha1(payload.encode('utf-8')).hexdigest()}" + + def set_notification_scheduler(self, scheduler: Optional[NotificationScheduler]) -> None: + self._notification_scheduler = scheduler + + async def connect(self) -> bool: + app = web.Application() + app.router.add_get(self._health_path, self._handle_health) + app.router.add_get(self._webhook_path, self._handle_notification) + app.router.add_post(self._webhook_path, self._handle_notification) + + self._runner = web.AppRunner(app) + await self._runner.setup() + site = web.TCPSite(self._runner, self._host, self._port) + await site.start() + self._mark_connected() + logger.info( + "[msgraph_webhook] Listening on %s:%d%s", + self._host, + self._port, + self._webhook_path, + ) + return True + + async def disconnect(self) -> None: + if self._runner is not None: + await self._runner.cleanup() + self._runner = None + self._mark_disconnected() + + async def send( + self, + chat_id: str, + content: str, + reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + logger.info("[msgraph_webhook] Response for %s: %s", chat_id, content[:200]) + return SendResult(success=True) + + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: + return {"name": chat_id, "type": "webhook"} + + async def _handle_health(self, request: "web.Request") -> "web.Response": + return web.json_response( + { + "status": "ok", + "platform": self.platform.value, + "webhook_path": self._webhook_path, + "accepted": self._accepted_count, + "duplicates": self._duplicate_count, + } + ) + + async def _handle_notification(self, request: "web.Request") -> "web.Response": + validation_token = request.query.get("validationToken", "") + if validation_token: + return web.Response(text=validation_token, content_type="text/plain") + + try: + body = await request.json() + except Exception: + return web.json_response({"error": "Invalid JSON body"}, status=400) + + notifications = body.get("value") + if not isinstance(notifications, list): + return web.json_response({"error": "Missing notification batch"}, status=400) + + accepted = 0 + duplicates = 0 + rejected = 0 + scheduled = 0 + + for raw_notification in notifications: + if not isinstance(raw_notification, dict): + rejected += 1 + continue + notification = dict(raw_notification) + if not self._resource_accepted(str(notification.get("resource") or "")): + rejected += 1 + continue + if not self._verify_client_state(notification): + rejected += 1 + continue + + receipt_key = self._build_receipt_key(notification) + if receipt_key in self._seen_receipts: + duplicates += 1 + continue + self._seen_receipts.add(receipt_key) + + accepted += 1 + scheduled += 1 + self._accepted_count += 1 + event = self._build_message_event(notification, receipt_key) + self._schedule_notification(notification, event) + + self._duplicate_count += duplicates + status = 202 if accepted or duplicates else 403 + return web.json_response( + { + "status": "accepted" if accepted or duplicates else "rejected", + "accepted": accepted, + "duplicates": duplicates, + "rejected": rejected, + "scheduled": scheduled, + }, + status=status, + ) + + def _resource_accepted(self, resource: str) -> bool: + if not self._accepted_resources: + return True + for pattern in self._accepted_resources: + if pattern.endswith("*") and resource.startswith(pattern[:-1]): + return True + if resource == pattern or resource.startswith(f"{pattern}/"): + return True + return False + + def _verify_client_state(self, notification: Dict[str, Any]) -> bool: + expected = self._client_state + if expected is None: + return True + provided = self._string_or_none(notification.get("clientState")) + return provided == expected + + def _build_message_event( + self, + notification: Dict[str, Any], + receipt_key: str, + ) -> MessageEvent: + source = self.build_source( + chat_id=f"msgraph:{notification.get('subscriptionId', 'unknown')}", + chat_name="msgraph/webhook", + chat_type="webhook", + user_id="msgraph", + user_name="Microsoft Graph", + ) + return MessageEvent( + text=self._render_prompt(notification), + message_type=MessageType.TEXT, + source=source, + raw_message=notification, + message_id=receipt_key, + internal=True, + ) + + def _render_prompt(self, notification: Dict[str, Any]) -> str: + template = self.config.extra.get("prompt", "") + if template: + payload = { + "notification": notification, + "resource": notification.get("resource", ""), + "change_type": notification.get("changeType", ""), + "subscription_id": notification.get("subscriptionId", ""), + } + return self._render_template(template, payload) + rendered = json.dumps(notification, indent=2, sort_keys=True)[:4000] + return f"Microsoft Graph change notification:\n\n```json\n{rendered}\n```" + + def _render_template(self, template: str, payload: Dict[str, Any]) -> str: + import re + + def _resolve(match: "re.Match[str]") -> str: + key = match.group(1) + value: Any = payload + for part in key.split("."): + if isinstance(value, dict): + value = value.get(part, f"{{{key}}}") + else: + return f"{{{key}}}" + if isinstance(value, (dict, list)): + return json.dumps(value, sort_keys=True)[:2000] + return str(value) + + return re.sub(r"\{([a-zA-Z0-9_.]+)\}", _resolve, template) + + def _schedule_notification( + self, + notification: Dict[str, Any], + event: MessageEvent, + ) -> None: + scheduler = self._notification_scheduler + if scheduler is not None: + result = scheduler(notification, event) + if asyncio.iscoroutine(result): + task = asyncio.create_task(result) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + return + + task = asyncio.create_task(self.handle_message(event)) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) diff --git a/gateway/run.py b/gateway/run.py index 321f9b5ad1..53482a7a74 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4600,6 +4600,16 @@ class GatewayRunner: adapter.gateway_runner = self # For cross-platform delivery return adapter + elif platform == Platform.MSGRAPH_WEBHOOK: + from gateway.platforms.msgraph_webhook import ( + MSGraphWebhookAdapter, + check_msgraph_webhook_requirements, + ) + if not check_msgraph_webhook_requirements(): + logger.warning("MSGraph webhook: aiohttp not installed") + return None + return MSGraphWebhookAdapter(config) + elif platform == Platform.BLUEBUBBLES: from gateway.platforms.bluebubbles import BlueBubblesAdapter, check_bluebubbles_requirements if not check_bluebubbles_requirements(): diff --git a/tests/gateway/test_msgraph_webhook.py b/tests/gateway/test_msgraph_webhook.py new file mode 100644 index 0000000000..a281603a3c --- /dev/null +++ b/tests/gateway/test_msgraph_webhook.py @@ -0,0 +1,187 @@ +"""Tests for the Microsoft Graph webhook adapter.""" + +import asyncio +import json + +import pytest + +from gateway.config import GatewayConfig, Platform, PlatformConfig, _apply_env_overrides +from gateway.platforms.msgraph_webhook import MSGraphWebhookAdapter + + +def _make_adapter(**extra_overrides) -> MSGraphWebhookAdapter: + extra = { + "client_state": "expected-client-state", + "accepted_resources": ["communications/onlineMeetings"], + } + extra.update(extra_overrides) + return MSGraphWebhookAdapter(PlatformConfig(enabled=True, extra=extra)) + + +class _FakeRequest: + def __init__(self, *, query=None, json_payload=None): + self.query = query or {} + self._json_payload = json_payload + + async def json(self): + if isinstance(self._json_payload, Exception): + raise self._json_payload + return self._json_payload + + +class TestMSGraphWebhookConfig: + def test_gateway_config_accepts_msgraph_webhook_platform(self): + config = GatewayConfig.from_dict( + { + "platforms": { + "msgraph_webhook": { + "enabled": True, + "extra": {"client_state": "expected"}, + } + } + } + ) + + assert Platform.MSGRAPH_WEBHOOK in config.platforms + assert Platform.MSGRAPH_WEBHOOK in config.get_connected_platforms() + + def test_env_overrides_apply_to_existing_msgraph_webhook_platform(self, monkeypatch): + config = GatewayConfig( + platforms={Platform.MSGRAPH_WEBHOOK: PlatformConfig(enabled=True, extra={})} + ) + + monkeypatch.setenv("MSGRAPH_WEBHOOK_PORT", "8650") + monkeypatch.setenv("MSGRAPH_WEBHOOK_CLIENT_STATE", "env-state") + monkeypatch.setenv( + "MSGRAPH_WEBHOOK_ACCEPTED_RESOURCES", + "communications/onlineMeetings, chats/getAllMessages", + ) + + _apply_env_overrides(config) + + extra = config.platforms[Platform.MSGRAPH_WEBHOOK].extra + assert extra["port"] == 8650 + assert extra["client_state"] == "env-state" + assert extra["accepted_resources"] == [ + "communications/onlineMeetings", + "chats/getAllMessages", + ] + + +class TestMSGraphValidationHandshake: + @pytest.mark.anyio + async def test_validation_token_echo(self): + adapter = _make_adapter() + resp = await adapter._handle_notification( + _FakeRequest(query={"validationToken": "abc123"}) + ) + assert resp.status == 200 + assert resp.text == "abc123" + assert resp.content_type == "text/plain" + + +class TestMSGraphNotifications: + @pytest.mark.anyio + async def test_valid_notification_accepted_and_scheduled(self): + adapter = _make_adapter() + scheduled: list[tuple[dict, object]] = [] + + async def _capture(notification, event): + scheduled.append((notification, event)) + + adapter.set_notification_scheduler(_capture) + payload = { + "value": [ + { + "id": "notif-1", + "subscriptionId": "sub-1", + "changeType": "updated", + "resource": "communications/onlineMeetings/meeting-1", + "clientState": "expected-client-state", + "resourceData": {"id": "meeting-1"}, + } + ] + } + + resp = await adapter._handle_notification(_FakeRequest(json_payload=payload)) + assert resp.status == 202 + data = json.loads(resp.text) + assert data["accepted"] == 1 + assert data["duplicates"] == 0 + assert data["rejected"] == 0 + assert data["scheduled"] == 1 + + await asyncio.sleep(0.05) + + assert len(scheduled) == 1 + notification, event = scheduled[0] + assert notification["id"] == "notif-1" + assert event.source.platform == Platform.MSGRAPH_WEBHOOK + assert event.source.chat_type == "webhook" + assert event.message_id == "id:notif-1" + + @pytest.mark.anyio + async def test_bad_client_state_rejected(self): + adapter = _make_adapter() + scheduled: list[tuple[dict, object]] = [] + + async def _capture(notification, event): + scheduled.append((notification, event)) + + adapter.set_notification_scheduler(_capture) + payload = { + "value": [ + { + "id": "notif-2", + "subscriptionId": "sub-1", + "changeType": "updated", + "resource": "communications/onlineMeetings/meeting-2", + "clientState": "wrong-state", + } + ] + } + + resp = await adapter._handle_notification(_FakeRequest(json_payload=payload)) + assert resp.status == 403 + data = json.loads(resp.text) + assert data["accepted"] == 0 + assert data["duplicates"] == 0 + assert data["rejected"] == 1 + + await asyncio.sleep(0.05) + + assert scheduled == [] + + @pytest.mark.anyio + async def test_duplicate_notification_deduped(self): + adapter = _make_adapter() + scheduled: list[tuple[dict, object]] = [] + + async def _capture(notification, event): + scheduled.append((notification, event)) + + adapter.set_notification_scheduler(_capture) + payload = { + "value": [ + { + "id": "notif-dup", + "subscriptionId": "sub-1", + "changeType": "updated", + "resource": "communications/onlineMeetings/meeting-3", + "clientState": "expected-client-state", + } + ] + } + + first = await adapter._handle_notification(_FakeRequest(json_payload=payload)) + assert first.status == 202 + second = await adapter._handle_notification(_FakeRequest(json_payload=payload)) + assert second.status == 202 + second_data = json.loads(second.text) + assert second_data["accepted"] == 0 + assert second_data["duplicates"] == 1 + assert second_data["scheduled"] == 0 + + await asyncio.sleep(0.05) + + assert len(scheduled) == 1 diff --git a/tests/gateway/test_platform_connected_checkers.py b/tests/gateway/test_platform_connected_checkers.py index ba16ac4954..307c79b308 100644 --- a/tests/gateway/test_platform_connected_checkers.py +++ b/tests/gateway/test_platform_connected_checkers.py @@ -76,7 +76,12 @@ def test_checker_returns_true_when_configured(platform, checker, monkeypatch): elif platform == Platform.SMS: monkeypatch.setenv("TWILIO_ACCOUNT_SID", "ACtest") mock_config.extra = {} - elif platform in (Platform.API_SERVER, Platform.WEBHOOK, Platform.WHATSAPP): + elif platform in ( + Platform.API_SERVER, + Platform.WEBHOOK, + Platform.MSGRAPH_WEBHOOK, + Platform.WHATSAPP, + ): mock_config.extra = {} elif platform == Platform.FEISHU: mock_config.extra = {"app_id": "app"}