diff --git a/gateway/platforms/msgraph_webhook.py b/gateway/platforms/msgraph_webhook.py index 9c6feabad8..9b8e01f6a1 100644 --- a/gateway/platforms/msgraph_webhook.py +++ b/gateway/platforms/msgraph_webhook.py @@ -5,6 +5,7 @@ from __future__ import annotations import asyncio import json import logging +from collections import deque from hashlib import sha1 from typing import Any, Awaitable, Callable, Dict, Optional @@ -29,6 +30,7 @@ logger = logging.getLogger(__name__) DEFAULT_HOST = "0.0.0.0" DEFAULT_PORT = 8646 DEFAULT_WEBHOOK_PATH = "/msgraph/webhook" +DEFAULT_MAX_SEEN_RECEIPTS = 5000 NotificationScheduler = Callable[[Dict[str, Any], MessageEvent], Awaitable[None] | None] @@ -55,9 +57,13 @@ class MSGraphWebhookAdapter(BasePlatformAdapter): if str(value).strip() ] self._client_state: Optional[str] = self._string_or_none(extra.get("client_state")) + self._max_seen_receipts = max( + 1, int(extra.get("max_seen_receipts", DEFAULT_MAX_SEEN_RECEIPTS)) + ) self._runner = None self._notification_scheduler: Optional[NotificationScheduler] = None self._seen_receipts: set[str] = set() + self._seen_receipt_order: deque[str] = deque() self._accepted_count = 0 self._duplicate_count = 0 @@ -172,10 +178,10 @@ class MSGraphWebhookAdapter(BasePlatformAdapter): continue receipt_key = self._build_receipt_key(notification) - if receipt_key in self._seen_receipts: + if self._has_seen_receipt(receipt_key): duplicates += 1 continue - self._seen_receipts.add(receipt_key) + self._remember_receipt(receipt_key) accepted += 1 scheduled += 1 @@ -213,6 +219,16 @@ class MSGraphWebhookAdapter(BasePlatformAdapter): provided = self._string_or_none(notification.get("clientState")) return provided == expected + def _has_seen_receipt(self, receipt_key: str) -> bool: + return receipt_key in self._seen_receipts + + def _remember_receipt(self, receipt_key: str) -> None: + self._seen_receipts.add(receipt_key) + self._seen_receipt_order.append(receipt_key) + while len(self._seen_receipt_order) > self._max_seen_receipts: + oldest = self._seen_receipt_order.popleft() + self._seen_receipts.discard(oldest) + def _build_message_event( self, notification: Dict[str, Any], diff --git a/tests/gateway/test_msgraph_webhook.py b/tests/gateway/test_msgraph_webhook.py index a281603a3c..404156c45f 100644 --- a/tests/gateway/test_msgraph_webhook.py +++ b/tests/gateway/test_msgraph_webhook.py @@ -185,3 +185,41 @@ class TestMSGraphNotifications: await asyncio.sleep(0.05) assert len(scheduled) == 1 + + @pytest.mark.anyio + async def test_seen_receipts_are_bounded(self): + adapter = _make_adapter(max_seen_receipts=2) + + async def _capture(notification, event): + return None + + adapter.set_notification_scheduler(_capture) + + async def _post(notification_id: str): + payload = { + "value": [ + { + "id": notification_id, + "subscriptionId": "sub-1", + "changeType": "updated", + "resource": "communications/onlineMeetings/meeting-3", + "clientState": "expected-client-state", + } + ] + } + return await adapter._handle_notification(_FakeRequest(json_payload=payload)) + + first = await _post("notif-a") + second = await _post("notif-b") + third = await _post("notif-c") + + assert first.status == 202 + assert second.status == 202 + assert third.status == 202 + assert len(adapter._seen_receipts) == 2 + assert list(adapter._seen_receipt_order) == ["id:notif-b", "id:notif-c"] + + replay = await _post("notif-a") + replay_data = json.loads(replay.text) + assert replay_data["accepted"] == 1 + assert replay_data["duplicates"] == 0