From 397f750bb402f1807c0e7c732ff7637b1f2d52e8 Mon Sep 17 00:00:00 2001 From: Dilee Date: Thu, 7 May 2026 17:18:51 +0300 Subject: [PATCH] feat(teams): add pipeline outbound delivery via existing adapter --- plugins/platforms/teams/adapter.py | 239 ++++++++++++++++++++ plugins/teams_pipeline/pipeline.py | 5 +- tests/gateway/test_teams.py | 107 +++++++++ tests/plugins/test_teams_pipeline_plugin.py | 28 +++ 4 files changed, 378 insertions(+), 1 deletion(-) diff --git a/plugins/platforms/teams/adapter.py b/plugins/platforms/teams/adapter.py index 7e17a7c2be..32e01413c0 100644 --- a/plugins/platforms/teams/adapter.py +++ b/plugins/platforms/teams/adapter.py @@ -23,10 +23,14 @@ Configuration in config.yaml: from __future__ import annotations import asyncio +import html import json import logging import os from typing import Any, Dict, Optional +from urllib.parse import quote + +import httpx try: from aiohttp import web @@ -93,6 +97,241 @@ _DEFAULT_PORT = 3978 _WEBHOOK_PATH = "/api/messages" +def _parse_bool(value: Any, *, default: bool = False) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, str): + normalized = value.strip().lower() + if normalized in {"1", "true", "yes", "on"}: + return True + if normalized in {"0", "false", "no", "off"}: + return False + return default + + +class _StaticAccessTokenProvider: + """Minimal token-provider shim so outbound Graph delivery can reuse the shared client.""" + + def __init__(self, access_token: str): + self._access_token = str(access_token or "").strip() + + async def get_access_token(self, *, force_refresh: bool = False) -> str: + del force_refresh + if not self._access_token: + raise ValueError("TEAMS_GRAPH_ACCESS_TOKEN is required for graph delivery mode.") + return self._access_token + + def clear_cache(self) -> None: + return None + + +class TeamsSummaryWriter: + """Pipeline-facing Teams outbound delivery surface. + + This stays inside the existing Teams platform plugin so the meeting-pipeline + PR can reuse one Teams integration surface instead of introducing a second + adapter elsewhere in the gateway core. + """ + + def __init__( + self, + platform_config: PlatformConfig | None = None, + *, + graph_client: Any | None = None, + transport: httpx.AsyncBaseTransport | None = None, + ) -> None: + self._platform_config = platform_config + self._graph_client = graph_client + self._transport = transport + + async def write_summary( + self, + payload: Any, + config: dict[str, Any] | None, + existing_record: Optional[dict[str, Any]] = None, + ) -> dict[str, Any]: + merged = self._resolve_delivery_config(config) + if existing_record and not _parse_bool(merged.get("force_resend"), default=False): + return dict(existing_record) + + mode = str(merged.get("delivery_mode") or merged.get("mode") or "").strip().lower() + if not mode: + if merged.get("incoming_webhook_url"): + mode = "incoming_webhook" + elif merged.get("chat_id") or ( + merged.get("team_id") and (merged.get("channel_id") or merged.get("chat_id")) + ): + mode = "graph" + if mode == "incoming_webhook": + return await self._write_summary_via_incoming_webhook(payload, merged) + if mode == "graph": + return await self._write_summary_via_graph(payload, merged) + raise ValueError( + "Teams delivery_mode must be 'incoming_webhook' or 'graph'." + ) + + def _resolve_delivery_config(self, config: dict[str, Any] | None) -> dict[str, Any]: + merged: dict[str, Any] = {} + platform_cfg = self._platform_config + if platform_cfg is not None: + merged.update(dict(platform_cfg.extra or {})) + if platform_cfg.token and "access_token" not in merged: + merged["access_token"] = platform_cfg.token + if platform_cfg.home_channel: + merged.setdefault("channel_id", platform_cfg.home_channel.chat_id) + merged.update(dict(config or {})) + + env_defaults = { + "delivery_mode": os.getenv("TEAMS_DELIVERY_MODE", ""), + "incoming_webhook_url": os.getenv("TEAMS_INCOMING_WEBHOOK_URL", ""), + "access_token": os.getenv("TEAMS_GRAPH_ACCESS_TOKEN", ""), + "team_id": os.getenv("TEAMS_TEAM_ID", ""), + "channel_id": os.getenv("TEAMS_CHANNEL_ID", ""), + "chat_id": os.getenv("TEAMS_CHAT_ID", ""), + } + for key, value in env_defaults.items(): + if value and not merged.get(key): + merged[key] = value + return merged + + async def _write_summary_via_incoming_webhook( + self, + payload: Any, + config: dict[str, Any], + ) -> dict[str, Any]: + webhook_url = str(config.get("incoming_webhook_url") or "").strip() + if not webhook_url: + raise ValueError("TEAMS_INCOMING_WEBHOOK_URL is required for incoming_webhook mode.") + body = {"text": self._render_summary_markdown(payload)} + async with httpx.AsyncClient(timeout=20.0, transport=self._transport) as client: + response = await client.post(webhook_url, json=body) + response.raise_for_status() + return { + "delivery_mode": "incoming_webhook", + "webhook_url": webhook_url, + "status_code": response.status_code, + "delivered": True, + } + + async def _write_summary_via_graph( + self, + payload: Any, + config: dict[str, Any], + ) -> dict[str, Any]: + graph_client = self._build_graph_client(config) + chat_id = str(config.get("chat_id") or "").strip() + if chat_id: + path = f"/chats/{quote(chat_id, safe='')}/messages" + response = await graph_client.post_json( + path, + json_body={"body": {"contentType": "html", "content": self._render_summary_html(payload)}}, + ) + return { + "delivery_mode": "graph", + "target_type": "chat", + "chat_id": chat_id, + "message_id": (response or {}).get("id"), + "web_url": (response or {}).get("webUrl"), + } + + team_id = str(config.get("team_id") or "").strip() + channel_id = str(config.get("channel_id") or "").strip() + if not team_id or not channel_id: + raise ValueError( + "Graph delivery mode requires chat_id, or both team_id and channel_id." + ) + path = ( + f"/teams/{quote(team_id, safe='')}/channels/" + f"{quote(channel_id, safe='')}/messages" + ) + response = await graph_client.post_json( + path, + json_body={"body": {"contentType": "html", "content": self._render_summary_html(payload)}}, + ) + return { + "delivery_mode": "graph", + "target_type": "channel", + "team_id": team_id, + "channel_id": channel_id, + "message_id": (response or {}).get("id"), + "web_url": (response or {}).get("webUrl"), + } + + def _build_graph_client(self, config: dict[str, Any]) -> Any: + if self._graph_client is not None: + return self._graph_client + + from tools.microsoft_graph_auth import MicrosoftGraphTokenProvider + from tools.microsoft_graph_client import MicrosoftGraphClient + + access_token = str(config.get("access_token") or "").strip() + if access_token: + return MicrosoftGraphClient( + _StaticAccessTokenProvider(access_token), + transport=self._transport, + ) + return MicrosoftGraphClient( + MicrosoftGraphTokenProvider.from_env(), + transport=self._transport, + ) + + def _render_summary_markdown(self, payload: Any) -> str: + lines = [ + f"**{self._title(payload)}**", + "", + f"Summary: {self._text(getattr(payload, 'summary', None), 'No summary available.')}", + "", + "Key decisions:", + *self._bullet_lines(getattr(payload, "key_decisions", None)), + "", + "Action items:", + *self._bullet_lines(getattr(payload, "action_items", None)), + "", + "Risks:", + *self._bullet_lines(getattr(payload, "risks", None)), + ] + return "\n".join(lines) + + def _render_summary_html(self, payload: Any) -> str: + sections = [ + ("Summary", [self._text(getattr(payload, "summary", None), "No summary available.")]), + ("Key decisions", list(getattr(payload, "key_decisions", None) or [])), + ("Action items", list(getattr(payload, "action_items", None) or [])), + ("Risks", list(getattr(payload, "risks", None) or [])), + ] + blocks = [f"

{html.escape(self._title(payload))}

"] + for heading, items in sections: + blocks.append(f"

{html.escape(heading)}

") + if len(items) == 1 and heading == "Summary": + blocks.append(f"

{html.escape(str(items[0]))}

") + continue + if items: + rendered = "".join(f"
  • {html.escape(str(item))}
  • " for item in items if str(item).strip()) + blocks.append(rendered and f"" or "

    None

    ") + else: + blocks.append("

    None

    ") + return "".join(blocks) + + @staticmethod + def _title(payload: Any) -> str: + title = getattr(payload, "title", None) + if title: + return str(title) + meeting_ref = getattr(payload, "meeting_ref", None) + meeting_id = getattr(meeting_ref, "meeting_id", None) if meeting_ref else None + return f"Meeting {meeting_id or 'summary'}" + + @staticmethod + def _text(value: Any, default: str) -> str: + text = str(value or "").strip() + return text or default + + @classmethod + def _bullet_lines(cls, values: Any) -> list[str]: + items = [str(item).strip() for item in (values or []) if str(item).strip()] + return [f"- {item}" for item in items] or ["- None"] + + class _AiohttpBridgeAdapter: """HttpServerAdapter that bridges the Teams SDK into an aiohttp server. diff --git a/plugins/teams_pipeline/pipeline.py b/plugins/teams_pipeline/pipeline.py index 21958de21c..d1d1616486 100644 --- a/plugins/teams_pipeline/pipeline.py +++ b/plugins/teams_pipeline/pipeline.py @@ -550,7 +550,10 @@ class TeamsMeetingPipeline: confidence_notes=parsed.get("confidence_notes"), notion_target=(self.config.notion or {}).get("database_id"), linear_target=(self.config.linear or {}).get("team_id"), - teams_target=(self.config.teams_delivery or {}).get("channel_id"), + teams_target=( + (self.config.teams_delivery or {}).get("channel_id") + or (self.config.teams_delivery or {}).get("chat_id") + ), ) async def _write_sinks(self, job: TeamsMeetingPipelineJob, payload: TeamsMeetingSummaryPayload) -> None: diff --git a/tests/gateway/test_teams.py b/tests/gateway/test_teams.py index 0e1e05bd1b..bd6add2107 100644 --- a/tests/gateway/test_teams.py +++ b/tests/gateway/test_teams.py @@ -1,15 +1,19 @@ """Tests for the Microsoft Teams platform adapter plugin.""" import asyncio +import json import os import sys import types from pathlib import Path +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch +import httpx import pytest from gateway.config import Platform, PlatformConfig, HomeChannel +from plugins.teams_pipeline.models import TeamsMeetingRef, TeamsMeetingSummaryPayload from tests.gateway._plugin_adapter_loader import load_plugin_adapter @@ -177,6 +181,7 @@ if _mt and _teams_mod.TypingActivityInput is None: _teams_mod.TypingActivityInput = _mt.TypingActivityInput TeamsAdapter = _teams_mod.TeamsAdapter +TeamsSummaryWriter = _teams_mod.TeamsSummaryWriter check_requirements = _teams_mod.check_requirements check_teams_requirements = _teams_mod.check_teams_requirements validate_config = _teams_mod.validate_config @@ -449,6 +454,108 @@ class TestTeamsSend: assert call_args[0][0] == "conv-id" +def _make_summary_payload(): + return TeamsMeetingSummaryPayload( + meeting_ref=TeamsMeetingRef(meeting_id="meeting-123"), + title="Weekly Sync", + summary="Discussed launch readiness.", + key_decisions=["Proceed with staged rollout."], + action_items=["Send launch checklist."], + risks=["QA sign-off still pending."], + ) + + +class TestTeamsSummaryWriter: + @pytest.mark.anyio + async def test_incoming_webhook_posts_summary_text(self): + seen = {} + + def _handler(request: httpx.Request) -> httpx.Response: + seen["url"] = str(request.url) + seen["body"] = json.loads(request.content.decode("utf-8")) + return httpx.Response(200, json={"ok": True}) + + writer = TeamsSummaryWriter(transport=httpx.MockTransport(_handler)) + payload = _make_summary_payload() + + result = await writer.write_summary( + payload, + { + "delivery_mode": "incoming_webhook", + "incoming_webhook_url": "https://example.test/teams-webhook", + }, + ) + + assert result["delivery_mode"] == "incoming_webhook" + assert seen["url"] == "https://example.test/teams-webhook" + assert "Weekly Sync" in seen["body"]["text"] + assert "Proceed with staged rollout." in seen["body"]["text"] + + @pytest.mark.anyio + async def test_graph_delivery_posts_to_channel(self): + graph_client = SimpleNamespace( + post_json=AsyncMock(return_value={"id": "msg-123", "webUrl": "https://teams.example/messages/123"}) + ) + writer = TeamsSummaryWriter(graph_client=graph_client) + payload = _make_summary_payload() + + result = await writer.write_summary( + payload, + { + "delivery_mode": "graph", + "team_id": "team-1", + "channel_id": "channel-1", + }, + ) + + assert result["target_type"] == "channel" + assert result["message_id"] == "msg-123" + graph_client.post_json.assert_awaited_once() + path = graph_client.post_json.await_args.args[0] + body = graph_client.post_json.await_args.kwargs["json_body"] + assert path == "/teams/team-1/channels/channel-1/messages" + assert body["body"]["contentType"] == "html" + assert "Weekly Sync" in body["body"]["content"] + + @pytest.mark.anyio + async def test_graph_delivery_falls_back_to_platform_home_channel(self): + graph_client = SimpleNamespace(post_json=AsyncMock(return_value={"id": "msg-home"})) + platform_config = PlatformConfig( + enabled=True, + extra={"team_id": "team-home", "delivery_mode": "graph"}, + home_channel=HomeChannel( + platform=Platform("teams"), + chat_id="channel-home", + name="Teams Home", + ), + ) + writer = TeamsSummaryWriter(platform_config=platform_config, graph_client=graph_client) + + await writer.write_summary(_make_summary_payload(), {}) + + graph_client.post_json.assert_awaited_once() + assert graph_client.post_json.await_args.args[0] == "/teams/team-home/channels/channel-home/messages" + + @pytest.mark.anyio + async def test_existing_record_is_reused_without_force_resend(self): + graph_client = SimpleNamespace(post_json=AsyncMock()) + writer = TeamsSummaryWriter(graph_client=graph_client) + existing = {"delivery_mode": "graph", "message_id": "msg-existing"} + + result = await writer.write_summary( + _make_summary_payload(), + { + "delivery_mode": "graph", + "team_id": "team-1", + "channel_id": "channel-1", + }, + existing_record=existing, + ) + + assert result == existing + graph_client.post_json.assert_not_awaited() + + # --------------------------------------------------------------------------- # Tests: Message Handling # --------------------------------------------------------------------------- diff --git a/tests/plugins/test_teams_pipeline_plugin.py b/tests/plugins/test_teams_pipeline_plugin.py index 3fb4728d23..8e929b0b8c 100644 --- a/tests/plugins/test_teams_pipeline_plugin.py +++ b/tests/plugins/test_teams_pipeline_plugin.py @@ -82,6 +82,34 @@ def test_runtime_config_uses_existing_teams_platform_settings(): } +def test_build_pipeline_runtime_reuses_existing_teams_adapter_surface(monkeypatch, tmp_path): + from plugins.teams_pipeline import runtime as runtime_module + + class FakeWriter: + def __init__(self, platform_config=None, **kwargs) -> None: + self.platform_config = platform_config + + monkeypatch.setattr(runtime_module, "build_graph_client", lambda: object()) + monkeypatch.setattr(runtime_module, "resolve_teams_pipeline_store_path", lambda: tmp_path / "teams-store.json") + monkeypatch.setattr("plugins.platforms.teams.adapter.TeamsSummaryWriter", FakeWriter) + + gateway = SimpleNamespace( + config=GatewayConfig( + platforms={ + Platform("teams"): PlatformConfig( + enabled=True, + extra={"delivery_mode": "incoming_webhook"}, + ) + } + ) + ) + + runtime = runtime_module.build_pipeline_runtime(gateway) + + assert isinstance(runtime.teams_sender, FakeWriter) + assert runtime.teams_sender.platform_config is gateway.config.platforms[Platform("teams")] + + @pytest.mark.anyio async def test_bind_gateway_runtime_attaches_scheduler(monkeypatch, tmp_path): from plugins.teams_pipeline import runtime as runtime_module