feat(teams): add pipeline outbound delivery via existing adapter

This commit is contained in:
Dilee 2026-05-07 17:18:51 +03:00 committed by Teknium
parent a99547740d
commit 397f750bb4
4 changed files with 378 additions and 1 deletions

View file

@ -23,10 +23,14 @@ Configuration in config.yaml:
from __future__ import annotations
import asyncio
import html
import json
import logging
import os
from typing import Any, Dict, Optional
from urllib.parse import quote
import httpx
try:
from aiohttp import web
@ -93,6 +97,241 @@ _DEFAULT_PORT = 3978
_WEBHOOK_PATH = "/api/messages"
def _parse_bool(value: Any, *, default: bool = False) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
return default
class _StaticAccessTokenProvider:
"""Minimal token-provider shim so outbound Graph delivery can reuse the shared client."""
def __init__(self, access_token: str):
self._access_token = str(access_token or "").strip()
async def get_access_token(self, *, force_refresh: bool = False) -> str:
del force_refresh
if not self._access_token:
raise ValueError("TEAMS_GRAPH_ACCESS_TOKEN is required for graph delivery mode.")
return self._access_token
def clear_cache(self) -> None:
return None
class TeamsSummaryWriter:
"""Pipeline-facing Teams outbound delivery surface.
This stays inside the existing Teams platform plugin so the meeting-pipeline
PR can reuse one Teams integration surface instead of introducing a second
adapter elsewhere in the gateway core.
"""
def __init__(
self,
platform_config: PlatformConfig | None = None,
*,
graph_client: Any | None = None,
transport: httpx.AsyncBaseTransport | None = None,
) -> None:
self._platform_config = platform_config
self._graph_client = graph_client
self._transport = transport
async def write_summary(
self,
payload: Any,
config: dict[str, Any] | None,
existing_record: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
merged = self._resolve_delivery_config(config)
if existing_record and not _parse_bool(merged.get("force_resend"), default=False):
return dict(existing_record)
mode = str(merged.get("delivery_mode") or merged.get("mode") or "").strip().lower()
if not mode:
if merged.get("incoming_webhook_url"):
mode = "incoming_webhook"
elif merged.get("chat_id") or (
merged.get("team_id") and (merged.get("channel_id") or merged.get("chat_id"))
):
mode = "graph"
if mode == "incoming_webhook":
return await self._write_summary_via_incoming_webhook(payload, merged)
if mode == "graph":
return await self._write_summary_via_graph(payload, merged)
raise ValueError(
"Teams delivery_mode must be 'incoming_webhook' or 'graph'."
)
def _resolve_delivery_config(self, config: dict[str, Any] | None) -> dict[str, Any]:
merged: dict[str, Any] = {}
platform_cfg = self._platform_config
if platform_cfg is not None:
merged.update(dict(platform_cfg.extra or {}))
if platform_cfg.token and "access_token" not in merged:
merged["access_token"] = platform_cfg.token
if platform_cfg.home_channel:
merged.setdefault("channel_id", platform_cfg.home_channel.chat_id)
merged.update(dict(config or {}))
env_defaults = {
"delivery_mode": os.getenv("TEAMS_DELIVERY_MODE", ""),
"incoming_webhook_url": os.getenv("TEAMS_INCOMING_WEBHOOK_URL", ""),
"access_token": os.getenv("TEAMS_GRAPH_ACCESS_TOKEN", ""),
"team_id": os.getenv("TEAMS_TEAM_ID", ""),
"channel_id": os.getenv("TEAMS_CHANNEL_ID", ""),
"chat_id": os.getenv("TEAMS_CHAT_ID", ""),
}
for key, value in env_defaults.items():
if value and not merged.get(key):
merged[key] = value
return merged
async def _write_summary_via_incoming_webhook(
self,
payload: Any,
config: dict[str, Any],
) -> dict[str, Any]:
webhook_url = str(config.get("incoming_webhook_url") or "").strip()
if not webhook_url:
raise ValueError("TEAMS_INCOMING_WEBHOOK_URL is required for incoming_webhook mode.")
body = {"text": self._render_summary_markdown(payload)}
async with httpx.AsyncClient(timeout=20.0, transport=self._transport) as client:
response = await client.post(webhook_url, json=body)
response.raise_for_status()
return {
"delivery_mode": "incoming_webhook",
"webhook_url": webhook_url,
"status_code": response.status_code,
"delivered": True,
}
async def _write_summary_via_graph(
self,
payload: Any,
config: dict[str, Any],
) -> dict[str, Any]:
graph_client = self._build_graph_client(config)
chat_id = str(config.get("chat_id") or "").strip()
if chat_id:
path = f"/chats/{quote(chat_id, safe='')}/messages"
response = await graph_client.post_json(
path,
json_body={"body": {"contentType": "html", "content": self._render_summary_html(payload)}},
)
return {
"delivery_mode": "graph",
"target_type": "chat",
"chat_id": chat_id,
"message_id": (response or {}).get("id"),
"web_url": (response or {}).get("webUrl"),
}
team_id = str(config.get("team_id") or "").strip()
channel_id = str(config.get("channel_id") or "").strip()
if not team_id or not channel_id:
raise ValueError(
"Graph delivery mode requires chat_id, or both team_id and channel_id."
)
path = (
f"/teams/{quote(team_id, safe='')}/channels/"
f"{quote(channel_id, safe='')}/messages"
)
response = await graph_client.post_json(
path,
json_body={"body": {"contentType": "html", "content": self._render_summary_html(payload)}},
)
return {
"delivery_mode": "graph",
"target_type": "channel",
"team_id": team_id,
"channel_id": channel_id,
"message_id": (response or {}).get("id"),
"web_url": (response or {}).get("webUrl"),
}
def _build_graph_client(self, config: dict[str, Any]) -> Any:
if self._graph_client is not None:
return self._graph_client
from tools.microsoft_graph_auth import MicrosoftGraphTokenProvider
from tools.microsoft_graph_client import MicrosoftGraphClient
access_token = str(config.get("access_token") or "").strip()
if access_token:
return MicrosoftGraphClient(
_StaticAccessTokenProvider(access_token),
transport=self._transport,
)
return MicrosoftGraphClient(
MicrosoftGraphTokenProvider.from_env(),
transport=self._transport,
)
def _render_summary_markdown(self, payload: Any) -> str:
lines = [
f"**{self._title(payload)}**",
"",
f"Summary: {self._text(getattr(payload, 'summary', None), 'No summary available.')}",
"",
"Key decisions:",
*self._bullet_lines(getattr(payload, "key_decisions", None)),
"",
"Action items:",
*self._bullet_lines(getattr(payload, "action_items", None)),
"",
"Risks:",
*self._bullet_lines(getattr(payload, "risks", None)),
]
return "\n".join(lines)
def _render_summary_html(self, payload: Any) -> str:
sections = [
("Summary", [self._text(getattr(payload, "summary", None), "No summary available.")]),
("Key decisions", list(getattr(payload, "key_decisions", None) or [])),
("Action items", list(getattr(payload, "action_items", None) or [])),
("Risks", list(getattr(payload, "risks", None) or [])),
]
blocks = [f"<h2>{html.escape(self._title(payload))}</h2>"]
for heading, items in sections:
blocks.append(f"<h3>{html.escape(heading)}</h3>")
if len(items) == 1 and heading == "Summary":
blocks.append(f"<p>{html.escape(str(items[0]))}</p>")
continue
if items:
rendered = "".join(f"<li>{html.escape(str(item))}</li>" for item in items if str(item).strip())
blocks.append(rendered and f"<ul>{rendered}</ul>" or "<p>None</p>")
else:
blocks.append("<p>None</p>")
return "".join(blocks)
@staticmethod
def _title(payload: Any) -> str:
title = getattr(payload, "title", None)
if title:
return str(title)
meeting_ref = getattr(payload, "meeting_ref", None)
meeting_id = getattr(meeting_ref, "meeting_id", None) if meeting_ref else None
return f"Meeting {meeting_id or 'summary'}"
@staticmethod
def _text(value: Any, default: str) -> str:
text = str(value or "").strip()
return text or default
@classmethod
def _bullet_lines(cls, values: Any) -> list[str]:
items = [str(item).strip() for item in (values or []) if str(item).strip()]
return [f"- {item}" for item in items] or ["- None"]
class _AiohttpBridgeAdapter:
"""HttpServerAdapter that bridges the Teams SDK into an aiohttp server.

View file

@ -550,7 +550,10 @@ class TeamsMeetingPipeline:
confidence_notes=parsed.get("confidence_notes"),
notion_target=(self.config.notion or {}).get("database_id"),
linear_target=(self.config.linear or {}).get("team_id"),
teams_target=(self.config.teams_delivery or {}).get("channel_id"),
teams_target=(
(self.config.teams_delivery or {}).get("channel_id")
or (self.config.teams_delivery or {}).get("chat_id")
),
)
async def _write_sinks(self, job: TeamsMeetingPipelineJob, payload: TeamsMeetingSummaryPayload) -> None:

View file

@ -1,15 +1,19 @@
"""Tests for the Microsoft Teams platform adapter plugin."""
import asyncio
import json
import os
import sys
import types
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from gateway.config import Platform, PlatformConfig, HomeChannel
from plugins.teams_pipeline.models import TeamsMeetingRef, TeamsMeetingSummaryPayload
from tests.gateway._plugin_adapter_loader import load_plugin_adapter
@ -177,6 +181,7 @@ if _mt and _teams_mod.TypingActivityInput is None:
_teams_mod.TypingActivityInput = _mt.TypingActivityInput
TeamsAdapter = _teams_mod.TeamsAdapter
TeamsSummaryWriter = _teams_mod.TeamsSummaryWriter
check_requirements = _teams_mod.check_requirements
check_teams_requirements = _teams_mod.check_teams_requirements
validate_config = _teams_mod.validate_config
@ -449,6 +454,108 @@ class TestTeamsSend:
assert call_args[0][0] == "conv-id"
def _make_summary_payload():
return TeamsMeetingSummaryPayload(
meeting_ref=TeamsMeetingRef(meeting_id="meeting-123"),
title="Weekly Sync",
summary="Discussed launch readiness.",
key_decisions=["Proceed with staged rollout."],
action_items=["Send launch checklist."],
risks=["QA sign-off still pending."],
)
class TestTeamsSummaryWriter:
@pytest.mark.anyio
async def test_incoming_webhook_posts_summary_text(self):
seen = {}
def _handler(request: httpx.Request) -> httpx.Response:
seen["url"] = str(request.url)
seen["body"] = json.loads(request.content.decode("utf-8"))
return httpx.Response(200, json={"ok": True})
writer = TeamsSummaryWriter(transport=httpx.MockTransport(_handler))
payload = _make_summary_payload()
result = await writer.write_summary(
payload,
{
"delivery_mode": "incoming_webhook",
"incoming_webhook_url": "https://example.test/teams-webhook",
},
)
assert result["delivery_mode"] == "incoming_webhook"
assert seen["url"] == "https://example.test/teams-webhook"
assert "Weekly Sync" in seen["body"]["text"]
assert "Proceed with staged rollout." in seen["body"]["text"]
@pytest.mark.anyio
async def test_graph_delivery_posts_to_channel(self):
graph_client = SimpleNamespace(
post_json=AsyncMock(return_value={"id": "msg-123", "webUrl": "https://teams.example/messages/123"})
)
writer = TeamsSummaryWriter(graph_client=graph_client)
payload = _make_summary_payload()
result = await writer.write_summary(
payload,
{
"delivery_mode": "graph",
"team_id": "team-1",
"channel_id": "channel-1",
},
)
assert result["target_type"] == "channel"
assert result["message_id"] == "msg-123"
graph_client.post_json.assert_awaited_once()
path = graph_client.post_json.await_args.args[0]
body = graph_client.post_json.await_args.kwargs["json_body"]
assert path == "/teams/team-1/channels/channel-1/messages"
assert body["body"]["contentType"] == "html"
assert "Weekly Sync" in body["body"]["content"]
@pytest.mark.anyio
async def test_graph_delivery_falls_back_to_platform_home_channel(self):
graph_client = SimpleNamespace(post_json=AsyncMock(return_value={"id": "msg-home"}))
platform_config = PlatformConfig(
enabled=True,
extra={"team_id": "team-home", "delivery_mode": "graph"},
home_channel=HomeChannel(
platform=Platform("teams"),
chat_id="channel-home",
name="Teams Home",
),
)
writer = TeamsSummaryWriter(platform_config=platform_config, graph_client=graph_client)
await writer.write_summary(_make_summary_payload(), {})
graph_client.post_json.assert_awaited_once()
assert graph_client.post_json.await_args.args[0] == "/teams/team-home/channels/channel-home/messages"
@pytest.mark.anyio
async def test_existing_record_is_reused_without_force_resend(self):
graph_client = SimpleNamespace(post_json=AsyncMock())
writer = TeamsSummaryWriter(graph_client=graph_client)
existing = {"delivery_mode": "graph", "message_id": "msg-existing"}
result = await writer.write_summary(
_make_summary_payload(),
{
"delivery_mode": "graph",
"team_id": "team-1",
"channel_id": "channel-1",
},
existing_record=existing,
)
assert result == existing
graph_client.post_json.assert_not_awaited()
# ---------------------------------------------------------------------------
# Tests: Message Handling
# ---------------------------------------------------------------------------

View file

@ -82,6 +82,34 @@ def test_runtime_config_uses_existing_teams_platform_settings():
}
def test_build_pipeline_runtime_reuses_existing_teams_adapter_surface(monkeypatch, tmp_path):
from plugins.teams_pipeline import runtime as runtime_module
class FakeWriter:
def __init__(self, platform_config=None, **kwargs) -> None:
self.platform_config = platform_config
monkeypatch.setattr(runtime_module, "build_graph_client", lambda: object())
monkeypatch.setattr(runtime_module, "resolve_teams_pipeline_store_path", lambda: tmp_path / "teams-store.json")
monkeypatch.setattr("plugins.platforms.teams.adapter.TeamsSummaryWriter", FakeWriter)
gateway = SimpleNamespace(
config=GatewayConfig(
platforms={
Platform("teams"): PlatformConfig(
enabled=True,
extra={"delivery_mode": "incoming_webhook"},
)
}
)
)
runtime = runtime_module.build_pipeline_runtime(gateway)
assert isinstance(runtime.teams_sender, FakeWriter)
assert runtime.teams_sender.platform_config is gateway.config.platforms[Platform("teams")]
@pytest.mark.anyio
async def test_bind_gateway_runtime_attaches_scheduler(monkeypatch, tmp_path):
from plugins.teams_pipeline import runtime as runtime_module