mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-10 03:22:05 +00:00
fix(msgraph): normalize webhook dedupe and resource matching
This commit is contained in:
parent
2a215de9af
commit
26a59e4f6c
2 changed files with 84 additions and 19 deletions
|
|
@ -80,19 +80,15 @@ class MSGraphWebhookAdapter(BasePlatformAdapter):
|
|||
return raw if raw.startswith("/") else f"/{raw}"
|
||||
|
||||
@staticmethod
|
||||
def _build_receipt_key(notification: Dict[str, Any]) -> str:
|
||||
def _build_receipt_key(notification: Dict[str, Any]) -> Optional[str]:
|
||||
explicit_id = str(notification.get("id") or "").strip()
|
||||
if explicit_id:
|
||||
return f"id:{explicit_id}"
|
||||
payload = "|".join(
|
||||
[
|
||||
str(notification.get("subscriptionId") or ""),
|
||||
str(notification.get("changeType") or ""),
|
||||
str(notification.get("resource") or ""),
|
||||
json.dumps(notification.get("resourceData") or {}, sort_keys=True),
|
||||
]
|
||||
)
|
||||
return f"sha1:{sha1(payload.encode('utf-8')).hexdigest()}"
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _normalize_resource_value(resource: str) -> str:
|
||||
return str(resource or "").strip().strip("/")
|
||||
|
||||
def set_notification_scheduler(self, scheduler: Optional[NotificationScheduler]) -> None:
|
||||
self._notification_scheduler = scheduler
|
||||
|
|
@ -178,10 +174,11 @@ class MSGraphWebhookAdapter(BasePlatformAdapter):
|
|||
continue
|
||||
|
||||
receipt_key = self._build_receipt_key(notification)
|
||||
if self._has_seen_receipt(receipt_key):
|
||||
duplicates += 1
|
||||
continue
|
||||
self._remember_receipt(receipt_key)
|
||||
if receipt_key is not None:
|
||||
if self._has_seen_receipt(receipt_key):
|
||||
duplicates += 1
|
||||
continue
|
||||
self._remember_receipt(receipt_key)
|
||||
|
||||
accepted += 1
|
||||
scheduled += 1
|
||||
|
|
@ -205,10 +202,20 @@ class MSGraphWebhookAdapter(BasePlatformAdapter):
|
|||
def _resource_accepted(self, resource: str) -> bool:
|
||||
if not self._accepted_resources:
|
||||
return True
|
||||
normalized_resource = self._normalize_resource_value(resource)
|
||||
for pattern in self._accepted_resources:
|
||||
if pattern.endswith("*") and resource.startswith(pattern[:-1]):
|
||||
return True
|
||||
if resource == pattern or resource.startswith(f"{pattern}/"):
|
||||
normalized_pattern = self._normalize_resource_value(pattern)
|
||||
if not normalized_pattern:
|
||||
continue
|
||||
if normalized_pattern.endswith("*"):
|
||||
prefix = normalized_pattern[:-1].rstrip("/")
|
||||
if normalized_resource == prefix or normalized_resource.startswith(f"{prefix}/"):
|
||||
return True
|
||||
continue
|
||||
if (
|
||||
normalized_resource == normalized_pattern
|
||||
or normalized_resource.startswith(f"{normalized_pattern}/")
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
|
@ -232,8 +239,9 @@ class MSGraphWebhookAdapter(BasePlatformAdapter):
|
|||
def _build_message_event(
|
||||
self,
|
||||
notification: Dict[str, Any],
|
||||
receipt_key: str,
|
||||
receipt_key: Optional[str],
|
||||
) -> MessageEvent:
|
||||
message_id = receipt_key or f"sha1:{sha1(json.dumps(notification, sort_keys=True).encode('utf-8')).hexdigest()}"
|
||||
source = self.build_source(
|
||||
chat_id=f"msgraph:{notification.get('subscriptionId', 'unknown')}",
|
||||
chat_name="msgraph/webhook",
|
||||
|
|
@ -246,7 +254,7 @@ class MSGraphWebhookAdapter(BasePlatformAdapter):
|
|||
message_type=MessageType.TEXT,
|
||||
source=source,
|
||||
raw_message=notification,
|
||||
message_id=receipt_key,
|
||||
message_id=message_id,
|
||||
internal=True,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -186,6 +186,63 @@ class TestMSGraphNotifications:
|
|||
|
||||
assert len(scheduled) == 1
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_notifications_without_id_are_not_deduped(self):
|
||||
adapter = _make_adapter()
|
||||
scheduled: list[tuple[dict, object]] = []
|
||||
|
||||
async def _capture(notification, event):
|
||||
scheduled.append((notification, event))
|
||||
|
||||
adapter.set_notification_scheduler(_capture)
|
||||
payload = {
|
||||
"value": [
|
||||
{
|
||||
"subscriptionId": "sub-1",
|
||||
"changeType": "updated",
|
||||
"resource": "communications/onlineMeetings/meeting-3",
|
||||
"clientState": "expected-client-state",
|
||||
"resourceData": {"id": "meeting-3"},
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
first = await adapter._handle_notification(_FakeRequest(json_payload=payload))
|
||||
second = await adapter._handle_notification(_FakeRequest(json_payload=payload))
|
||||
|
||||
assert first.status == 202
|
||||
assert second.status == 202
|
||||
second_data = json.loads(second.text)
|
||||
assert second_data["accepted"] == 1
|
||||
assert second_data["duplicates"] == 0
|
||||
assert second_data["scheduled"] == 1
|
||||
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
assert len(scheduled) == 2
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_resource_patterns_accept_leading_slash(self):
|
||||
adapter = _make_adapter(accepted_resources=["/communications/onlineMeetings"])
|
||||
payload = {
|
||||
"value": [
|
||||
{
|
||||
"id": "notif-slash",
|
||||
"subscriptionId": "sub-1",
|
||||
"changeType": "updated",
|
||||
"resource": "communications/onlineMeetings/meeting-4",
|
||||
"clientState": "expected-client-state",
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
resp = await adapter._handle_notification(_FakeRequest(json_payload=payload))
|
||||
data = json.loads(resp.text)
|
||||
|
||||
assert resp.status == 202
|
||||
assert data["accepted"] == 1
|
||||
assert data["rejected"] == 0
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_seen_receipts_are_bounded(self):
|
||||
adapter = _make_adapter(max_seen_receipts=2)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue