feat(teams-pipeline): add plugin runtime and operator cli

Third slice of the Microsoft Teams meeting pipeline stack, salvaged
onto current main. Adds the standalone teams_pipeline plugin that
consumes Graph change notifications from the webhook listener,
resolves meeting artifacts (transcript first, recording + STT fallback
later), persists job state in a durable store, and exposes an operator
CLI for inspection, replay, subscription management, and validation.

Design choices follow maintainer review feedback on PR #19815:

- Standalone plugin rather than bolted-on core surface
  (plugins/teams_pipeline/, kind: standalone in plugin.yaml).
- Zero new model tools. The agent drives the pipeline by invoking
  the operator CLI via the terminal tool, guided by the skill that
  ships with a follow-up PR.
- Reuses the existing msgraph_webhook gateway platform for Graph
  ingress. Pipeline runtime is wired in via bind_gateway_runtime and
  gated on plugins.enabled so gateways that don't run the plugin
  boot cleanly.

Additions:

- plugins/teams_pipeline/: runtime (gateway wiring + config builder),
  pipeline core, durable SQLite store, subscription maintenance
  helpers, Graph artifact resolution, operator CLI (list, show,
  run/replay, fetch dry-run, subscriptions list, subscribe,
  renew-subscription, delete-subscription, maintain-subscriptions,
  token-health, validate).
- hermes_cli/main.py: second-pass plugin CLI discovery so any
  standalone plugin registered via ctx.register_cli_command()
  outside the memory-plugin convention path gets its subcommand
  wired into argparse without touching core.
- gateway/run.py: _teams_pipeline_plugin_enabled() config gate,
  _wire_teams_pipeline_runtime() binding after adapter setup, and
  the two runner attributes used by the runtime.

Credit to @dlkakbs for the entire plugin implementation.
This commit is contained in:
Dilee 2026-05-08 10:48:15 -07:00 committed by Teknium
parent ea86714cc0
commit 07bbd93337
14 changed files with 3332 additions and 1 deletions

View file

@ -847,6 +847,15 @@ def _platform_config_key(platform: "Platform") -> str:
return "cli" if platform == Platform.LOCAL else platform.value
def _teams_pipeline_plugin_enabled() -> bool:
    """Report whether the standalone Teams pipeline plugin is switched on.

    Accepts either spelling of the plugin name in the config's
    plugins.enabled list; anything other than a list disables the plugin.
    """
    config = _load_gateway_config()
    enabled_plugins = cfg_get(config, "plugins", "enabled", default=[])
    if not isinstance(enabled_plugins, list):
        return False
    return any(name in enabled_plugins for name in ("teams_pipeline", "teams-pipeline"))
def _load_gateway_config() -> dict:
"""Load and parse ~/.hermes/config.yaml, returning {} on any error.
@ -1154,6 +1163,9 @@ class GatewayRunner:
# Per-session reasoning effort overrides from /reasoning.
# Key: session_key, Value: parsed reasoning config dict.
self._session_reasoning_overrides: Dict[str, Dict[str, Any]] = {}
# Teams meeting pipeline runtime (bound later when msgraph_webhook adapter exists).
self._teams_pipeline_runtime = None
self._teams_pipeline_runtime_error: Optional[str] = None
# Track pending exec approvals per session
# Key: session_key, Value: {"command": str, "pattern_key": str, ...}
self._pending_approvals: Dict[str, Dict[str, Any]] = {}
@ -1251,6 +1263,37 @@ class GatewayRunner:
self._background_tasks: set = set()
def _wire_teams_pipeline_runtime(self) -> None:
    """Attach the Teams meeting pipeline runtime to Graph webhook ingress.

    Deliberately a no-op when the msgraph_webhook adapter is absent or the
    teams_pipeline plugin is not enabled, so gateways that have not opted
    into the pipeline still boot cleanly. Import and wiring failures are
    logged as warnings rather than raised.
    """
    if Platform.MSGRAPH_WEBHOOK not in self.adapters:
        return
    if not _teams_pipeline_plugin_enabled():
        logger.debug("Teams pipeline plugin is disabled; skipping runtime wiring")
        return

    # Imported lazily so gateways without the plugin never pay for it.
    try:
        from plugins.teams_pipeline.runtime import bind_gateway_runtime
    except Exception as import_error:
        logger.warning("Teams pipeline runtime import failed: %s", import_error)
        return

    try:
        did_bind = bind_gateway_runtime(self)
    except Exception as wiring_error:
        logger.warning("Teams pipeline runtime wiring failed: %s", wiring_error)
        return

    if did_bind:
        logger.info("Teams pipeline runtime bound to msgraph webhook ingress")
        return
    # bind_gateway_runtime may decline and leave a reason on the runner.
    if self._teams_pipeline_runtime_error:
        logger.warning(
            "Teams pipeline runtime unavailable: %s",
            self._teams_pipeline_runtime_error,
        )
def _warn_if_docker_media_delivery_is_risky(self) -> None:
"""Warn when Docker-backed gateways lack an explicit export mount.
@ -3304,7 +3347,8 @@ class GatewayRunner:
# Update delivery router with adapters
self.delivery_router.adapters = self.adapters
self._wire_teams_pipeline_runtime()
self._running = True
self._update_runtime_status("running")

View file

@ -10052,7 +10052,9 @@ Examples:
# =========================================================================
try:
from plugins.memory import discover_plugin_cli_commands
from hermes_cli.plugins import discover_plugins, get_plugin_manager
seen_plugin_commands = set()
for cmd_info in discover_plugin_cli_commands():
plugin_parser = subparsers.add_parser(
cmd_info["name"],
@ -10061,6 +10063,23 @@ Examples:
formatter_class=__import__("argparse").RawDescriptionHelpFormatter,
)
cmd_info["setup_fn"](plugin_parser)
if cmd_info.get("handler_fn") is not None:
plugin_parser.set_defaults(func=cmd_info["handler_fn"])
seen_plugin_commands.add(cmd_info["name"])
discover_plugins()
for cmd_info in get_plugin_manager()._cli_commands.values():
if cmd_info["name"] in seen_plugin_commands:
continue
plugin_parser = subparsers.add_parser(
cmd_info["name"],
help=cmd_info["help"],
description=cmd_info.get("description", ""),
formatter_class=__import__("argparse").RawDescriptionHelpFormatter,
)
cmd_info["setup_fn"](plugin_parser)
if cmd_info.get("handler_fn") is not None:
plugin_parser.set_defaults(func=cmd_info["handler_fn"])
except Exception as _exc:
logging.getLogger(__name__).debug("Plugin CLI discovery failed: %s", _exc)

View file

@ -0,0 +1,23 @@
"""Teams meeting pipeline plugin.
Registers only operator-facing CLI surfaces. The agent should invoke these via
the terminal tool; no model tools are added by this plugin.
"""
from __future__ import annotations
from plugins.teams_pipeline.cli import register_cli, teams_pipeline_command
def register(ctx) -> None:
    """Register the teams-pipeline operator CLI with the plugin context.

    No model tools are exposed here — only the CLI subcommand the agent can
    reach through the terminal tool.
    """
    description = (
        "Operator CLI for the Microsoft Teams meeting pipeline. "
        "Lists jobs, inspects stored runs, replays jobs, validates Graph "
        "setup, and maintains Graph subscriptions."
    )
    ctx.register_cli_command(
        name="teams-pipeline",
        help="Inspect and operate the Microsoft Teams meeting pipeline",
        setup_fn=register_cli,
        handler_fn=teams_pipeline_command,
        description=description,
    )

View file

@ -0,0 +1,462 @@
"""CLI commands for the Teams meeting pipeline plugin."""
from __future__ import annotations
import argparse
import asyncio
import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from hermes_constants import display_hermes_home
from gateway.config import Platform, load_gateway_config
from plugins.teams_pipeline.meetings import (
enrich_meeting_with_call_record,
fetch_preferred_transcript_text,
list_recording_artifacts,
resolve_meeting_reference,
)
from plugins.teams_pipeline.models import GraphSubscription
from plugins.teams_pipeline.pipeline import TeamsMeetingPipeline
from plugins.teams_pipeline.store import TeamsPipelineStore, resolve_teams_pipeline_store_path
from plugins.teams_pipeline.subscriptions import (
build_graph_client,
maintain_graph_subscriptions,
sync_graph_subscription_record,
)
from tools.microsoft_graph_auth import MicrosoftGraphConfigError, MicrosoftGraphTokenProvider
def register_cli(subparser: argparse.ArgumentParser) -> None:
    """Attach the teams-pipeline subcommand tree to *subparser*.

    Every subcommand stores its choice in ``teams_pipeline_action`` so the
    single ``teams_pipeline_command`` dispatcher can route it.
    """
    actions = subparser.add_subparsers(dest="teams_pipeline_action")

    parser = actions.add_parser("list", aliases=["ls"], help="List recent Teams pipeline jobs")
    parser.add_argument("--limit", type=int, default=20)
    parser.add_argument("--status", default="")
    parser.add_argument("--store-path", default="")

    parser = actions.add_parser("show", help="Show a stored Teams pipeline job")
    parser.add_argument("job_id")
    parser.add_argument("--store-path", default="")

    parser = actions.add_parser("run", aliases=["replay"], help="Replay a stored Teams pipeline job")
    parser.add_argument("job_id")
    parser.add_argument("--store-path", default="")

    parser = actions.add_parser("fetch", aliases=["test"], help="Dry-run meeting artifact resolution")
    parser.add_argument("--meeting-id", default="")
    parser.add_argument("--join-web-url", default="")
    parser.add_argument("--tenant-id", default="")
    parser.add_argument("--call-record-id", default="")

    parser = actions.add_parser("subscriptions", aliases=["subs"], help="List Graph subscriptions")
    parser.add_argument("--store-path", default="")

    parser = actions.add_parser("subscribe", help="Create a Microsoft Graph subscription")
    parser.add_argument("--resource", required=True)
    parser.add_argument("--notification-url", required=True)
    parser.add_argument("--change-type", default="")
    parser.add_argument("--expiration", default="")
    parser.add_argument("--client-state", default="")
    parser.add_argument("--lifecycle-notification-url", default="")
    parser.add_argument("--latest-supported-tls-version", default="v1_2")
    parser.add_argument("--store-path", default="")

    parser = actions.add_parser("renew-subscription", help="Renew a Microsoft Graph subscription")
    parser.add_argument("subscription_id")
    parser.add_argument("--expiration", required=True)
    parser.add_argument("--store-path", default="")

    parser = actions.add_parser("delete-subscription", help="Delete a Microsoft Graph subscription")
    parser.add_argument("subscription_id")
    parser.add_argument("--store-path", default="")

    parser = actions.add_parser("maintain-subscriptions", help="Renew near-expiry managed subscriptions")
    parser.add_argument("--renew-within-hours", type=int, default=24)
    parser.add_argument("--extend-hours", type=int, default=24)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--store-path", default="")
    parser.add_argument("--client-state", default="")

    parser = actions.add_parser("token-health", aliases=["token"], help="Inspect Graph token health")
    parser.add_argument("--force-refresh", action="store_true")

    parser = actions.add_parser("validate", help="Validate Teams pipeline configuration snapshot")
    parser.add_argument("--store-path", default="")

    subparser.set_defaults(func=teams_pipeline_command)
def teams_pipeline_command(args: argparse.Namespace) -> int:
    """Route a parsed teams-pipeline invocation to its handler.

    Returns 0 on success, 2 for usage errors, and 1 when Microsoft Graph is
    not configured (with a setup hint printed).
    """
    action = getattr(args, "teams_pipeline_action", None)
    if not action:
        print(
            "Usage: hermes teams-pipeline "
            "{list|show|run|fetch|subscriptions|subscribe|renew-subscription|delete-subscription|maintain-subscriptions|token-health|validate}"
        )
        return 2

    # Aliases map onto the same handlers as their canonical names.
    handlers = {
        "list": _cmd_list,
        "ls": _cmd_list,
        "show": _cmd_show,
        "run": _cmd_run,
        "replay": _cmd_run,
        "fetch": _cmd_fetch,
        "test": _cmd_fetch,
        "subscriptions": _cmd_subscriptions,
        "subs": _cmd_subscriptions,
        "subscribe": _cmd_subscribe,
        "renew-subscription": _cmd_renew_subscription,
        "delete-subscription": _cmd_delete_subscription,
        "maintain-subscriptions": _cmd_maintain_subscriptions,
        "token-health": _cmd_token_health,
        "token": _cmd_token_health,
        "validate": _cmd_validate,
    }
    handler = handlers.get(action)
    if handler is None:
        print(f"Unknown teams-pipeline action: {action}")
        return 2

    try:
        handler(args)
        return 0
    except MicrosoftGraphConfigError:
        # Missing credentials: print actionable setup guidance instead of a trace.
        print(_graph_setup_hint())
        return 1
def _run_async(coro):
return asyncio.run(coro)
def _store_path(path_arg: str | None) -> Path:
    """Resolve the job-store path, honoring an optional --store-path override.

    Delegates entirely to the store module's resolver so all commands agree
    on the location.
    """
    return resolve_teams_pipeline_store_path(path_arg)
def _graph_setup_hint() -> str:
    """Return the operator-facing help text printed when Graph credentials are missing."""
    # NOTE: the f-string body's whitespace is emitted verbatim to the operator;
    # keep its layout as-is.
    return f"""
Microsoft Graph is not configured. Add these to {display_hermes_home()}/.env:
MSGRAPH_TENANT_ID=...
MSGRAPH_CLIENT_ID=...
MSGRAPH_CLIENT_SECRET=...
Then restart the gateway or rerun this command.
"""
def _iso_utc_timestamp(hours_from_now: int) -> str:
return (datetime.now(timezone.utc) + timedelta(hours=hours_from_now)).replace(
microsecond=0
).isoformat().replace("+00:00", "Z")
def _default_change_type_for_resource(resource: str) -> str:
normalized = str(resource or "").strip().lower()
if normalized.startswith("communications/onlinemeetings/getalltranscripts"):
return "created"
if normalized.startswith("communications/onlinemeetings/getallrecordings"):
return "created"
if normalized.startswith("communications/callrecords"):
return "created"
return "updated"
def _compact_job(job: dict) -> dict:
payload = dict(job)
summary = dict(payload.get("summary_payload") or {})
transcript = summary.pop("transcript_text", None)
if transcript:
summary["transcript_preview"] = str(transcript)[:240]
payload["summary_payload"] = summary or None
return payload
def _sync_subscription_record(
    store: TeamsPipelineStore,
    subscription_payload: dict[str, Any],
    *,
    status: str = "active",
    renewed: bool = False,
) -> dict[str, Any]:
    """Normalize a raw Graph subscription payload and persist it in the store.

    Round-trips the payload through GraphSubscription for field normalization,
    stamps *status*, and — when *renewed* — records the renewal time so the
    maintenance command can distinguish fresh renewals.
    """
    record = GraphSubscription.from_dict(subscription_payload).to_dict()
    record["status"] = status
    if renewed:
        record["latest_renewal_at"] = _iso_utc_timestamp(0)
    return store.upsert_subscription(record["subscription_id"], record)
def _validate_configuration_snapshot(store: TeamsPipelineStore) -> dict[str, Any]:
    """Build a JSON-friendly health report for the pipeline's configuration.

    Checks Graph app-only credentials in the environment, the msgraph webhook
    platform, and Teams outbound delivery settings, and folds in store stats.
    Returns a dict with "ok", "issues", "warnings", and the raw findings; it
    never raises for missing configuration.
    """
    env = os.environ
    issues: list[str] = []
    warnings: list[str] = []
    gateway_config = load_gateway_config()
    webhook_config = gateway_config.platforms.get(Platform.MSGRAPH_WEBHOOK)
    teams_config = gateway_config.platforms.get(Platform("teams"))
    # Record only presence/absence of each credential, never the values.
    graph = {
        "tenant_id": bool(env.get("MSGRAPH_TENANT_ID")),
        "client_id": bool(env.get("MSGRAPH_CLIENT_ID")),
        "client_secret": bool(env.get("MSGRAPH_CLIENT_SECRET")),
    }
    webhook_enabled = bool(webhook_config and webhook_config.enabled)
    teams_enabled = bool(teams_config and teams_config.enabled)
    teams_extra = dict((teams_config.extra or {}) if teams_config else {})
    teams_mode = str(teams_extra.get("delivery_mode") or "").strip() or None
    if not all(graph.values()):
        issues.append("Microsoft Graph app-only credentials are incomplete.")
    if not webhook_enabled:
        issues.append("MSGRAPH_WEBHOOK_ENABLED is not enabled.")
    if not teams_enabled:
        # Outbound delivery being off is only a warning, not a hard failure.
        warnings.append("Teams outbound delivery is disabled.")
    elif teams_mode == "incoming_webhook":
        if not teams_extra.get("incoming_webhook_url"):
            issues.append("TEAMS_INCOMING_WEBHOOK_URL is required for incoming_webhook mode.")
    elif teams_mode == "graph":
        missing: list[str] = []
        # Graph delivery can authenticate with either a dedicated delivery token
        # or the same app-only MSGRAPH_* credentials checked above.
        has_graph_delivery_token = bool(
            (teams_config.token if teams_config else "") or teams_extra.get("access_token")
        )
        has_graph_app_credentials = all(graph.values())
        if not has_graph_delivery_token and not has_graph_app_credentials:
            missing.append(
                "TEAMS_GRAPH_ACCESS_TOKEN or complete MSGRAPH_* app credentials"
            )
        if not teams_extra.get("team_id"):
            missing.append("TEAMS_TEAM_ID")
        # A chat_id or a configured home channel can stand in for an explicit
        # channel id.
        channel_id = teams_extra.get("channel_id") or teams_extra.get("chat_id")
        if not channel_id and not (teams_config and teams_config.home_channel):
            missing.append("TEAMS_CHANNEL_ID")
        for key in missing:
            issues.append(f"{key} is required for graph delivery mode.")
    else:
        warnings.append("TEAMS_DELIVERY_MODE is not set.")
    return {
        "ok": not issues,
        "issues": issues,
        "warnings": warnings,
        "graph_config": graph,
        "webhook_enabled": webhook_enabled,
        "teams_enabled": teams_enabled,
        "teams_delivery_mode": teams_mode,
        "store_path": str(store.path),
        "store_stats": store.stats(),
    }
def _cmd_list(args) -> None:
    """Print a human-readable summary of recent pipeline jobs.

    Honors --status (case-insensitive exact match) and --limit (clamped to
    1..100); jobs are shown newest-updated first.
    """
    store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None)))
    jobs = list(store.list_jobs().values())
    status = str(getattr(args, "status", "") or "").strip().lower()
    if status:
        jobs = [job for job in jobs if str(job.get("status") or "").lower() == status]
    # updated_at values are compared as strings; for ISO-8601 timestamps
    # lexicographic order matches chronological order.
    jobs.sort(key=lambda item: str((item or {}).get("updated_at") or ""), reverse=True)
    limit = max(1, min(int(getattr(args, "limit", 20) or 20), 100))
    jobs = jobs[:limit]
    if not jobs:
        print("No Teams meeting pipeline jobs found.")
        return
    print(f"\n{len(jobs)} Teams pipeline job(s):\n")
    for job in jobs:
        meeting_id = ((job.get("meeting_ref") or {}).get("meeting_id") or "unknown")
        print(f"{job.get('job_id')}")
        print(f" status: {job.get('status')}")
        print(f" meeting: {meeting_id}")
        if job.get("selected_artifact_strategy"):
            print(f" strategy: {job.get('selected_artifact_strategy')}")
        if job.get("updated_at"):
            print(f" updated: {job.get('updated_at')}")
        if job.get("error_info"):
            print(f" error: {job.get('error_info')}")
        print()
def _cmd_show(args) -> None:
job_id = str(getattr(args, "job_id", "") or "").strip()
if not job_id:
print("job_id is required")
return
store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None)))
job = store.get_job(job_id)
if not job:
print(f"Unknown job: {job_id}")
return
print(json.dumps(_compact_job(job), indent=2, sort_keys=True))
def _cmd_run(args) -> None:
job_id = str(getattr(args, "job_id", "") or "").strip()
if not job_id:
print("job_id is required")
return
store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None)))
pipeline = TeamsMeetingPipeline(graph_client=build_graph_client(), store=store, config={})
result = _run_async(pipeline.run_job(job_id))
print(json.dumps(_compact_job(result.to_dict()), indent=2, sort_keys=True))
def _cmd_fetch(args) -> None:
    """Dry-run artifact resolution for one meeting and print the findings.

    Resolves the meeting (by id or join URL), then probes transcripts,
    recordings, and the optional call record. Nothing is persisted.
    """
    meeting_id = str(getattr(args, "meeting_id", "") or "").strip() or None
    join_web_url = str(getattr(args, "join_web_url", "") or "").strip() or None
    tenant_id = str(getattr(args, "tenant_id", "") or "").strip() or None
    call_record_id = str(getattr(args, "call_record_id", "") or "").strip() or None
    if not meeting_id and not join_web_url:
        print("meeting_id or join_web_url is required")
        return
    client = build_graph_client()
    meeting_ref = _run_async(
        resolve_meeting_reference(
            client,
            meeting_id=meeting_id,
            join_web_url=join_web_url,
            tenant_id=tenant_id,
        )
    )
    transcript_artifact, transcript_text = _run_async(fetch_preferred_transcript_text(client, meeting_ref))
    recordings = _run_async(list_recording_artifacts(client, meeting_ref))
    call_record = _run_async(
        enrich_meeting_with_call_record(client, meeting_ref, call_record_id=call_record_id)
    )
    print(
        json.dumps(
            {
                "meeting_ref": meeting_ref.to_dict(),
                "transcript_available": bool(transcript_artifact and transcript_text),
                "transcript_artifact": transcript_artifact.to_dict() if transcript_artifact else None,
                # Preview capped at 240 chars to keep terminal output readable.
                "transcript_preview": (transcript_text or "")[:240] or None,
                "recording_count": len(recordings),
                # Only the first five recordings are echoed in full.
                "recordings": [recording.to_dict() for recording in recordings[:5]],
                "call_record": call_record.to_dict() if call_record else None,
            },
            indent=2,
            sort_keys=True,
        )
    )
def _cmd_subscriptions(args) -> None:
    """List live Graph subscriptions and mirror each into the local store."""
    store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None)))
    client = build_graph_client()
    subscriptions = _run_async(client.collect_paginated("/subscriptions"))
    for sub in subscriptions:
        try:
            _sync_subscription_record(store, sub, status="active")
        except Exception:
            # Mirroring is best-effort: one malformed payload must not
            # abort the listing.
            continue
    if not subscriptions:
        print("No Microsoft Graph subscriptions found.")
        return
    print(f"\n{len(subscriptions)} Microsoft Graph subscription(s):\n")
    for sub in subscriptions:
        print(f"{sub.get('id') or 'unknown'}")
        print(f" resource: {sub.get('resource') or 'unknown'}")
        print(f" changeType: {sub.get('changeType') or 'unknown'}")
        if sub.get("expirationDateTime"):
            print(f" expires: {sub.get('expirationDateTime')}")
        if sub.get("notificationUrl"):
            print(f" notify: {sub.get('notificationUrl')}")
        print()
def _cmd_subscribe(args) -> None:
    """Create a Graph subscription and mirror the result into the store.

    Defaults: changeType is inferred from the resource, and expiry is one
    hour out when not supplied.
    """
    store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None)))
    resource = str(getattr(args, "resource", "") or "").strip()
    notification_url = str(getattr(args, "notification_url", "") or "").strip()
    change_type = str(getattr(args, "change_type", "") or "").strip() or _default_change_type_for_resource(resource)
    expiration = str(getattr(args, "expiration", "") or "").strip() or _iso_utc_timestamp(1)
    client_state = str(getattr(args, "client_state", "") or "").strip()
    lifecycle_url = str(getattr(args, "lifecycle_notification_url", "") or "").strip()
    tls_version = str(getattr(args, "latest_supported_tls_version", "") or "").strip() or "v1_2"
    payload = {
        "changeType": change_type,
        "notificationUrl": notification_url,
        "resource": resource,
        "expirationDateTime": expiration,
        "latestSupportedTlsVersion": tls_version,
    }
    # Optional fields are omitted entirely rather than sent empty.
    if client_state:
        payload["clientState"] = client_state
    if lifecycle_url:
        payload["lifecycleNotificationUrl"] = lifecycle_url
    result = _run_async(build_graph_client().post_json("/subscriptions", json_body=payload))
    _sync_subscription_record(store, result, status="active")
    print(json.dumps(result, indent=2, sort_keys=True))
def _cmd_renew_subscription(args) -> None:
subscription_id = str(getattr(args, "subscription_id", "") or "").strip()
expiration = str(getattr(args, "expiration", "") or "").strip()
if not subscription_id or not expiration:
print("subscription_id and --expiration are required")
return
store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None)))
result = _run_async(
build_graph_client().patch_json(
f"/subscriptions/{subscription_id}",
json_body={"expirationDateTime": expiration},
)
)
merged = {"id": subscription_id, **(result or {}), "expirationDateTime": expiration}
_sync_subscription_record(store, merged, status="active", renewed=True)
print(json.dumps(merged, indent=2, sort_keys=True))
def _cmd_delete_subscription(args) -> None:
subscription_id = str(getattr(args, "subscription_id", "") or "").strip()
if not subscription_id:
print("subscription_id is required")
return
store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None)))
result = _run_async(build_graph_client().delete(f"/subscriptions/{subscription_id}"))
store.delete_subscription(subscription_id)
print(json.dumps({"subscription_id": subscription_id, "result": result}, indent=2, sort_keys=True))
def _cmd_maintain_subscriptions(args) -> None:
    """Renew managed subscriptions nearing expiry (optionally as a dry run)."""
    store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None)))
    result = _run_async(
        maintain_graph_subscriptions(
            client=build_graph_client(),
            store=store,
            # Fall back to 24h for both windows when flags are absent or zero.
            renew_within_hours=int(getattr(args, "renew_within_hours", 24) or 24),
            extend_hours=int(getattr(args, "extend_hours", 24) or 24),
            dry_run=bool(getattr(args, "dry_run", False)),
            client_state=str(getattr(args, "client_state", "") or "").strip() or None,
        )
    )
    print(json.dumps(result, indent=2, sort_keys=True))
def _cmd_token_health(args) -> None:
    """Print Graph token health; --force-refresh additionally proves credentials.

    Refresh failures are reported in the JSON payload rather than raised.
    """
    provider = MicrosoftGraphTokenProvider.from_env()
    health = provider.inspect_token_health()
    payload = dict(health)
    if getattr(args, "force_refresh", False):
        try:
            token = _run_async(provider.get_access_token(force_refresh=True))
            payload["last_refresh_succeeded"] = True
            # Only the token's length is reported; the secret itself is never printed.
            payload["access_token_length"] = len(token or "")
        except Exception as exc:
            payload["last_refresh_succeeded"] = False
            payload["refresh_error"] = str(exc)
    print(json.dumps(payload, indent=2, sort_keys=True))
def _cmd_validate(args) -> None:
    """Print a JSON snapshot of the pipeline's configuration health."""
    store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None)))
    print(json.dumps(_validate_configuration_snapshot(store), indent=2, sort_keys=True))

View file

@ -0,0 +1,333 @@
"""Graph-backed Teams meeting helpers for the plugin runtime."""
from __future__ import annotations
import tempfile
from pathlib import Path
from typing import Any
from urllib.parse import quote
from plugins.teams_pipeline.models import MeetingArtifact, TeamsMeetingRef
from tools.microsoft_graph_client import MicrosoftGraphAPIError, MicrosoftGraphClient
# ---------------------------------------------------------------------------
# Exception hierarchy: every pipeline failure derives from TeamsMeetingError
# so callers can catch the base class. _wrap_graph_error maps Graph HTTP
# statuses onto the subclasses (401/403 -> permission, 404 -> not found).
# ---------------------------------------------------------------------------
class TeamsMeetingError(RuntimeError):
    """Base class for Teams meeting pipeline failures."""


class TeamsMeetingNotFoundError(TeamsMeetingError):
    """Raised when the meeting cannot be resolved from Graph."""


class TeamsMeetingArtifactNotFoundError(TeamsMeetingError):
    """Raised when a transcript or recording cannot be found."""


class TeamsMeetingPermissionError(TeamsMeetingError):
    """Raised when Graph access is denied for the requested resource."""
def _meeting_path(meeting_ref: TeamsMeetingRef | str) -> str:
    """Build the Graph API path for an online meeting, URL-escaping the id."""
    if isinstance(meeting_ref, TeamsMeetingRef):
        meeting_id = meeting_ref.meeting_id
    else:
        meeting_id = str(meeting_ref)
    # safe='' so slashes inside meeting ids are escaped too.
    return f"/communications/onlineMeetings/{quote(meeting_id, safe='')}"
def _wrap_graph_error(exc: MicrosoftGraphAPIError, *, missing_message: str) -> TeamsMeetingError:
    """Translate a raw Graph API error into the pipeline's exception hierarchy.

    401/403 become permission errors, 404 becomes not-found (carrying
    *missing_message*), and anything else falls back to the base error.
    """
    status = exc.status_code
    if status in (401, 403):
        return TeamsMeetingPermissionError(str(exc))
    if status == 404:
        return TeamsMeetingNotFoundError(missing_message)
    return TeamsMeetingError(str(exc))
def _parse_organizer_user_id(payload: dict[str, Any]) -> str | None:
organizer = payload.get("organizer")
if not isinstance(organizer, dict):
return None
identity = organizer.get("identity")
if not isinstance(identity, dict):
return None
user = identity.get("user")
if not isinstance(user, dict):
return None
return user.get("id")
def _parse_thread_id(payload: dict[str, Any]) -> str | None:
chat = payload.get("chatInfo")
if isinstance(chat, dict):
thread_id = chat.get("threadId")
if thread_id:
return str(thread_id)
return payload.get("threadId")
def _normalize_meeting_ref(payload: dict[str, Any], *, tenant_id: str | None = None) -> TeamsMeetingRef:
    """Convert a raw Graph onlineMeeting payload into a TeamsMeetingRef.

    An explicit *tenant_id* argument wins over any tenantId in the payload.
    """
    # Carry over a small, stable subset of scheduling metadata; None values
    # are dropped.
    metadata = {
        key: payload.get(key)
        for key in ("subject", "startDateTime", "endDateTime", "createdDateTime")
        if payload.get(key) is not None
    }
    participants = payload.get("participants")
    if participants is not None:
        metadata["participants"] = participants
    return TeamsMeetingRef(
        meeting_id=str(payload.get("id") or "").strip(),
        organizer_user_id=_parse_organizer_user_id(payload),
        join_web_url=payload.get("joinWebUrl"),
        calendar_event_id=payload.get("calendarEventId"),
        thread_id=_parse_thread_id(payload),
        tenant_id=tenant_id or payload.get("tenantId"),
        metadata=metadata,
    )
def _normalize_artifact(
    artifact_type: str,
    payload: dict[str, Any],
    *,
    default_source_url: str | None = None,
) -> MeetingArtifact:
    """Convert a raw Graph artifact payload into a MeetingArtifact.

    The entire payload is preserved under ``metadata``; only commonly-used
    fields are lifted into first-class attributes.
    """
    metadata = dict(payload)
    # Graph spells the direct download link differently per artifact family;
    # take the first variant present.
    download_url = (
        payload.get("@microsoft.graph.downloadUrl")
        or payload.get("downloadUrl")
        or payload.get("recordingContentUrl")
        or payload.get("transcriptContentUrl")
    )
    source_url = payload.get("webUrl") or payload.get("contentUrl") or default_source_url
    return MeetingArtifact(
        artifact_type=artifact_type,  # type: ignore[arg-type]
        artifact_id=str(payload.get("id") or "").strip(),
        display_name=payload.get("displayName") or payload.get("name"),
        content_type=payload.get("contentType") or payload.get("fileMimeType"),
        source_url=source_url,
        download_url=download_url,
        created_at=payload.get("createdDateTime"),
        available_at=payload.get("lastModifiedDateTime") or payload.get("meetingEndDateTime"),
        size_bytes=payload.get("size"),
        metadata=metadata,
    )
def _transcript_sort_key(artifact: MeetingArtifact) -> tuple[int, int, str]:
status = str(artifact.metadata.get("status") or "").lower()
has_download = int(bool(artifact.download_url or artifact.source_url))
is_completed = int(status in {"available", "completed", "succeeded"})
timestamp = ""
if artifact.available_at is not None:
timestamp = artifact.available_at.isoformat()
elif artifact.created_at is not None:
timestamp = artifact.created_at.isoformat()
return (is_completed, has_download, timestamp)
def _recording_download_path(meeting_ref: TeamsMeetingRef, artifact: MeetingArtifact) -> str:
if artifact.download_url:
return artifact.download_url
return f"{_meeting_path(meeting_ref)}/recordings/{quote(artifact.artifact_id, safe='')}/content"
def _transcript_download_path(meeting_ref: TeamsMeetingRef, artifact: MeetingArtifact) -> str:
if artifact.download_url:
return artifact.download_url
return f"{_meeting_path(meeting_ref)}/transcripts/{quote(artifact.artifact_id, safe='')}/content"
async def resolve_meeting_reference(
    client: MicrosoftGraphClient,
    *,
    meeting_id: str | None = None,
    join_web_url: str | None = None,
    tenant_id: str | None = None,
) -> TeamsMeetingRef:
    """Resolve a meeting to a normalized TeamsMeetingRef via Graph.

    Tries a direct lookup by *meeting_id* first, then an OData filter on
    *join_web_url*. Raises the mapped pipeline errors on Graph failures and
    ValueError when neither identifier is supplied.
    """
    if meeting_id:
        try:
            payload = await client.get_json(_meeting_path(meeting_id))
        except MicrosoftGraphAPIError as exc:
            raise _wrap_graph_error(exc, missing_message=f"Teams meeting not found: {meeting_id}") from exc
        # A 2xx response without an id is treated the same as not-found.
        if not isinstance(payload, dict) or not payload.get("id"):
            raise TeamsMeetingNotFoundError(f"Teams meeting not found: {meeting_id}")
        return _normalize_meeting_ref(payload, tenant_id=tenant_id)
    if join_web_url:
        # OData string literals escape embedded single quotes by doubling them.
        escaped_join_url = join_web_url.replace("'", "''")
        try:
            payload = await client.get_json(
                "/communications/onlineMeetings",
                params={"$filter": f"JoinWebUrl eq '{escaped_join_url}'"},
            )
        except MicrosoftGraphAPIError as exc:
            raise _wrap_graph_error(
                exc,
                missing_message=f"Teams meeting not found for join URL: {join_web_url}",
            ) from exc
        candidates = payload.get("value") if isinstance(payload, dict) else None
        if not isinstance(candidates, list) or not candidates:
            raise TeamsMeetingNotFoundError(f"Teams meeting not found for join URL: {join_web_url}")
        # Graph may return several matches; the first is taken as canonical.
        return _normalize_meeting_ref(candidates[0], tenant_id=tenant_id)
    raise ValueError("Either meeting_id or join_web_url is required.")
async def list_transcript_artifacts(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
) -> list[MeetingArtifact]:
    """Return all transcript artifacts Graph reports for the meeting.

    Graph failures are re-raised as mapped pipeline errors; non-dict page
    entries are silently dropped.
    """
    try:
        payloads = await client.collect_paginated(f"{_meeting_path(meeting_ref)}/transcripts")
    except MicrosoftGraphAPIError as exc:
        raise _wrap_graph_error(
            exc,
            missing_message=f"No transcripts found for Teams meeting {meeting_ref.meeting_id}",
        ) from exc
    return [_normalize_artifact("transcript", payload) for payload in payloads if isinstance(payload, dict)]
def select_preferred_transcript(candidates: list[MeetingArtifact]) -> MeetingArtifact | None:
    """Pick the best transcript from *candidates*, or None if there is none.

    Only transcript-typed artifacts are considered; ranking comes from
    _transcript_sort_key (completed > downloadable > newest).
    """
    transcripts = [candidate for candidate in candidates if candidate.artifact_type == "transcript"]
    if not transcripts:
        return None
    # max() returns the first of equally-ranked items, matching the previous
    # stable sorted(..., reverse=True)[0] behavior, in O(n) instead of O(n log n).
    return max(transcripts, key=_transcript_sort_key)
async def download_transcript_text(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
    transcript: MeetingArtifact,
    *,
    encoding: str = "utf-8",
) -> str:
    """Download a transcript's content and return it as stripped text.

    The payload is spooled through a named temp file (the Graph client writes
    to a filesystem path) which is always removed afterwards. Raises
    TeamsMeetingArtifactNotFoundError when the downloaded transcript is
    empty, and the mapped pipeline errors on Graph failures.
    """
    # Keep the original extension so format sniffing still works; default to
    # .vtt naming, falling back to .txt when there is no suffix at all.
    suffix = Path(transcript.display_name or "transcript.vtt").suffix or ".txt"
    # delete=False: the handle is closed immediately and the client re-opens
    # the path itself (required on Windows, where an open NamedTemporaryFile
    # cannot be opened a second time).
    with tempfile.NamedTemporaryFile(prefix="teams-transcript-", suffix=suffix, delete=False) as handle:
        destination = Path(handle.name)
    try:
        await client.download_to_file(_transcript_download_path(meeting_ref, transcript), destination)
        text = destination.read_text(encoding=encoding).strip()
    except MicrosoftGraphAPIError as exc:
        raise _wrap_graph_error(
            exc,
            missing_message=(
                f"Transcript {transcript.artifact_id} not found for meeting {meeting_ref.meeting_id}"
            ),
        ) from exc
    finally:
        # Best-effort cleanup of the temp file, even on failure.
        try:
            destination.unlink(missing_ok=True)
        except OSError:
            pass
    if not text:
        raise TeamsMeetingArtifactNotFoundError(
            f"Transcript {transcript.artifact_id} for meeting {meeting_ref.meeting_id} was empty."
        )
    return text
async def fetch_preferred_transcript_text(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
) -> tuple[MeetingArtifact | None, str | None]:
    """Resolve and download the best transcript for the meeting.

    Returns (artifact, text), or (None, None) when no usable transcript
    exists — including when the chosen transcript turns out to be empty.
    """
    candidates = await list_transcript_artifacts(client, meeting_ref)
    chosen = select_preferred_transcript(candidates)
    if chosen is None:
        return None, None
    try:
        text = await download_transcript_text(client, meeting_ref, chosen)
    except TeamsMeetingArtifactNotFoundError:
        # An empty/missing transcript body is treated the same as "no transcript".
        return None, None
    return chosen, text
async def list_recording_artifacts(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
) -> list[MeetingArtifact]:
    """Return all recording artifacts Graph reports for the meeting.

    Graph failures are re-raised as mapped pipeline errors; non-dict page
    entries are silently dropped.
    """
    try:
        payloads = await client.collect_paginated(f"{_meeting_path(meeting_ref)}/recordings")
    except MicrosoftGraphAPIError as exc:
        raise _wrap_graph_error(
            exc,
            missing_message=f"No recordings found for Teams meeting {meeting_ref.meeting_id}",
        ) from exc
    return [_normalize_artifact("recording", payload) for payload in payloads if isinstance(payload, dict)]
async def download_recording_artifact(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
    recording: MeetingArtifact,
    destination: str | Path,
) -> dict[str, Any]:
    """Download a recording to *destination* and return a small result summary.

    Raises the mapped pipeline error (not-found / permission) when the Graph
    download fails.
    """
    destination_path = Path(destination)
    try:
        result = await client.download_to_file(
            _recording_download_path(meeting_ref, recording),
            destination_path,
        )
    except MicrosoftGraphAPIError as exc:
        raise _wrap_graph_error(
            exc,
            missing_message=f"Recording {recording.artifact_id} not found for meeting {meeting_ref.meeting_id}",
        ) from exc
    # Prefer size/type reported by the actual download; fall back to the
    # artifact's own hints when the client omits them.
    return {
        "artifact": recording.to_dict(),
        "path": str(destination_path),
        "size_bytes": result.get("size_bytes") or recording.size_bytes,
        "content_type": result.get("content_type") or recording.content_type,
    }
async def fetch_call_record_artifact(
    client: MicrosoftGraphClient,
    *,
    call_record_id: str,
    allow_permission_errors: bool = True,
) -> MeetingArtifact | None:
    """Fetch a Graph callRecord and wrap it as a ``call_record`` artifact.

    Returns None on 404 and — when *allow_permission_errors* is true — on
    401/403, so tenants without the CallRecords permission degrade
    gracefully instead of failing the pipeline run.
    """
    endpoint = f"/communications/callRecords/{quote(call_record_id, safe='')}"
    try:
        payload = await client.get_json(endpoint)
    except MicrosoftGraphAPIError as exc:
        if allow_permission_errors and exc.status_code in (401, 403):
            return None
        if exc.status_code == 404:
            return None
        raise _wrap_graph_error(exc, missing_message=f"Call record not found: {call_record_id}") from exc
    if not isinstance(payload, dict) or not payload.get("id"):
        return None
    # Surface a compact metrics dict alongside the raw call record.
    metrics: dict[str, Any] = {
        "version": payload.get("version"),
        "modalities": payload.get("modalities"),
        "participant_count": len(payload.get("participants") or []),
        "organizer": _parse_organizer_user_id(payload),
    }
    sessions = payload.get("sessions") or []
    if sessions:
        metrics["session_count"] = len(sessions)
    return MeetingArtifact(
        artifact_type="call_record",
        artifact_id=str(payload["id"]),
        display_name=payload.get("type") or "call_record",
        source_url=payload.get("webUrl"),
        created_at=payload.get("startDateTime"),
        available_at=payload.get("endDateTime"),
        metadata={"call_record": payload, "metrics": metrics},
    )
async def enrich_meeting_with_call_record(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
    *,
    call_record_id: str | None = None,
    allow_permission_errors: bool = True,
) -> MeetingArtifact | None:
    """Fetch the call record tied to *meeting_ref* when an id is known.

    The explicit *call_record_id* argument wins; otherwise the id stashed in
    the meeting ref's metadata is used. Returns None when neither is set.
    """
    effective_id = call_record_id if call_record_id else meeting_ref.metadata.get("call_record_id")
    if not effective_id:
        return None
    return await fetch_call_record_artifact(
        client,
        call_record_id=str(effective_id),
        allow_permission_errors=allow_permission_errors,
    )

View file

@ -0,0 +1,350 @@
"""Normalized models for the Teams meeting pipeline plugin."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Literal
# Closed set of artifact categories the pipeline understands.
ArtifactType = Literal["transcript", "recording", "call_record"]
def _parse_datetime(value: Any) -> datetime | None:
    """Coerce *value* into a timezone-aware datetime (or None).

    Accepts None, datetime instances, and ISO-8601 strings (a trailing "Z"
    is treated as UTC). Naive inputs are assumed to be UTC so that
    _serialize_datetime never interprets them in the local timezone.

    Raises ValueError for non-empty strings that are not valid ISO-8601.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        # Bug fix: naive datetimes previously passed through untouched, so
        # _serialize_datetime's astimezone() read them as *local* time.
        # Attach UTC, matching the string-parsing branch below.
        return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)
    text = str(value).strip()
    if not text:
        return None
    if text.endswith("Z"):
        # datetime.fromisoformat() on Python < 3.11 rejects the "Z" suffix.
        text = f"{text[:-1]}+00:00"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed
def _serialize_datetime(value: datetime | None) -> str | None:
    """Render an aware datetime as an ISO-8601 UTC string with a "Z" suffix."""
    if value is None:
        return None
    as_utc = value.astimezone(timezone.utc)
    return as_utc.isoformat().replace("+00:00", "Z")
def _clean_dict(values: dict[str, Any]) -> dict[str, Any]:
    """Drop None-valued entries so serialized payloads stay compact."""
    cleaned: dict[str, Any] = {}
    for key, value in values.items():
        if value is not None:
            cleaned[key] = value
    return cleaned
@dataclass
class GraphSubscription:
    """A Microsoft Graph change-notification subscription record."""

    subscription_id: str
    resource: str
    change_type: str
    notification_url: str
    expiration_datetime: datetime
    client_state: str | None = None
    latest_renewal_at: datetime | None = None
    status: str | None = None

    def __post_init__(self) -> None:
        # Validate required text fields in declaration order so error
        # messages stay deterministic.
        required = (
            (self.subscription_id, "subscription_id"),
            (self.resource, "resource"),
            (self.change_type, "change_type"),
            (self.notification_url, "notification_url"),
        )
        for value, name in required:
            if not value.strip():
                raise ValueError(f"GraphSubscription.{name} is required.")
        self.expiration_datetime = _parse_datetime(self.expiration_datetime)
        self.latest_renewal_at = _parse_datetime(self.latest_renewal_at)
        if self.expiration_datetime is None:
            raise ValueError("GraphSubscription.expiration_datetime is required.")

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> "GraphSubscription":
        """Build a subscription from snake_case or Graph camelCase keys."""

        def pick(*keys: str) -> Any:
            # Mirrors `a or b` chaining: first truthy wins, otherwise the
            # last looked-up value (possibly falsy) is returned.
            value: Any = None
            for key in keys:
                value = payload.get(key)
                if value:
                    return value
            return value

        return cls(
            subscription_id=str(pick("subscription_id", "id") or "").strip(),
            resource=str(payload.get("resource") or "").strip(),
            change_type=str(pick("change_type", "changeType") or "").strip(),
            notification_url=str(pick("notification_url", "notificationUrl") or "").strip(),
            expiration_datetime=pick("expiration_datetime", "expirationDateTime"),
            client_state=pick("client_state", "clientState"),
            latest_renewal_at=pick("latest_renewal_at", "latestRenewalAt"),
            status=payload.get("status"),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a snake_case dict, omitting unset fields."""
        data = {
            "subscription_id": self.subscription_id,
            "resource": self.resource,
            "change_type": self.change_type,
            "notification_url": self.notification_url,
            "expiration_datetime": _serialize_datetime(self.expiration_datetime),
            "client_state": self.client_state,
            "latest_renewal_at": _serialize_datetime(self.latest_renewal_at),
            "status": self.status,
        }
        return _clean_dict(data)
@dataclass
class TeamsMeetingRef:
    """Identifier bundle for a Teams online meeting."""

    meeting_id: str
    organizer_user_id: str | None = None
    join_web_url: str | None = None
    calendar_event_id: str | None = None
    thread_id: str | None = None
    tenant_id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if not self.meeting_id.strip():
            raise ValueError("TeamsMeetingRef.meeting_id is required.")

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> "TeamsMeetingRef":
        """Build a reference from snake_case or Graph camelCase keys."""
        raw_id = payload.get("meeting_id") or payload.get("id") or ""
        return cls(
            meeting_id=str(raw_id).strip(),
            organizer_user_id=payload.get("organizer_user_id") or payload.get("organizerUserId"),
            join_web_url=payload.get("join_web_url") or payload.get("joinWebUrl"),
            calendar_event_id=payload.get("calendar_event_id") or payload.get("calendarEventId"),
            thread_id=payload.get("thread_id") or payload.get("threadId"),
            tenant_id=payload.get("tenant_id") or payload.get("tenantId"),
            metadata=dict(payload.get("metadata") or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a snake_case dict, omitting unset fields."""
        data = {
            "meeting_id": self.meeting_id,
            "organizer_user_id": self.organizer_user_id,
            "join_web_url": self.join_web_url,
            "calendar_event_id": self.calendar_event_id,
            "thread_id": self.thread_id,
            "tenant_id": self.tenant_id,
            "metadata": self.metadata or None,
        }
        return _clean_dict(data)
@dataclass
class MeetingArtifact:
    """A normalized transcript, recording, or call-record artifact."""

    artifact_type: ArtifactType
    artifact_id: str
    display_name: str | None = None
    content_type: str | None = None
    source_url: str | None = None
    download_url: str | None = None
    created_at: datetime | None = None
    available_at: datetime | None = None
    size_bytes: int | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if self.artifact_type not in ("transcript", "recording", "call_record"):
            raise ValueError(
                "MeetingArtifact.artifact_type must be transcript, recording, or call_record."
            )
        if not self.artifact_id.strip():
            raise ValueError("MeetingArtifact.artifact_id is required.")
        # Timestamps may arrive as Graph ISO strings; normalize them.
        self.created_at = _parse_datetime(self.created_at)
        self.available_at = _parse_datetime(self.available_at)
        if self.size_bytes is not None:
            self.size_bytes = int(self.size_bytes)

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> "MeetingArtifact":
        """Build an artifact from snake_case or Graph camelCase keys."""

        def first(*keys: str) -> Any:
            # Mirrors `a or b or c` chaining: first truthy wins, otherwise
            # the last looked-up value is returned.
            chosen: Any = None
            for key in keys:
                chosen = payload.get(key)
                if chosen:
                    return chosen
            return chosen

        return cls(
            artifact_type=first("artifact_type", "artifactType"),
            artifact_id=str(first("artifact_id", "id") or "").strip(),
            display_name=first("display_name", "displayName", "name"),
            content_type=first("content_type", "contentType"),
            source_url=first("source_url", "sourceUrl", "webUrl"),
            download_url=first("download_url", "downloadUrl", "@microsoft.graph.downloadUrl"),
            created_at=first("created_at", "createdDateTime"),
            available_at=first("available_at", "availableDateTime", "lastModifiedDateTime"),
            size_bytes=first("size_bytes", "size"),
            metadata=dict(payload.get("metadata") or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a snake_case dict, omitting unset fields."""
        data = {
            "artifact_type": self.artifact_type,
            "artifact_id": self.artifact_id,
            "display_name": self.display_name,
            "content_type": self.content_type,
            "source_url": self.source_url,
            "download_url": self.download_url,
            "created_at": _serialize_datetime(self.created_at),
            "available_at": _serialize_datetime(self.available_at),
            "size_bytes": self.size_bytes,
            "metadata": self.metadata or None,
        }
        return _clean_dict(data)
@dataclass
class TeamsMeetingSummaryPayload:
    """Structured meeting summary produced by the pipeline's summarize step.

    Bundles the normalized meeting reference, raw transcript text, the
    generated summary fields, and the sink targets recorded at generation
    time so sink writers can deliver idempotently.
    """
    meeting_ref: TeamsMeetingRef
    title: str | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
    participants: list[str] = field(default_factory=list)
    transcript_text: str | None = None
    summary: str | None = None
    key_decisions: list[str] = field(default_factory=list)
    action_items: list[str] = field(default_factory=list)
    risks: list[str] = field(default_factory=list)
    call_metrics: dict[str, Any] = field(default_factory=dict)
    source_artifacts: list[MeetingArtifact] = field(default_factory=list)
    confidence: str | None = None
    confidence_notes: str | None = None
    notion_target: str | None = None
    linear_target: str | None = None
    teams_target: str | None = None
    def __post_init__(self) -> None:
        # Timestamps may arrive as ISO strings; normalize to aware datetimes.
        self.start_time = _parse_datetime(self.start_time)
        self.end_time = _parse_datetime(self.end_time)
    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> "TeamsMeetingSummaryPayload":
        """Build a payload from snake_case or camelCase keys.

        Raises KeyError if the required "meeting_ref" key is absent.
        """
        return cls(
            meeting_ref=TeamsMeetingRef.from_dict(payload["meeting_ref"]),
            title=payload.get("title"),
            start_time=payload.get("start_time") or payload.get("startTime"),
            end_time=payload.get("end_time") or payload.get("endTime"),
            participants=list(payload.get("participants") or []),
            transcript_text=payload.get("transcript_text") or payload.get("transcriptText"),
            summary=payload.get("summary"),
            key_decisions=list(payload.get("key_decisions") or payload.get("keyDecisions") or []),
            action_items=list(payload.get("action_items") or payload.get("actionItems") or []),
            risks=list(payload.get("risks") or []),
            call_metrics=dict(payload.get("call_metrics") or payload.get("callMetrics") or {}),
            source_artifacts=[
                MeetingArtifact.from_dict(item) for item in payload.get("source_artifacts", [])
            ],
            confidence=payload.get("confidence"),
            confidence_notes=payload.get("confidence_notes") or payload.get("confidenceNotes"),
            notion_target=payload.get("notion_target") or payload.get("notionTarget"),
            linear_target=payload.get("linear_target") or payload.get("linearTarget"),
            teams_target=payload.get("teams_target") or payload.get("teamsTarget"),
        )
    def to_dict(self) -> dict[str, Any]:
        """Serialize to a snake_case dict; None and empty lists are dropped."""
        return _clean_dict(
            {
                "meeting_ref": self.meeting_ref.to_dict(),
                "title": self.title,
                "start_time": _serialize_datetime(self.start_time),
                "end_time": _serialize_datetime(self.end_time),
                "participants": self.participants or None,
                "transcript_text": self.transcript_text,
                "summary": self.summary,
                "key_decisions": self.key_decisions or None,
                "action_items": self.action_items or None,
                "risks": self.risks or None,
                "call_metrics": self.call_metrics or None,
                "source_artifacts": [artifact.to_dict() for artifact in self.source_artifacts]
                or None,
                "confidence": self.confidence,
                "confidence_notes": self.confidence_notes,
                "notion_target": self.notion_target,
                "linear_target": self.linear_target,
                "teams_target": self.teams_target,
            }
        )
@dataclass
class TeamsMeetingPipelineJob:
    """Durable lifecycle record for one pipeline run of one meeting event.

    Persisted via the pipeline store; `dedupe_key` makes replays of the same
    Graph notification idempotent.
    """
    job_id: str
    event_id: str
    source_event_type: str
    dedupe_key: str
    status: str
    retry_count: int = 0
    created_at: datetime | None = None
    updated_at: datetime | None = None
    meeting_ref: TeamsMeetingRef | None = None
    selected_artifact_strategy: str | None = None
    summary_payload: TeamsMeetingSummaryPayload | None = None
    error_info: dict[str, Any] = field(default_factory=dict)
    def __post_init__(self) -> None:
        if not self.job_id.strip():
            raise ValueError("TeamsMeetingPipelineJob.job_id is required.")
        if not self.event_id.strip():
            raise ValueError("TeamsMeetingPipelineJob.event_id is required.")
        if not self.source_event_type.strip():
            raise ValueError("TeamsMeetingPipelineJob.source_event_type is required.")
        if not self.dedupe_key.strip():
            raise ValueError("TeamsMeetingPipelineJob.dedupe_key is required.")
        if not self.status.strip():
            raise ValueError("TeamsMeetingPipelineJob.status is required.")
        # Coerce loosely-typed store payloads into canonical types.
        self.retry_count = int(self.retry_count)
        self.created_at = _parse_datetime(self.created_at)
        self.updated_at = _parse_datetime(self.updated_at)
    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> "TeamsMeetingPipelineJob":
        """Build a job from snake_case or camelCase keys; nested refs/payloads
        are recursively deserialized when present."""
        meeting_ref_payload = payload.get("meeting_ref") or payload.get("meetingRef")
        summary_payload = payload.get("summary_payload") or payload.get("summaryPayload")
        return cls(
            job_id=str(payload.get("job_id") or payload.get("jobId") or "").strip(),
            event_id=str(payload.get("event_id") or payload.get("eventId") or "").strip(),
            source_event_type=str(
                payload.get("source_event_type") or payload.get("sourceEventType") or ""
            ).strip(),
            dedupe_key=str(payload.get("dedupe_key") or payload.get("dedupeKey") or "").strip(),
            status=str(payload.get("status") or "").strip(),
            retry_count=payload.get("retry_count") or payload.get("retryCount") or 0,
            created_at=payload.get("created_at") or payload.get("createdAt"),
            updated_at=payload.get("updated_at") or payload.get("updatedAt"),
            meeting_ref=TeamsMeetingRef.from_dict(meeting_ref_payload) if meeting_ref_payload else None,
            selected_artifact_strategy=payload.get("selected_artifact_strategy")
            or payload.get("selectedArtifactStrategy"),
            summary_payload=TeamsMeetingSummaryPayload.from_dict(summary_payload)
            if summary_payload
            else None,
            error_info=dict(payload.get("error_info") or payload.get("errorInfo") or {}),
        )
    def to_dict(self) -> dict[str, Any]:
        """Serialize to a snake_case dict, omitting unset fields."""
        return _clean_dict(
            {
                "job_id": self.job_id,
                "event_id": self.event_id,
                "source_event_type": self.source_event_type,
                "dedupe_key": self.dedupe_key,
                "status": self.status,
                "retry_count": self.retry_count,
                "created_at": _serialize_datetime(self.created_at),
                "updated_at": _serialize_datetime(self.updated_at),
                "meeting_ref": self.meeting_ref.to_dict() if self.meeting_ref else None,
                "selected_artifact_strategy": self.selected_artifact_strategy,
                "summary_payload": self.summary_payload.to_dict() if self.summary_payload else None,
                "error_info": self.error_info or None,
            }
        )
# Public surface of this module (kept alphabetical).
__all__ = [
    "ArtifactType",
    "GraphSubscription",
    "MeetingArtifact",
    "TeamsMeetingPipelineJob",
    "TeamsMeetingRef",
    "TeamsMeetingSummaryPayload",
]

View file

@ -0,0 +1,688 @@
"""Pipeline orchestration for Microsoft Teams meeting summaries."""
from __future__ import annotations
import asyncio
import json
import logging
import os
import shutil
import subprocess
import tempfile
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Awaitable, Callable, Optional
import httpx
from agent.auxiliary_client import async_call_llm, extract_content_or_reasoning
from hermes_constants import get_hermes_home
from plugins.teams_pipeline.meetings import (
TeamsMeetingArtifactNotFoundError,
download_recording_artifact,
enrich_meeting_with_call_record,
fetch_preferred_transcript_text,
list_recording_artifacts,
resolve_meeting_reference,
)
from plugins.teams_pipeline.models import (
MeetingArtifact,
TeamsMeetingPipelineJob,
TeamsMeetingRef,
TeamsMeetingSummaryPayload,
)
from plugins.teams_pipeline.store import TeamsPipelineStore
from tools.transcription_tools import transcribe_audio
logger = logging.getLogger(__name__)
# States from which a job does not progress without an explicit replay.
TERMINAL_PIPELINE_STATES = {"completed", "failed", "retry_scheduled"}
# States a job passes through while a run is in flight; a job observed in
# one of these is treated as already being processed and is not re-run.
ACTIVE_PIPELINE_STATES = {
    "received",
    "resolving_meeting",
    "fetching_transcript",
    "downloading_recording",
    "transcribing_audio",
    "summarizing",
    "writing_notion",
    "writing_linear",
    "sending_teams",
}
class TeamsPipelineError(RuntimeError):
    """Base class for Teams meeting pipeline failures."""


class TeamsPipelineRetryableError(TeamsPipelineError):
    """Raised when the pipeline should be retried later (transient failure)."""


class TeamsPipelineSinkError(TeamsPipelineError):
    """Raised when an output sink fails."""


class TeamsPipelineArtifactNotFoundError(TeamsPipelineRetryableError):
    """Raised when meeting artifacts are not yet available."""
# STT callable: (audio_path, model_name_or_None) -> result dict.
TranscribeFn = Callable[[str, Optional[str]], dict[str, Any]]
# Async summarizer: keyword arguments -> summary dict or payload object.
SummarizeFn = Callable[..., Awaitable[dict[str, Any] | TeamsMeetingSummaryPayload]]
# Async sink: (summary payload, sink config, existing sink record) -> record.
SinkFn = Callable[
    [TeamsMeetingSummaryPayload, dict[str, Any], Optional[dict[str, Any]]],
    Awaitable[dict[str, Any]],
]
@dataclass
class TeamsPipelineConfig:
    """Runtime configuration for the Teams meeting pipeline.

    ``from_dict`` accepts both snake_case and camelCase keys, matching the
    dual-key convention used throughout the plugin's models.
    """

    # Try the Graph transcript before falling back to recording + STT.
    transcript_preferred: bool = True
    # Fail (retryable) instead of falling back when no transcript exists.
    transcript_required: bool = False
    # Permit the recording-download + STT fallback path.
    transcription_fallback: bool = True
    stt_model: str | None = None
    # Extract audio with ffmpeg before transcribing video containers.
    ffmpeg_extract_audio: bool = True
    # Transcripts shorter than this are treated as unusable.
    transcript_min_chars: int = 80
    tmp_dir: Path | None = None
    notion: dict[str, Any] | None = None
    linear: dict[str, Any] | None = None
    teams_delivery: dict[str, Any] | None = None

    @classmethod
    def from_dict(cls, payload: Optional[dict[str, Any]]) -> "TeamsPipelineConfig":
        """Build a config from a (possibly None) mapping.

        Fix: previously only tmp_dir / stt_model / teams_delivery honored
        camelCase aliases; now every key does, so configs written in either
        convention behave identically. Snake_case payloads are unaffected.
        """
        data = dict(payload or {})

        def pick(snake: str, camel: str, default: Any = None) -> Any:
            # snake_case key wins when present; camelCase is the alias.
            if snake in data:
                return data[snake]
            if camel in data:
                return data[camel]
            return default

        tmp_dir = data.get("tmp_dir") or data.get("tmpDir")
        return cls(
            transcript_preferred=bool(pick("transcript_preferred", "transcriptPreferred", True)),
            transcript_required=bool(pick("transcript_required", "transcriptRequired", False)),
            transcription_fallback=bool(pick("transcription_fallback", "transcriptionFallback", True)),
            stt_model=data.get("stt_model") or data.get("sttModel"),
            ffmpeg_extract_audio=bool(pick("ffmpeg_extract_audio", "ffmpegExtractAudio", True)),
            transcript_min_chars=int(pick("transcript_min_chars", "transcriptMinChars", 80)),
            tmp_dir=Path(tmp_dir) if tmp_dir else None,
            notion=data.get("notion"),
            linear=data.get("linear"),
            teams_delivery=data.get("teams_delivery") or data.get("teamsDelivery"),
        )
class NotionWriter:
    """Writes meeting summaries to a Notion database via the REST API.

    Idempotent per meeting: when an existing sink record carries a page_id,
    the page is patched in place instead of creating a duplicate.
    """
    API_BASE = "https://api.notion.com/v1"
    # Pinned Notion-Version header value.
    API_VERSION = "2025-09-03"
    def __init__(self, *, api_key: str | None = None, transport: httpx.AsyncBaseTransport | None = None) -> None:
        # Explicit key wins; otherwise read NOTION_API_KEY from the env.
        # *transport* exists so tests can inject a mock httpx transport.
        self.api_key = (api_key or os.getenv("NOTION_API_KEY", "")).strip()
        self._transport = transport
    async def write_summary(
        self,
        payload: TeamsMeetingSummaryPayload,
        config: dict[str, Any],
        existing_record: Optional[dict[str, Any]] = None,
    ) -> dict[str, Any]:
        """Create or update the Notion page for *payload*.

        Returns {"page_id", "url"} for persistence as the sink record.
        Raises TeamsPipelineSinkError when the key or targets are missing;
        httpx.HTTPStatusError propagates on API failures.
        """
        if not self.api_key:
            raise TeamsPipelineSinkError("NOTION_API_KEY is not configured.")
        database_id = str(config.get("database_id") or config.get("databaseId") or "").strip()
        page_id = (existing_record or {}).get("page_id")
        if not database_id and not page_id:
            raise TeamsPipelineSinkError("Notion sink requires database_id or an existing page_id.")
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Notion-Version": self.API_VERSION,
            "Content-Type": "application/json",
        }
        async with httpx.AsyncClient(timeout=30.0, transport=self._transport) as client:
            if page_id:
                # Update path: only properties are patched; body blocks from
                # the original create are left untouched.
                response = await client.patch(
                    f"{self.API_BASE}/pages/{page_id}",
                    headers=headers,
                    json={"properties": self._build_properties(payload, config)},
                )
                response.raise_for_status()
                record = response.json()
            else:
                response = await client.post(
                    f"{self.API_BASE}/pages",
                    headers=headers,
                    json={
                        "parent": {"database_id": database_id},
                        "properties": self._build_properties(payload, config),
                        "children": self._build_blocks(payload),
                    },
                )
                response.raise_for_status()
                record = response.json()
        return {"page_id": record["id"], "url": record.get("url")}
    def _build_properties(self, payload: TeamsMeetingSummaryPayload, config: dict[str, Any]) -> dict[str, Any]:
        """Map summary fields onto the configured Notion property names."""
        title_property = config.get("title_property", "Name")
        summary_property = config.get("summary_property")
        meeting_id_property = config.get("meeting_id_property")
        properties: dict[str, Any] = {
            title_property: {
                "title": [{"text": {"content": payload.title or f"Meeting {payload.meeting_ref.meeting_id}"}}]
            }
        }
        if summary_property:
            # Truncated below Notion's 2000-char rich_text limit.
            properties[summary_property] = {
                "rich_text": [{"text": {"content": (payload.summary or "")[:1900]}}]
            }
        if meeting_id_property:
            properties[meeting_id_property] = {
                "rich_text": [{"text": {"content": payload.meeting_ref.meeting_id}}]
            }
        return properties
    def _build_blocks(self, payload: TeamsMeetingSummaryPayload) -> list[dict[str, Any]]:
        """Render the page body: a heading + paragraph per summary section."""
        sections = [
            ("Summary", payload.summary or ""),
            ("Key Decisions", "\n".join(f"- {item}" for item in payload.key_decisions)),
            ("Action Items", "\n".join(f"- {item}" for item in payload.action_items)),
            ("Risks", "\n".join(f"- {item}" for item in payload.risks)),
        ]
        blocks: list[dict[str, Any]] = []
        for heading, body in sections:
            blocks.append(
                {
                    "object": "block",
                    "type": "heading_2",
                    "heading_2": {"rich_text": [{"text": {"content": heading}}]},
                }
            )
            blocks.append(
                {
                    "object": "block",
                    "type": "paragraph",
                    "paragraph": {"rich_text": [{"text": {"content": body or "None"}}]},
                }
            )
        return blocks
class LinearWriter:
    """Writes meeting summaries to Linear via its GraphQL API.

    Idempotent per meeting: when an existing sink record carries an
    issue_id, the issue is updated instead of creating a duplicate.
    """
    API_URL = "https://api.linear.app/graphql"
    def __init__(self, *, api_key: str | None = None, transport: httpx.AsyncBaseTransport | None = None) -> None:
        # Explicit key wins; otherwise read LINEAR_API_KEY from the env.
        # *transport* exists so tests can inject a mock httpx transport.
        self.api_key = (api_key or os.getenv("LINEAR_API_KEY", "")).strip()
        self._transport = transport
    async def write_summary(
        self,
        payload: TeamsMeetingSummaryPayload,
        config: dict[str, Any],
        existing_record: Optional[dict[str, Any]] = None,
    ) -> dict[str, Any]:
        """Create or update the Linear issue for *payload*.

        Returns {"issue_id", "identifier", "url"} for persistence as the
        sink record. Raises TeamsPipelineSinkError when the key/team is
        missing or the mutation reports no issue.
        """
        if not self.api_key:
            raise TeamsPipelineSinkError("LINEAR_API_KEY is not configured.")
        # NOTE: Linear personal API keys are sent bare (no "Bearer" prefix).
        headers = {"Authorization": self.api_key, "Content-Type": "application/json"}
        team_id = str(config.get("team_id") or config.get("teamId") or "").strip()
        title = payload.title or f"Meeting Summary: {payload.meeting_ref.meeting_id}"
        description = _render_summary_markdown(payload)
        existing_issue_id = (existing_record or {}).get("issue_id")
        async with httpx.AsyncClient(timeout=30.0, transport=self._transport) as client:
            if existing_issue_id:
                response = await client.post(
                    self.API_URL,
                    headers=headers,
                    json={
                        "query": (
                            "mutation($id: String!, $input: IssueUpdateInput!) "
                            "{ issueUpdate(id: $id, input: $input) { success issue { id identifier url } } }"
                        ),
                        "variables": {
                            "id": existing_issue_id,
                            "input": {"title": title, "description": description},
                        },
                    },
                )
            else:
                if not team_id:
                    raise TeamsPipelineSinkError("Linear sink requires team_id when creating a new issue.")
                response = await client.post(
                    self.API_URL,
                    headers=headers,
                    json={
                        "query": (
                            "mutation($input: IssueCreateInput!) "
                            "{ issueCreate(input: $input) { success issue { id identifier url } } }"
                        ),
                        "variables": {"input": {"teamId": team_id, "title": title, "description": description}},
                    },
                )
            response.raise_for_status()
            payload_json = response.json()
            # Whichever mutation ran, extract the issue node it returned.
            issue = (
                (((payload_json.get("data") or {}).get("issueUpdate") or {}).get("issue"))
                or (((payload_json.get("data") or {}).get("issueCreate") or {}).get("issue"))
            )
            if not isinstance(issue, dict) or not issue.get("id"):
                raise TeamsPipelineSinkError(f"Linear write failed: {payload_json}")
            return {"issue_id": issue["id"], "identifier": issue.get("identifier"), "url": issue.get("url")}
class TeamsMeetingPipeline:
    """Transcript-first Teams meeting pipeline with durable lifecycle state.

    Drives one meeting event from Graph notification to delivered summary:
    resolve meeting -> transcript (or recording + STT fallback) -> optional
    call-record enrichment -> summarize -> write sinks. Every state change
    is persisted through the store so jobs can be inspected and replayed.
    """

    def __init__(
        self,
        *,
        graph_client: Any,
        store: TeamsPipelineStore,
        config: TeamsPipelineConfig | dict[str, Any] | None = None,
        transcribe_fn: TranscribeFn = transcribe_audio,
        summarize_fn: Optional[SummarizeFn] = None,
        notion_writer: Optional[NotionWriter] = None,
        linear_writer: Optional[LinearWriter] = None,
        teams_sender: Optional[SinkFn] = None,
    ) -> None:
        """Wire the pipeline's collaborators; all sinks are optional."""
        self.graph_client = graph_client
        self.store = store
        # Accept a ready config object or a raw mapping (or None for defaults).
        self.config = config if isinstance(config, TeamsPipelineConfig) else TeamsPipelineConfig.from_dict(config)
        self.transcribe_fn = transcribe_fn
        self.summarize_fn = summarize_fn or self._generate_summary_payload
        self.notion_writer = notion_writer
        self.linear_writer = linear_writer
        self.teams_sender = teams_sender

    def create_job_from_notification(self, notification: dict[str, Any]) -> TeamsMeetingPipelineJob:
        """Record the notification receipt and create (or dedupe) its job.

        The receipt key doubles as event id and dedupe key, so replaying the
        same Graph notification returns the existing job unchanged.
        """
        event_id = TeamsPipelineStore.build_notification_receipt_key(notification)
        self.store.record_notification_receipt(event_id, notification)
        existing_job = self._find_job_by_dedupe_key(event_id)
        if existing_job is not None:
            return existing_job
        resource_data = notification.get("resourceData") or {}
        # Best-effort meeting id: resourceData wins, then explicit fields,
        # then the resource path, then the event id as a last resort.
        meeting_id = (
            resource_data.get("id")
            or notification.get("meetingId")
            or _extract_meeting_id_from_resource(str(notification.get("resource") or ""))
            or notification.get("resource")
            or event_id
        )
        job = TeamsMeetingPipelineJob(
            job_id=f"teams-job-{uuid.uuid4().hex[:12]}",
            event_id=event_id,
            source_event_type=str(notification.get("changeType") or "graph.notification"),
            dedupe_key=event_id,
            status="received",
            meeting_ref=TeamsMeetingRef(
                meeting_id=str(meeting_id),
                tenant_id=resource_data.get("tenantId") or notification.get("tenantId"),
                metadata={
                    # Keep the raw notification so run_job can recover hints.
                    "notification": dict(notification),
                    "join_web_url": resource_data.get("joinWebUrl"),
                    "call_record_id": resource_data.get("callRecordId") or notification.get("callRecordId"),
                },
            ),
        )
        self.store.upsert_job(job.job_id, job.to_dict())
        return job

    async def run_notification(self, notification: dict[str, Any]) -> TeamsMeetingPipelineJob:
        """Create/dedupe a job for *notification* and run it if it is fresh.

        Jobs already terminal, or mid-flight in another worker, are returned
        as-is instead of being re-run.
        """
        job = self.create_job_from_notification(notification)
        if job.status in TERMINAL_PIPELINE_STATES or job.status in ACTIVE_PIPELINE_STATES - {"received"}:
            return job
        return await self.run_job(job.job_id)

    async def run_job(self, job_or_id: TeamsMeetingPipelineJob | str) -> TeamsMeetingPipelineJob:
        """Execute the full pipeline for one job, persisting each state.

        Never raises for in-pipeline failures: retryable errors end in
        "retry_scheduled", everything else in "failed", with error_info set.
        """
        job = self._coerce_job(job_or_id)
        meeting_ref = job.meeting_ref
        if meeting_ref is None:
            raise TeamsPipelineError(f"Job {job.job_id} has no meeting_ref.")
        artifacts: list[MeetingArtifact] = []
        try:
            job = self._persist_job(job, status="resolving_meeting")
            # Bug fix: metadata.get("notification") can be None when the key
            # is absent (e.g. a job built outside create_job_from_notification),
            # and notification.get(...) below would raise AttributeError,
            # spuriously failing the job. Coalesce to {}.
            notification = (meeting_ref.metadata.get("notification") or {}) if isinstance(meeting_ref.metadata, dict) else {}
            resolved_meeting = await resolve_meeting_reference(
                self.graph_client,
                meeting_id=meeting_ref.meeting_id,
                join_web_url=meeting_ref.join_web_url or meeting_ref.metadata.get("join_web_url"),
                tenant_id=meeting_ref.tenant_id,
            )
            job.meeting_ref = resolved_meeting
            job = self._persist_job(job, meeting_ref=resolved_meeting.to_dict())
            transcript_text: str | None = None
            if self.config.transcript_preferred:
                job = self._persist_job(job, status="fetching_transcript")
                transcript_artifact, transcript_text = await fetch_preferred_transcript_text(
                    self.graph_client, resolved_meeting
                )
                if transcript_artifact and transcript_text:
                    artifacts.append(transcript_artifact)
                    # Treat too-short transcripts as unusable so the
                    # recording fallback can take over.
                    if len(transcript_text.strip()) < self.config.transcript_min_chars:
                        transcript_text = None
            if not transcript_text:
                if self.config.transcript_required:
                    raise TeamsPipelineRetryableError(
                        f"Transcript unavailable for meeting {resolved_meeting.meeting_id}."
                    )
                if not self.config.transcription_fallback:
                    raise TeamsPipelineArtifactNotFoundError(
                        "No transcript available and transcription fallback disabled "
                        f"for {resolved_meeting.meeting_id}."
                    )
                job = self._persist_job(job, status="downloading_recording")
                recordings = await list_recording_artifacts(self.graph_client, resolved_meeting)
                if not recordings:
                    raise TeamsPipelineRetryableError(
                        f"Recording unavailable for meeting {resolved_meeting.meeting_id}."
                    )
                recording = recordings[0]
                artifacts.append(recording)
                transcript_text = await self._transcribe_recording(job, resolved_meeting, recording)
                job = self._persist_job(job, selected_artifact_strategy="recording_stt_fallback")
            else:
                job = self._persist_job(job, selected_artifact_strategy="transcript_first")
            call_record_id = notification.get("callRecordId") or (meeting_ref.metadata or {}).get("call_record_id")
            call_record = await enrich_meeting_with_call_record(
                self.graph_client,
                resolved_meeting,
                call_record_id=call_record_id,
            )
            if call_record is not None:
                artifacts.append(call_record)
            job = self._persist_job(job, status="summarizing")
            generated = await self.summarize_fn(
                resolved_meeting=resolved_meeting,
                transcript_text=transcript_text or "",
                artifacts=artifacts,
            )
            # The summarizer may return either a payload object or a dict.
            summary_payload = (
                generated
                if isinstance(generated, TeamsMeetingSummaryPayload)
                else TeamsMeetingSummaryPayload.from_dict(generated)
            )
            job.summary_payload = summary_payload
            job = self._persist_job(job, summary_payload=summary_payload.to_dict())
            await self._write_sinks(job, summary_payload)
            job = self._persist_job(job, status="completed")
            return job
        except TeamsPipelineRetryableError as exc:
            job = self._persist_job(
                job,
                status="retry_scheduled",
                error_info={"message": str(exc), "retryable": True},
            )
            return job
        except Exception as exc:
            job = self._persist_job(
                job,
                status="failed",
                error_info={"message": str(exc), "type": type(exc).__name__},
            )
            return job

    def _coerce_job(self, job_or_id: TeamsMeetingPipelineJob | str) -> TeamsMeetingPipelineJob:
        """Resolve a job object or id into a job loaded from the store."""
        if isinstance(job_or_id, TeamsMeetingPipelineJob):
            return job_or_id
        payload = self.store.get_job(str(job_or_id))
        if not payload:
            raise TeamsPipelineError(f"Unknown Teams pipeline job: {job_or_id}")
        return TeamsMeetingPipelineJob.from_dict(payload)

    def _find_job_by_dedupe_key(self, dedupe_key: str) -> TeamsMeetingPipelineJob | None:
        """Return the stored job matching *dedupe_key*, if any (linear scan)."""
        for payload in self.store.list_jobs().values():
            if not isinstance(payload, dict):
                continue
            if str(payload.get("dedupe_key") or "") != dedupe_key:
                continue
            return TeamsMeetingPipelineJob.from_dict(payload)
        return None

    def _persist_job(self, job: TeamsMeetingPipelineJob, **updates: Any) -> TeamsMeetingPipelineJob:
        """Apply *updates* to the job's serialized form and persist it.

        Returns the job re-read from the store so timestamps stay canonical.
        """
        payload = job.to_dict()
        payload.update(updates)
        stored = self.store.upsert_job(job.job_id, payload)
        return TeamsMeetingPipelineJob.from_dict(stored)

    async def _transcribe_recording(
        self,
        job: TeamsMeetingPipelineJob,
        meeting_ref: TeamsMeetingRef,
        recording: MeetingArtifact,
    ) -> str:
        """Download *recording*, extract audio if needed, and run STT on it.

        The temporary directory (and the media in it) is removed when the
        context exits, whether transcription succeeded or not.
        """
        temp_root = self.config.tmp_dir or (get_hermes_home() / "tmp" / "teams_pipeline")
        temp_root.mkdir(parents=True, exist_ok=True)
        with tempfile.TemporaryDirectory(dir=str(temp_root), prefix="teams-recording-") as tmp_dir:
            recording_name = recording.display_name or f"{recording.artifact_id}.mp4"
            recording_path = Path(tmp_dir) / recording_name
            await download_recording_artifact(
                self.graph_client,
                meeting_ref,
                recording,
                recording_path,
            )
            audio_path = await self._prepare_audio_path(recording_path)
            job = self._persist_job(job, status="transcribing_audio")
            # transcribe_fn is synchronous; keep the event loop responsive.
            result = await asyncio.to_thread(self.transcribe_fn, str(audio_path), self.config.stt_model)
            if not result.get("success"):
                raise TeamsPipelineRetryableError(str(result.get("error") or "Unknown STT failure"))
            transcript = str(result.get("transcript") or "").strip()
            if not transcript:
                raise TeamsPipelineRetryableError("STT returned an empty transcript.")
            return transcript

    async def _prepare_audio_path(self, recording_path: Path) -> Path:
        """Return an audio file for STT, extracting with ffmpeg when needed.

        Audio containers pass through unchanged; video containers are
        converted to .wav unless ffmpeg extraction is disabled by config.
        """
        if recording_path.suffix.lower() in {".wav", ".mp3", ".m4a", ".ogg", ".flac", ".aac", ".webm"}:
            return recording_path
        if not self.config.ffmpeg_extract_audio:
            return recording_path
        ffmpeg = shutil.which("ffmpeg")
        if not ffmpeg:
            raise TeamsPipelineRetryableError(
                "Recording fallback requires ffmpeg for audio extraction, but ffmpeg was not found."
            )
        audio_path = recording_path.with_suffix(".wav")
        proc = await asyncio.create_subprocess_exec(
            ffmpeg,
            "-y",
            "-i",
            str(recording_path),
            str(audio_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            detail = stderr.decode("utf-8", errors="replace").strip()
            raise TeamsPipelineRetryableError(f"ffmpeg audio extraction failed: {detail}")
        return audio_path

    async def _generate_summary_payload(
        self,
        *,
        resolved_meeting: TeamsMeetingRef,
        transcript_text: str,
        artifacts: list[MeetingArtifact],
    ) -> TeamsMeetingSummaryPayload:
        """Default summarizer: auxiliary LLM with a heuristic fallback.

        Any LLM/parse failure downgrades to the heuristic summary rather
        than failing the job.
        """
        prompt = _build_summary_prompt(resolved_meeting, transcript_text, artifacts)
        try:
            response = await async_call_llm(
                task="call",
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You summarize meeting transcripts. Return only valid JSON with keys: "
                            "summary, key_decisions, action_items, risks, confidence, confidence_notes."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                temperature=0.2,
                max_tokens=900,
            )
            content = extract_content_or_reasoning(response)
            parsed = _parse_summary_json(content)
        except Exception as exc:
            logger.info("Teams pipeline LLM summary unavailable, using heuristic summary: %s", exc)
            parsed = _heuristic_summary(transcript_text)
        metrics = _collect_call_metrics(artifacts)
        return TeamsMeetingSummaryPayload(
            meeting_ref=resolved_meeting,
            # NOTE(review): assumes resolve_meeting_reference stashes Graph
            # fields (subject/startDateTime/endDateTime) in metadata — confirm.
            title=str(resolved_meeting.metadata.get("subject") or f"Meeting {resolved_meeting.meeting_id}"),
            start_time=resolved_meeting.metadata.get("startDateTime"),
            end_time=resolved_meeting.metadata.get("endDateTime"),
            participants=_collect_participants(resolved_meeting),
            transcript_text=transcript_text,
            summary=parsed.get("summary"),
            key_decisions=list(parsed.get("key_decisions") or []),
            action_items=list(parsed.get("action_items") or []),
            risks=list(parsed.get("risks") or []),
            call_metrics=metrics,
            source_artifacts=artifacts,
            confidence=parsed.get("confidence"),
            confidence_notes=parsed.get("confidence_notes"),
            notion_target=(self.config.notion or {}).get("database_id"),
            linear_target=(self.config.linear or {}).get("team_id"),
            teams_target=(self.config.teams_delivery or {}).get("channel_id"),
        )

    async def _write_sinks(self, job: TeamsMeetingPipelineJob, payload: TeamsMeetingSummaryPayload) -> None:
        """Deliver the summary to each enabled, configured sink.

        Sink records are keyed per meeting so re-runs update rather than
        duplicate; sink failures propagate to run_job's error handling.
        """
        if self.config.notion and self.config.notion.get("enabled") and self.notion_writer:
            job = self._persist_job(job, status="writing_notion")
            sink_key = f"notion:{payload.meeting_ref.meeting_id}"
            existing = self.store.get_sink_record(sink_key)
            result = await self.notion_writer.write_summary(payload, self.config.notion, existing)
            self.store.upsert_sink_record(sink_key, result)
        if self.config.linear and self.config.linear.get("enabled") and self.linear_writer:
            job = self._persist_job(job, status="writing_linear")
            sink_key = f"linear:{payload.meeting_ref.meeting_id}"
            existing = self.store.get_sink_record(sink_key)
            result = await self.linear_writer.write_summary(payload, self.config.linear, existing)
            self.store.upsert_sink_record(sink_key, result)
        if self.config.teams_delivery and self.config.teams_delivery.get("enabled") and self.teams_sender:
            job = self._persist_job(job, status="sending_teams")
            sink_key = f"teams:{payload.meeting_ref.meeting_id}"
            existing = self.store.get_sink_record(sink_key)
            # The Teams sender may be a writer object or a bare async callable.
            if hasattr(self.teams_sender, "write_summary"):
                result = await self.teams_sender.write_summary(payload, self.config.teams_delivery, existing)
            else:
                result = await self.teams_sender(payload, self.config.teams_delivery, existing)
            self.store.upsert_sink_record(sink_key, result)
def _collect_call_metrics(artifacts: list[MeetingArtifact]) -> dict[str, Any]:
metrics: dict[str, Any] = {}
for artifact in artifacts:
if artifact.artifact_type == "call_record":
metrics.update(dict(artifact.metadata.get("metrics") or {}))
metrics["artifact_count"] = len(artifacts)
return metrics
def _collect_participants(meeting_ref: TeamsMeetingRef) -> list[str]:
participants = meeting_ref.metadata.get("participants") or []
result: list[str] = []
if isinstance(participants, list):
for item in participants:
if isinstance(item, dict):
name = item.get("displayName") or (((item.get("identity") or {}).get("user") or {}).get("displayName"))
if name:
result.append(str(name))
return result
def _extract_meeting_id_from_resource(resource: str) -> str | None:
if not resource:
return None
parts = [part for part in resource.split("/") if part]
if not parts:
return None
if "onlineMeetings" in parts:
index = parts.index("onlineMeetings")
if index + 1 < len(parts):
return parts[index + 1]
return parts[-1]
def _build_summary_prompt(
meeting_ref: TeamsMeetingRef,
transcript_text: str,
artifacts: list[MeetingArtifact],
) -> str:
artifact_lines = [f"- {artifact.artifact_type}:{artifact.artifact_id}:{artifact.display_name or ''}" for artifact in artifacts]
return (
f"Meeting ID: {meeting_ref.meeting_id}\n"
f"Title: {meeting_ref.metadata.get('subject') or 'Unknown'}\n"
f"Artifacts:\n{chr(10).join(artifact_lines) or '- none'}\n\n"
"Transcript:\n"
f"{transcript_text[:18000]}"
)
def _parse_summary_json(content: str) -> dict[str, Any]:
text = (content or "").strip()
if not text:
return _heuristic_summary("")
start = text.find("{")
end = text.rfind("}")
if start >= 0 and end > start:
text = text[start : end + 1]
payload = json.loads(text)
return {
"summary": str(payload.get("summary") or "").strip(),
"key_decisions": [str(item).strip() for item in payload.get("key_decisions", []) if str(item).strip()],
"action_items": [str(item).strip() for item in payload.get("action_items", []) if str(item).strip()],
"risks": [str(item).strip() for item in payload.get("risks", []) if str(item).strip()],
"confidence": str(payload.get("confidence") or "medium").strip(),
"confidence_notes": str(payload.get("confidence_notes") or "").strip(),
}
def _heuristic_summary(transcript_text: str) -> dict[str, Any]:
lines = [line.strip(" -*\t") for line in transcript_text.splitlines() if line.strip()]
summary = " ".join(lines[:3])[:1200] or "Transcript unavailable or too sparse for a confident summary."
action_items = [
line for line in lines if line.lower().startswith(("action:", "todo:", "next step:", "follow up:"))
][:8]
risks = [line for line in lines if "risk" in line.lower() or "blocker" in line.lower()][:6]
decisions = [line for line in lines if "decide" in line.lower() or "decision" in line.lower()][:6]
confidence = "low" if len(transcript_text.strip()) < 300 else "medium"
return {
"summary": summary,
"key_decisions": decisions,
"action_items": action_items,
"risks": risks,
"confidence": confidence,
"confidence_notes": "Generated with heuristic fallback because no LLM summary response was available.",
}
def _render_summary_markdown(payload: TeamsMeetingSummaryPayload) -> str:
lines = [
f"# {payload.title or f'Meeting {payload.meeting_ref.meeting_id}'}",
"",
"## Summary",
payload.summary or "No summary available.",
"",
"## Key Decisions",
*([f"- {item}" for item in payload.key_decisions] or ["- None"]),
"",
"## Action Items",
*([f"- {item}" for item in payload.action_items] or ["- None"]),
"",
"## Risks",
*([f"- {item}" for item in payload.risks] or ["- None"]),
"",
f"Confidence: {payload.confidence or 'unknown'}",
payload.confidence_notes or "",
]
return "\n".join(lines).strip()

View file

@ -0,0 +1,9 @@
# Plugin manifest for the standalone Teams meeting pipeline.
name: teams_pipeline
version: 0.1.0
description: "Microsoft Teams meeting pipeline plugin with durable runtime state and operator CLI flows for Graph-backed transcript-first meeting summaries."
author: NousResearch
# "standalone" keeps this off the core plugin surface.
kind: standalone
# Host platforms the plugin supports.
platforms:
- linux
- macos
- windows

View file

@ -0,0 +1,125 @@
"""Gateway runtime wiring for the Teams meeting pipeline plugin."""
from __future__ import annotations
import logging
from typing import Any
from gateway.config import Platform
from plugins.teams_pipeline.pipeline import TeamsMeetingPipeline
from plugins.teams_pipeline.store import TeamsPipelineStore, resolve_teams_pipeline_store_path
from plugins.teams_pipeline.subscriptions import build_graph_client
logger = logging.getLogger(__name__)
def _teams_delivery_is_configured(teams_extra: dict[str, Any], teams_delivery: dict[str, Any]) -> bool:
delivery_mode = str(
teams_delivery.get("mode")
or teams_delivery.get("delivery_mode")
or teams_extra.get("delivery_mode")
or ""
).strip().lower()
if delivery_mode == "incoming_webhook":
return bool(
teams_delivery.get("incoming_webhook_url")
or teams_extra.get("incoming_webhook_url")
)
if delivery_mode == "graph":
chat_id = teams_delivery.get("chat_id") or teams_extra.get("chat_id")
team_id = teams_delivery.get("team_id") or teams_extra.get("team_id")
channel_id = teams_delivery.get("channel_id") or teams_extra.get("channel_id")
return bool(chat_id or (team_id and channel_id))
return False
def build_pipeline_runtime_config(gateway_config: Any) -> dict[str, Any]:
    """Build pipeline config from gateway platform config.

    Pipeline-specific knobs live under ``teams.extra.meeting_pipeline`` while
    Teams delivery continues to source its target details from the existing
    Teams platform config.
    """
    teams_platform = gateway_config.platforms.get(Platform("teams"))
    extra = dict((teams_platform.extra or {}) if teams_platform else {})
    config = dict(extra.get("meeting_pipeline") or {})
    if not (teams_platform and teams_platform.enabled):
        return config
    delivery = dict(config.get("teams_delivery") or {})
    mode = str(extra.get("delivery_mode") or "").strip()
    if mode:
        delivery["mode"] = mode
    # Promote delivery target fields from the platform extras when set.
    for key in (
        "incoming_webhook_url",
        "access_token",
        "team_id",
        "channel_id",
        "chat_id",
    ):
        candidate = extra.get(key)
        if candidate not in (None, ""):
            delivery[key] = candidate
    if delivery:
        delivery["enabled"] = _teams_delivery_is_configured(extra, delivery)
        config["teams_delivery"] = delivery
    return config
def build_pipeline_runtime(gateway: Any) -> TeamsMeetingPipeline:
    """Construct the pipeline runtime from the gateway's live config.

    The Teams outbound sender is wired only when the Teams platform is
    enabled, delivery is configured, and the adapter layer is importable.
    """
    teams_platform = gateway.config.platforms.get(Platform("teams"))
    pipeline_config = build_pipeline_runtime_config(gateway.config)
    delivery = dict(pipeline_config.get("teams_delivery") or {})
    sender = None
    if teams_platform and teams_platform.enabled and delivery.get("enabled"):
        try:
            from plugins.platforms.teams.adapter import TeamsSummaryWriter
        except ImportError:
            logger.debug(
                "TeamsSummaryWriter unavailable; Teams outbound delivery remains disabled until the adapter layer is present."
            )
        else:
            sender = TeamsSummaryWriter(platform_config=teams_platform)
    return TeamsMeetingPipeline(
        graph_client=build_graph_client(),
        store=TeamsPipelineStore(resolve_teams_pipeline_store_path()),
        config=pipeline_config,
        teams_sender=sender,
    )
def bind_gateway_runtime(gateway: Any) -> bool:
    """Attach the Teams pipeline runtime to the msgraph webhook adapter.

    Returns True when a runtime is (or already was) bound, False when the
    adapter is absent or runtime construction failed. On failure the
    adapter's scheduler is left untouched and the error is recorded on the
    gateway for the operator CLI to surface.
    """
    adapter = gateway.adapters.get(Platform.MSGRAPH_WEBHOOK)
    if adapter is None:
        return False
    if getattr(gateway, "_teams_pipeline_runtime", None) is not None:
        return True
    try:
        runtime = build_pipeline_runtime(gateway)
    except Exception as exc:
        failure = str(exc)
        gateway._teams_pipeline_runtime_error = failure
        logger.warning(
            "Teams pipeline runtime unavailable; leaving webhook scheduler unchanged: %s",
            failure,
        )
        return False

    async def _dispatch(notification: dict[str, Any], event: Any) -> None:
        await runtime.run_notification(notification)

    adapter.set_notification_scheduler(_dispatch)
    gateway._teams_pipeline_runtime = runtime
    gateway._teams_pipeline_runtime_error = None
    return True

View file

@ -0,0 +1,193 @@
"""Durable local state for the Teams pipeline plugin."""
from __future__ import annotations
import hashlib
import json
import os
import threading
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional
from hermes_constants import get_hermes_home
DEFAULT_TEAMS_PIPELINE_STORE_FILENAME = "teams_pipeline_store.json"
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def resolve_teams_pipeline_store_path(path: str | Path | None = None) -> Path:
    """Resolve the store file path: explicit argument, env override, default.

    Blank/whitespace values at each level fall through to the next.
    """
    if path is not None:
        candidate = str(path).strip()
        if candidate:
            return Path(candidate)
    override = os.getenv("MSGRAPH_WEBHOOK_STORE_PATH", "").strip()
    if override:
        return Path(override)
    return get_hermes_home() / DEFAULT_TEAMS_PIPELINE_STORE_FILENAME
class TeamsPipelineStore:
    """JSON-backed durable store for Teams pipeline state.

    All sections (subscriptions, notification receipts, event timestamps,
    jobs, sink records) live in one JSON document that is rewritten
    atomically on every mutation. An ``RLock`` serializes access within a
    single process; concurrent writers in other processes are not
    coordinated.
    """
    def __init__(self, path: str | Path):
        self.path = Path(path)
        self._lock = threading.RLock()
        # In-memory mirror of the on-disk document; section keys are fixed.
        self._state: Dict[str, Dict[str, Any]] = {
            "subscriptions": {},
            "notification_receipts": {},
            "event_timestamps": {},
            "jobs": {},
            "sink_records": {},
        }
        self._load()
    def _load(self) -> None:
        """Hydrate state from disk; a non-dict document is ignored.

        NOTE(review): a file containing invalid JSON makes ``json.loads``
        raise here (and thus from the constructor) — confirm a corrupt
        store crashing startup is the intended behavior.
        """
        with self._lock:
            if not self.path.exists():
                return
            data = json.loads(self.path.read_text(encoding="utf-8") or "{}")
            if not isinstance(data, dict):
                return
            self._state["subscriptions"] = dict(data.get("subscriptions") or {})
            self._state["notification_receipts"] = dict(data.get("notification_receipts") or {})
            self._state["event_timestamps"] = dict(data.get("event_timestamps") or {})
            self._state["jobs"] = dict(data.get("jobs") or {})
            self._state["sink_records"] = dict(data.get("sink_records") or {})
    def _persist(self) -> None:
        """Atomically rewrite the document: temp file in the same directory,
        then rename over the target.

        NOTE(review): there is no fsync before the rename, so durability
        across power loss is best-effort.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with NamedTemporaryFile(
            "w",
            encoding="utf-8",
            dir=str(self.path.parent),
            delete=False,
        ) as tmp:
            json.dump(self._state, tmp, indent=2, sort_keys=True)
            tmp.flush()
            tmp_path = Path(tmp.name)
        tmp_path.replace(self.path)
    def list_subscriptions(self) -> Dict[str, Dict[str, Any]]:
        """Deep-copied snapshot of all subscription records."""
        with self._lock:
            return deepcopy(self._state["subscriptions"])
    def get_subscription(self, subscription_id: str) -> Optional[Dict[str, Any]]:
        """Deep copy of one subscription record, or None if absent/non-dict."""
        with self._lock:
            record = self._state["subscriptions"].get(subscription_id)
            return deepcopy(record) if isinstance(record, dict) else None
    def upsert_subscription(self, subscription_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Merge *payload* into the record, stamping created/updated times."""
        with self._lock:
            existing = self._state["subscriptions"].get(subscription_id, {})
            merged = {**existing, **deepcopy(payload)}
            merged["subscription_id"] = subscription_id
            merged.setdefault("created_at", existing.get("created_at") or _utc_now_iso())
            merged["updated_at"] = _utc_now_iso()
            self._state["subscriptions"][subscription_id] = merged
            self._persist()
            return deepcopy(merged)
    def delete_subscription(self, subscription_id: str) -> bool:
        """Remove a subscription record; True when one existed."""
        with self._lock:
            removed = self._state["subscriptions"].pop(subscription_id, None)
            if removed is None:
                return False
            self._persist()
            return True
    @classmethod
    def build_notification_receipt_key(cls, notification: Dict[str, Any]) -> str:
        """Stable dedupe key: the explicit notification id when present,
        otherwise a SHA-256 of the canonical JSON serialization."""
        explicit_id = notification.get("id")
        if explicit_id:
            return f"id:{explicit_id}"
        canonical = json.dumps(notification, sort_keys=True, separators=(",", ":"))
        digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
        return f"sha256:{digest}"
    def has_notification_receipt(self, receipt_key: str) -> bool:
        """True when the notification key has been recorded before."""
        with self._lock:
            return receipt_key in self._state["notification_receipts"]
    def record_notification_receipt(
        self,
        receipt_key: str,
        payload: Optional[Dict[str, Any]] = None,
        *,
        received_at: Optional[str] = None,
    ) -> bool:
        """Record a receipt once; returns False on a duplicate key."""
        with self._lock:
            if receipt_key in self._state["notification_receipts"]:
                return False
            self._state["notification_receipts"][receipt_key] = {
                "received_at": received_at or _utc_now_iso(),
                "payload": deepcopy(payload) if isinstance(payload, dict) else payload,
            }
            self._persist()
            return True
    def record_event_timestamp(self, event_key: str, timestamp: Optional[str] = None) -> str:
        """Store (and return) a timestamp for *event_key*; defaults to now."""
        with self._lock:
            value = timestamp or _utc_now_iso()
            self._state["event_timestamps"][event_key] = value
            self._persist()
            return value
    def get_event_timestamp(self, event_key: str) -> Optional[str]:
        """Stored timestamp for *event_key* as a string, or None."""
        with self._lock:
            value = self._state["event_timestamps"].get(event_key)
            return str(value) if value is not None else None
    def stats(self) -> Dict[str, int]:
        """Per-section record counts, for CLI/status reporting."""
        with self._lock:
            return {
                "subscriptions": len(self._state["subscriptions"]),
                "notification_receipts": len(self._state["notification_receipts"]),
                "event_timestamps": len(self._state["event_timestamps"]),
                "jobs": len(self._state["jobs"]),
                "sink_records": len(self._state["sink_records"]),
            }
    def upsert_job(self, job_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Merge *payload* into the job record, stamping timestamps."""
        with self._lock:
            existing = self._state["jobs"].get(job_id, {})
            merged = {**existing, **deepcopy(payload)}
            merged["job_id"] = job_id
            merged.setdefault("created_at", existing.get("created_at") or _utc_now_iso())
            merged["updated_at"] = _utc_now_iso()
            self._state["jobs"][job_id] = merged
            self._persist()
            return deepcopy(merged)
    def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Deep copy of one job record, or None if absent/non-dict."""
        with self._lock:
            record = self._state["jobs"].get(job_id)
            return deepcopy(record) if isinstance(record, dict) else None
    def list_jobs(self) -> Dict[str, Dict[str, Any]]:
        """Deep-copied snapshot of all job records."""
        with self._lock:
            return deepcopy(self._state["jobs"])
    def upsert_sink_record(self, sink_key: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Merge *payload* into the sink record, stamping timestamps."""
        with self._lock:
            existing = self._state["sink_records"].get(sink_key, {})
            merged = {**existing, **deepcopy(payload)}
            merged["sink_key"] = sink_key
            merged.setdefault("created_at", existing.get("created_at") or _utc_now_iso())
            merged["updated_at"] = _utc_now_iso()
            self._state["sink_records"][sink_key] = merged
            self._persist()
            return deepcopy(merged)
    def get_sink_record(self, sink_key: str) -> Optional[Dict[str, Any]]:
        """Deep copy of one sink record, or None if absent/non-dict."""
        with self._lock:
            record = self._state["sink_records"].get(sink_key)
            return deepcopy(record) if isinstance(record, dict) else None

View file

@ -0,0 +1,249 @@
"""Microsoft Graph subscription helpers for the Teams pipeline plugin."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from plugins.teams_pipeline.models import GraphSubscription
from plugins.teams_pipeline.store import TeamsPipelineStore, resolve_teams_pipeline_store_path
from tools.microsoft_graph_auth import MicrosoftGraphTokenProvider
from tools.microsoft_graph_client import MicrosoftGraphClient
def build_graph_client() -> MicrosoftGraphClient:
    """Create a Graph client backed by env-configured credentials."""
    token_provider = MicrosoftGraphTokenProvider.from_env()
    return MicrosoftGraphClient(token_provider)
def _parse_bool(value: Any, *, default: bool = False) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in {"1", "true", "yes", "on"}:
return True
if lowered in {"0", "false", "no", "off"}:
return False
return default
def _parse_int(value: Any, default: int) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
def _utc_now() -> datetime:
return datetime.now(timezone.utc)
def _utc_now_iso() -> str:
return _utc_now().replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _parse_datetime(value: Any) -> datetime | None:
if value is None:
return None
text = str(value).strip()
if not text:
return None
if text.endswith("Z"):
text = f"{text[:-1]}+00:00"
parsed = datetime.fromisoformat(text)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def resolve_store_path(path: str | None) -> str:
    """String form of the resolved pipeline store path."""
    resolved = resolve_teams_pipeline_store_path(path)
    return str(resolved)
def build_store(path: str | None = None) -> TeamsPipelineStore:
    """Open (or create) the pipeline store at the resolved path."""
    store_path = resolve_store_path(path)
    return TeamsPipelineStore(store_path)
def sync_graph_subscription_record(
    store: TeamsPipelineStore,
    subscription_payload: dict[str, Any],
    *,
    status: str | None = None,
    renewed: bool = False,
) -> dict[str, Any]:
    """Normalize a Graph subscription payload and upsert it into the store.

    When *status* is not supplied it is derived from the expiration
    timestamp (past or now -> "expired", otherwise "active"). A renewal
    also stamps ``latest_renewal_at``.
    """
    record = GraphSubscription.from_dict(subscription_payload).to_dict()
    if status is not None:
        record["status"] = status
    else:
        expires_at = _parse_datetime(record.get("expiration_datetime"))
        is_expired = bool(expires_at and expires_at <= _utc_now())
        record["status"] = "expired" if is_expired else "active"
    if renewed:
        record["latest_renewal_at"] = _utc_now_iso()
    return store.upsert_subscription(record["subscription_id"], record)
def expected_client_state(raw: str | None = None) -> str | None:
    """Return the configured Graph client state, or None when unset/blank.

    When *raw* is None the value is read from MSGRAPH_WEBHOOK_CLIENT_STATE.
    """
    if raw is None:
        from os import getenv
        raw = getenv("MSGRAPH_WEBHOOK_CLIENT_STATE", "")
    cleaned = str(raw or "").strip()
    if not cleaned:
        return None
    return cleaned
def is_managed_subscription(
    store: TeamsPipelineStore,
    subscription_payload: dict[str, Any],
    *,
    expected_client_state_value: str | None,
) -> bool:
    """True when this plugin owns the subscription.

    Ownership is established either by a matching record in the local
    store or by a non-empty client state equal to the expected value.
    """
    sub_id = str(
        subscription_payload.get("subscription_id") or subscription_payload.get("id") or ""
    ).strip()
    if sub_id and store.get_subscription(sub_id):
        return True
    if not expected_client_state_value:
        return False
    claimed_state = str(
        subscription_payload.get("client_state") or subscription_payload.get("clientState") or ""
    ).strip()
    return bool(claimed_state) and claimed_state == expected_client_state_value
async def maintain_graph_subscriptions(
    *,
    client: MicrosoftGraphClient,
    store: TeamsPipelineStore,
    renew_within_hours: int = 24,
    extend_hours: int = 24,
    dry_run: bool = False,
    client_state: str | None = None,
) -> dict[str, Any]:
    """Sync managed Graph subscriptions into the store and renew expiring ones.

    For every remote subscription this plugin manages (known in the local
    store, or carrying the expected client state) the local record is
    refreshed. Subscriptions expiring within *renew_within_hours* are
    PATCHed to extend their lifetime by *extend_hours* past their current
    expiration (no PATCH under *dry_run*). Local records with no remote
    counterpart are flagged ``missing_remote``. Returns a report dict with
    counts, renewal candidates, performed renewals, and skip reasons.
    """
    threshold_hours = max(1, int(renew_within_hours))
    extend_hours = max(1, int(extend_hours))
    managed_client_state = expected_client_state(client_state)
    now = _utc_now()
    remote_subscriptions = await client.collect_paginated("/subscriptions")
    remote_ids: set[str] = set()
    synced = 0
    renewed: list[dict[str, Any]] = []
    candidates: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    for raw in remote_subscriptions:
        if not isinstance(raw, dict):
            continue
        subscription_id = str(raw.get("id") or "").strip()
        if not subscription_id:
            continue
        managed = is_managed_subscription(
            store,
            raw,
            expected_client_state_value=managed_client_state,
        )
        if not managed:
            # Never touch subscriptions owned by other apps/tenant tooling.
            skipped.append(
                {
                    "subscription_id": subscription_id,
                    "reason": "not_managed_by_teams_pipeline",
                }
            )
            continue
        remote_ids.add(subscription_id)
        try:
            sync_graph_subscription_record(store, raw)
            synced += 1
        except Exception as exc:
            skipped.append(
                {
                    "subscription_id": subscription_id,
                    "reason": f"failed_to_sync_local_store: {exc}",
                }
            )
            continue
        expiration = _parse_datetime(raw.get("expirationDateTime"))
        if expiration is None:
            skipped.append({"subscription_id": subscription_id, "reason": "missing_expiration"})
            continue
        seconds_until_expiry = int((expiration - now).total_seconds())
        if seconds_until_expiry < 0:
            # Expired subscriptions can only be recreated, not renewed.
            store.upsert_subscription(
                subscription_id,
                {
                    "status": "expired",
                    "expiration_datetime": expiration.isoformat().replace("+00:00", "Z"),
                },
            )
            skipped.append(
                {
                    "subscription_id": subscription_id,
                    "reason": "already_expired",
                    "expiration_datetime": expiration.isoformat().replace("+00:00", "Z"),
                }
            )
            continue
        if seconds_until_expiry > threshold_hours * 3600:
            skipped.append(
                {
                    "subscription_id": subscription_id,
                    "reason": "not_due",
                    "expires_in_seconds": seconds_until_expiry,
                }
            )
            continue
        # Extend from whichever is later: now or the current expiration.
        new_expiration = (max(now, expiration) + timedelta(hours=extend_hours)).replace(
            microsecond=0
        ).isoformat().replace("+00:00", "Z")
        candidate = {
            "subscription_id": subscription_id,
            "resource": raw.get("resource"),
            "current_expiration": expiration.isoformat().replace("+00:00", "Z"),
            "new_expiration": new_expiration,
        }
        candidates.append(candidate)
        if dry_run:
            continue
        patched = await client.patch_json(
            f"/subscriptions/{subscription_id}",
            json_body={"expirationDateTime": new_expiration},
        )
        merged = {**raw, **(patched or {}), "id": subscription_id, "expirationDateTime": new_expiration}
        sync_graph_subscription_record(store, merged, status="active", renewed=True)
        renewed.append({**candidate, "result": patched})
    # Flag local records whose remote subscription has disappeared.
    for subscription_id in store.list_subscriptions():
        if subscription_id in remote_ids:
            continue
        store.upsert_subscription(
            subscription_id,
            {
                "status": "missing_remote",
                "last_seen_missing_remote_at": _utc_now_iso(),
            },
        )
    return {
        "success": True,
        "dry_run": bool(dry_run),
        "store_path": str(store.path),
        "remote_subscription_count": len(remote_subscriptions),
        "synced_subscription_count": synced,
        "candidate_count": len(candidates),
        "renewed_count": len(renewed),
        "threshold_hours": threshold_hours,
        "extend_hours": extend_hours,
        "candidates": candidates,
        "renewed": renewed,
        "skipped": skipped,
    }

View file

@ -0,0 +1,185 @@
"""Tests for Teams pipeline runtime wiring into the gateway."""
from __future__ import annotations
import sys
from types import ModuleType
from types import SimpleNamespace
from unittest.mock import MagicMock
from gateway.config import Platform, PlatformConfig
from gateway.run import GatewayRunner
from plugins.teams_pipeline.runtime import (
bind_gateway_runtime,
build_pipeline_runtime,
build_pipeline_runtime_config,
)
def test_gateway_runner_wires_teams_pipeline_runtime(monkeypatch):
    """With the msgraph adapter present, the runner calls bind_gateway_runtime once, passing itself."""
    runner = GatewayRunner.__new__(GatewayRunner)
    runner.adapters = {Platform.MSGRAPH_WEBHOOK: object()}
    runner._teams_pipeline_runtime_error = None
    calls: list[object] = []
    def _bind(gateway_runner):
        calls.append(gateway_runner)
        return True
    monkeypatch.setattr("plugins.teams_pipeline.runtime.bind_gateway_runtime", _bind)
    GatewayRunner._wire_teams_pipeline_runtime(runner)
    assert calls == [runner]
def test_gateway_runner_skips_wiring_without_msgraph_adapter(monkeypatch):
    """Wiring is skipped entirely when no msgraph webhook adapter is registered."""
    runner = GatewayRunner.__new__(GatewayRunner)
    runner.adapters = {Platform.TELEGRAM: MagicMock()}
    runner._teams_pipeline_runtime_error = None
    called = False
    def _bind(_gateway_runner):
        nonlocal called
        called = True
        return True
    monkeypatch.setattr("plugins.teams_pipeline.runtime.bind_gateway_runtime", _bind)
    GatewayRunner._wire_teams_pipeline_runtime(runner)
    assert called is False
def test_gateway_runner_skips_wiring_when_teams_pipeline_plugin_disabled(monkeypatch):
    """The plugins.enabled config gate prevents wiring even when the adapter exists."""
    runner = GatewayRunner.__new__(GatewayRunner)
    runner.adapters = {Platform.MSGRAPH_WEBHOOK: object()}
    runner._teams_pipeline_runtime_error = None
    called = False
    def _bind(_gateway_runner):
        nonlocal called
        called = True
        return True
    monkeypatch.setattr("plugins.teams_pipeline.runtime.bind_gateway_runtime", _bind)
    # Empty plugins.enabled list means the teams_pipeline plugin is off.
    monkeypatch.setattr(
        "gateway.run._load_gateway_config",
        lambda: {"plugins": {"enabled": []}},
    )
    GatewayRunner._wire_teams_pipeline_runtime(runner)
    assert called is False
def test_runtime_config_disables_teams_delivery_without_target():
    """An enabled Teams platform with no delivery target yields no teams_delivery section."""
    platforms = {Platform("teams"): PlatformConfig(enabled=True, extra={})}
    gateway_config = SimpleNamespace(platforms=platforms)
    runtime_config = build_pipeline_runtime_config(gateway_config)
    assert "teams_delivery" not in runtime_config
def test_build_pipeline_runtime_only_wires_sender_when_delivery_configured(monkeypatch):
    """Without configured delivery targets the runtime gets no Teams sender."""
    gateway = SimpleNamespace(
        config=SimpleNamespace(
            platforms={
                Platform("teams"): PlatformConfig(enabled=True, extra={}),
            }
        )
    )
    # Stub out the external collaborators so construction stays in-process.
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.build_graph_client",
        lambda: object(),
    )
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.resolve_teams_pipeline_store_path",
        lambda: "/tmp/teams-pipeline-store.json",
    )
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.TeamsPipelineStore",
        lambda path: {"path": path},
    )
    runtime = build_pipeline_runtime(gateway)
    assert runtime.teams_sender is None
def test_build_pipeline_runtime_skips_sender_when_adapter_layer_is_unavailable(monkeypatch):
    """Delivery is configured, but an ImportError on the adapter leaves the sender unset."""
    gateway = SimpleNamespace(
        config=SimpleNamespace(
            platforms={
                Platform("teams"): PlatformConfig(
                    enabled=True,
                    extra={
                        "delivery_mode": "graph",
                        "team_id": "team-1",
                        "channel_id": "channel-1",
                    },
                ),
            }
        )
    )
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.build_graph_client",
        lambda: object(),
    )
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.resolve_teams_pipeline_store_path",
        lambda: "/tmp/teams-pipeline-store.json",
    )
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.TeamsPipelineStore",
        lambda path: {"path": path},
    )
    # An empty module makes `from ... import TeamsSummaryWriter` raise ImportError.
    monkeypatch.setitem(
        sys.modules,
        "plugins.platforms.teams.adapter",
        ModuleType("plugins.platforms.teams.adapter"),
    )
    runtime = build_pipeline_runtime(gateway)
    assert runtime.teams_sender is None
def test_bind_gateway_runtime_leaves_scheduler_unchanged_on_failure(monkeypatch):
    """A runtime construction failure records the error and never touches the scheduler."""
    class FakeAdapter:
        def __init__(self):
            # Stays None unless set_notification_scheduler is called.
            self.scheduler = None
        def set_notification_scheduler(self, scheduler):
            self.scheduler = scheduler
    gateway = SimpleNamespace(
        adapters={Platform.MSGRAPH_WEBHOOK: FakeAdapter()},
        config=SimpleNamespace(
            platforms={
                Platform("teams"): PlatformConfig(enabled=True, extra={}),
            }
        ),
        _teams_pipeline_runtime=None,
        _teams_pipeline_runtime_error=None,
    )
    # Generator-expression trick: a lambda body that raises RuntimeError("boom").
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.build_pipeline_runtime",
        lambda _gateway: (_ for _ in ()).throw(RuntimeError("boom")),
    )
    bound = bind_gateway_runtime(gateway)
    assert bound is False
    assert gateway.adapters[Platform.MSGRAPH_WEBHOOK].scheduler is None
    assert gateway._teams_pipeline_runtime_error == "boom"

View file

@ -0,0 +1,214 @@
"""Tests for the teams_pipeline plugin CLI."""
from __future__ import annotations
import json
from argparse import ArgumentParser, Namespace
from types import SimpleNamespace
import pytest
from plugins.teams_pipeline.cli import register_cli, teams_pipeline_command
from plugins.teams_pipeline.store import TeamsPipelineStore
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
    # Point HERMES_HOME at a temp dir so tests never touch the real store.
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
def _make_args(**kwargs):
defaults = {
"teams_pipeline_action": None,
"store_path": "",
"status": "",
"limit": 20,
"job_id": "",
"meeting_id": "",
"join_web_url": "",
"tenant_id": "",
"call_record_id": "",
"resource": "",
"notification_url": "",
"change_type": "updated",
"expiration": "",
"client_state": "",
"lifecycle_notification_url": "",
"latest_supported_tls_version": "v1_2",
"subscription_id": "",
"force_refresh": False,
"renew_within_hours": 24,
"extend_hours": 24,
"dry_run": False,
}
defaults.update(kwargs)
return Namespace(**defaults)
def test_register_cli_builds_tree():
    """register_cli wires an action subcommand that argparse can resolve."""
    parser = ArgumentParser()
    register_cli(parser)
    args = parser.parse_args(["list"])
    assert args.teams_pipeline_action == "list"
def test_list_prints_recent_jobs(capsys, tmp_path):
    """The list action prints job ids and meeting ids from the store."""
    store = TeamsPipelineStore(tmp_path / "teams_pipeline_store.json")
    store.upsert_job(
        "job-1",
        {
            "event_id": "evt-1",
            "source_event_type": "updated",
            "dedupe_key": "evt-1",
            "status": "completed",
            "meeting_ref": {"meeting_id": "meeting-1"},
        },
    )
    teams_pipeline_command(
        _make_args(
            teams_pipeline_action="list",
            store_path=str(tmp_path / "teams_pipeline_store.json"),
        )
    )
    out = capsys.readouterr().out
    assert "job-1" in out
    assert "meeting-1" in out
def test_show_prints_job_json(capsys, tmp_path):
    """The show action emits the full job record as parseable JSON."""
    store = TeamsPipelineStore(tmp_path / "teams_pipeline_store.json")
    store.upsert_job(
        "job-1",
        {
            "event_id": "evt-1",
            "source_event_type": "updated",
            "dedupe_key": "evt-1",
            "status": "completed",
            "meeting_ref": {"meeting_id": "meeting-1"},
        },
    )
    teams_pipeline_command(
        _make_args(
            teams_pipeline_action="show",
            job_id="job-1",
            store_path=str(tmp_path / "teams_pipeline_store.json"),
        )
    )
    out = capsys.readouterr().out
    payload = json.loads(out)
    assert payload["job_id"] == "job-1"
    assert payload["meeting_ref"]["meeting_id"] == "meeting-1"
def test_fetch_requires_meeting_identifier(capsys):
    """fetch with neither meeting_id nor join_web_url prints a usage error."""
    teams_pipeline_command(_make_args(teams_pipeline_action="fetch"))
    out = capsys.readouterr().out
    assert "meeting_id or join_web_url is required" in out
def test_subscriptions_lists_graph_subscriptions(monkeypatch, capsys):
    """The subscriptions action pages through /subscriptions and prints each entry."""
    class FakeClient:
        async def collect_paginated(self, path):
            assert path == "/subscriptions"
            return [
                {
                    "id": "sub-1",
                    "resource": "communications/onlineMeetings/getAllTranscripts",
                    "changeType": "updated",
                    "expirationDateTime": "2026-05-05T00:00:00Z",
                }
            ]
    monkeypatch.setattr("plugins.teams_pipeline.cli.build_graph_client", lambda: FakeClient())
    teams_pipeline_command(_make_args(teams_pipeline_action="subscriptions"))
    out = capsys.readouterr().out
    assert "sub-1" in out
    assert "getAllTranscripts" in out
def test_subscribe_defaults_to_created_for_transcript_resources(monkeypatch, capsys):
    """An empty change_type defaults to "created" for transcript resources."""
    captured = {}
    class FakeClient:
        async def post_json(self, path, json_body=None, headers=None):
            # Record what the CLI sent so the default can be asserted.
            captured["path"] = path
            captured["json_body"] = json_body
            return {
                "id": "sub-transcript",
                "resource": json_body["resource"],
                "changeType": json_body["changeType"],
                "notificationUrl": json_body["notificationUrl"],
                "expirationDateTime": json_body["expirationDateTime"],
            }
    monkeypatch.setattr("plugins.teams_pipeline.cli.build_graph_client", lambda: FakeClient())
    teams_pipeline_command(
        _make_args(
            teams_pipeline_action="subscribe",
            resource="communications/onlineMeetings/getAllTranscripts",
            notification_url="https://example.com/webhooks/msgraph",
            change_type="",
        )
    )
    payload = json.loads(capsys.readouterr().out)
    assert captured["path"] == "/subscriptions"
    assert captured["json_body"]["changeType"] == "created"
    assert payload["changeType"] == "created"
def test_token_health_force_refresh(monkeypatch, capsys):
    """token-health with --force-refresh fetches a token and reports its length."""
    class FakeProvider:
        def inspect_token_health(self):
            return {"configured": True, "cache_state": "warm"}
        async def get_access_token(self, force_refresh=False):
            # The CLI must propagate the force_refresh flag.
            assert force_refresh is True
            return "token-123"
    monkeypatch.setattr(
        "plugins.teams_pipeline.cli.MicrosoftGraphTokenProvider",
        SimpleNamespace(from_env=lambda: FakeProvider()),
    )
    teams_pipeline_command(_make_args(teams_pipeline_action="token-health", force_refresh=True))
    payload = json.loads(capsys.readouterr().out)
    assert payload["configured"] is True
    assert payload["last_refresh_succeeded"] is True
    assert payload["access_token_length"] == len("token-123")
def test_validate_accepts_msgraph_credentials_for_graph_delivery(monkeypatch, capsys, tmp_path):
    """validate reports ok with no issues when Graph creds and targets are present."""
    from gateway.config import Platform, PlatformConfig
    monkeypatch.setenv("MSGRAPH_TENANT_ID", "tenant")
    monkeypatch.setenv("MSGRAPH_CLIENT_ID", "client")
    monkeypatch.setenv("MSGRAPH_CLIENT_SECRET", "secret")
    gateway_config = SimpleNamespace(
        platforms={
            Platform.MSGRAPH_WEBHOOK: PlatformConfig(enabled=True, extra={}),
            Platform("teams"): PlatformConfig(
                enabled=True,
                extra={
                    "delivery_mode": "graph",
                    "team_id": "team-1",
                    "channel_id": "channel-1",
                },
            ),
        }
    )
    monkeypatch.setattr(
        "plugins.teams_pipeline.cli.load_gateway_config",
        lambda: gateway_config,
    )
    teams_pipeline_command(
        _make_args(
            teams_pipeline_action="validate",
            store_path=str(tmp_path / "teams_pipeline_store.json"),
        )
    )
    payload = json.loads(capsys.readouterr().out)
    assert payload["ok"] is True
    assert payload["issues"] == []

View file

@ -0,0 +1,437 @@
"""Tests for the Teams pipeline plugin package."""
from __future__ import annotations
import asyncio
from types import SimpleNamespace
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from hermes_cli.plugins import PluginContext, PluginManager, PluginManifest
from gateway.config import GatewayConfig, Platform, PlatformConfig
from plugins.teams_pipeline import register
from plugins.teams_pipeline.pipeline import TeamsMeetingPipeline
from plugins.teams_pipeline.store import TeamsPipelineStore
from plugins.teams_pipeline.models import MeetingArtifact
class FakeGraphClient:
    """Minimal Graph-client stand-in; only tracks whether a download happened."""

    def __init__(self) -> None:
        # No artifact has been downloaded yet.
        self.downloaded = False
async def _transcript_meeting_resolver(client, *, meeting_id=None, join_web_url=None, tenant_id=None):
    """Resolver stub: always succeed with canned subject/participant metadata."""
    from plugins.teams_pipeline.models import TeamsMeetingRef

    canned_metadata = {
        "subject": "Weekly Sync",
        "participants": [{"displayName": "Ada"}],
    }
    return TeamsMeetingRef(
        meeting_id=str(meeting_id),
        tenant_id=tenant_id,
        metadata=canned_metadata,
    )
async def _no_call_record(*args, **kwargs):
    """Call-record enrichment stub: behave as if no call record exists."""
    return None
def test_register_adds_cli_only():
    """register() must wire exactly the teams-pipeline CLI command into the manager."""
    manager = PluginManager()
    context = PluginContext(PluginManifest(name="teams_pipeline"), manager)
    register(context)
    assert "teams-pipeline" in manager._cli_commands
    command = manager._cli_commands["teams-pipeline"]
    assert command["plugin"] == "teams_pipeline"
    assert callable(command["setup_fn"])
    assert callable(command["handler_fn"])
def test_runtime_config_uses_existing_teams_platform_settings():
    """Runtime config must be derived from the teams platform's extra settings."""
    from plugins.teams_pipeline.runtime import build_pipeline_runtime_config

    meeting_pipeline_settings = {
        "transcript_min_chars": 120,
        "notion": {"enabled": True, "database_id": "db-1"},
    }
    teams_platform = PlatformConfig(
        enabled=True,
        extra={
            "delivery_mode": "graph",
            "team_id": "team-1",
            "channel_id": "channel-1",
            "meeting_pipeline": meeting_pipeline_settings,
        },
    )
    gateway_config = GatewayConfig(platforms={Platform("teams"): teams_platform})
    runtime_config = build_pipeline_runtime_config(gateway_config)
    assert runtime_config["transcript_min_chars"] == 120
    assert runtime_config["notion"]["database_id"] == "db-1"
    expected_delivery = {
        "enabled": True,
        "mode": "graph",
        "team_id": "team-1",
        "channel_id": "channel-1",
    }
    assert runtime_config["teams_delivery"] == expected_delivery
@pytest.mark.anyio
async def test_bind_gateway_runtime_attaches_scheduler(monkeypatch, tmp_path):
    """Binding must attach a scheduler that forwards notifications to the pipeline."""
    from plugins.teams_pipeline import runtime as runtime_module

    class RecordingAdapter:
        def __init__(self) -> None:
            self.scheduler = None

        def set_notification_scheduler(self, scheduler) -> None:
            self.scheduler = scheduler

    class RecordingPipeline:
        def __init__(self) -> None:
            self.notifications = []

        async def run_notification(self, notification):
            self.notifications.append(notification)

    adapter = RecordingAdapter()
    pipeline = RecordingPipeline()
    gateway = SimpleNamespace(
        adapters={Platform.MSGRAPH_WEBHOOK: adapter},
        config=GatewayConfig(platforms={}),
        _teams_pipeline_runtime=None,
        _teams_pipeline_runtime_error=None,
    )
    monkeypatch.setattr(runtime_module, "build_pipeline_runtime", lambda gateway_runner: pipeline)
    assert runtime_module.bind_gateway_runtime(gateway) is True
    assert gateway._teams_pipeline_runtime is pipeline
    assert callable(adapter.scheduler)
    # Driving the scheduler must hand the notification straight to the pipeline.
    notification = {"id": "notif-1"}
    await adapter.scheduler(notification, object())
    assert pipeline.notifications == [notification]
@pytest.mark.anyio
async def test_bind_gateway_runtime_drops_notifications_when_unavailable(monkeypatch):
    """A failed runtime build must record the error and drop notifications quietly."""
    from plugins.teams_pipeline import runtime as runtime_module
    from tools.microsoft_graph_auth import MicrosoftGraphConfigError

    class RecordingAdapter:
        def __init__(self) -> None:
            self.scheduler = None

        def set_notification_scheduler(self, scheduler) -> None:
            self.scheduler = scheduler

    adapter = RecordingAdapter()
    gateway = SimpleNamespace(
        adapters={Platform.MSGRAPH_WEBHOOK: adapter},
        config=GatewayConfig(platforms={}),
        _teams_pipeline_runtime=None,
        _teams_pipeline_runtime_error=None,
    )

    def _fail_build(_gateway_runner):
        raise MicrosoftGraphConfigError("missing graph env")

    monkeypatch.setattr(runtime_module, "build_pipeline_runtime", _fail_build)
    assert runtime_module.bind_gateway_runtime(gateway) is False
    assert "missing graph env" in gateway._teams_pipeline_runtime_error
    assert callable(adapter.scheduler)
    # The drop-path scheduler must swallow notifications, never raise.
    await adapter.scheduler({"id": "notif-2"}, object())
def test_store_persists_subscription_event_and_job_state(tmp_path):
    """Every record type written to one store instance must survive a reload from disk."""
    store_path = tmp_path / "teams-store.json"
    writer = TeamsPipelineStore(store_path)
    writer.upsert_subscription(
        "sub-1",
        {"client_state": "abc", "resource": "communications/onlineMeetings"},
    )
    writer.record_event_timestamp("evt-1", "2026-05-03T19:30:00Z")
    writer.upsert_job("job-1", {"status": "received", "event_id": "evt-1"})
    writer.upsert_sink_record("notion:meeting-1", {"page_id": "page-1"})

    # A brand-new store instance must see everything the first one wrote.
    reader = TeamsPipelineStore(store_path)
    subscription = reader.get_subscription("sub-1")
    assert subscription is not None
    assert subscription["subscription_id"] == "sub-1"
    assert subscription["client_state"] == "abc"
    assert reader.get_event_timestamp("evt-1") == "2026-05-03T19:30:00Z"
    job = reader.get_job("job-1")
    assert job is not None
    assert job["status"] == "received"
    sink = reader.get_sink_record("notion:meeting-1")
    assert sink is not None
    assert sink["page_id"] == "page-1"
def test_store_notification_receipts_are_idempotent(tmp_path):
    """Recording the same receipt twice must flag the duplicate and persist once."""
    store_path = tmp_path / "teams-store.json"
    store = TeamsPipelineStore(store_path)
    notification = {
        "subscriptionId": "sub-1",
        "resource": "communications/onlineMeetings/meeting-1",
        "changeType": "updated",
    }
    key = TeamsPipelineStore.build_notification_receipt_key(notification)
    assert store.record_notification_receipt(key, notification) is True
    # The second write is a no-op and signals the duplicate.
    assert store.record_notification_receipt(key, notification) is False
    assert store.has_notification_receipt(key) is True
    # The receipt must also survive a reload from disk.
    assert TeamsPipelineStore(store_path).has_notification_receipt(key) is True
@pytest.mark.anyio
class TestTeamsMeetingPipeline:
    async def test_transcript_first_path_persists_state_and_skips_recording(self, tmp_path, monkeypatch):
        """Transcript-first happy path: a usable transcript completes the job with no recording fetch."""
        from plugins.teams_pipeline import pipeline as pipeline_module

        # Meeting resolution always succeeds with canned metadata.
        monkeypatch.setattr(pipeline_module, "resolve_meeting_reference", _transcript_meeting_resolver)

        async def _fetch_transcript(client, meeting_ref):
            # Transcript artifact plus text comfortably above transcript_min_chars=20.
            return (
                MeetingArtifact(artifact_type="transcript", artifact_id="tx-1", display_name="meeting.vtt"),
                "Action: Send draft by Friday.\nDecision: Ship the transcript-first path.\nDetailed transcript content.",
            )

        async def _call_record(client, meeting_ref, *, call_record_id=None, allow_permission_errors=True):
            # Call-record enrichment contributes participant metrics only.
            return MeetingArtifact(
                artifact_type="call_record",
                artifact_id="call-1",
                metadata={"metrics": {"participant_count": 4}},
            )

        async def _summarize(**kwargs):
            # Echo back the values the pipeline is expected to pass to the summarizer.
            return pipeline_module.TeamsMeetingSummaryPayload(
                meeting_ref=kwargs["resolved_meeting"],
                title="Weekly Sync",
                transcript_text=kwargs["transcript_text"],
                summary="Short summary",
                key_decisions=["Ship the transcript-first path."],
                action_items=["Send draft by Friday."],
                risks=["Timeline risk."],
                confidence="high",
                confidence_notes="Transcript available.",
                source_artifacts=kwargs["artifacts"],
            )

        monkeypatch.setattr(pipeline_module, "fetch_preferred_transcript_text", _fetch_transcript)
        monkeypatch.setattr(pipeline_module, "enrich_meeting_with_call_record", _call_record)
        store = TeamsPipelineStore(tmp_path / "teams-store.json")
        pipeline = TeamsMeetingPipeline(
            graph_client=FakeGraphClient(),
            store=store,
            config={"transcript_min_chars": 20},
            summarize_fn=_summarize,
        )
        job = await pipeline.run_notification(
            {
                "id": "notif-1",
                "changeType": "updated",
                "resource": "communications/onlineMeetings/meeting-123",
                "resourceData": {"id": "meeting-123"},
            }
        )
        assert job.status == "completed"
        assert job.selected_artifact_strategy == "transcript_first"
        assert job.summary_payload is not None
        assert job.summary_payload.summary == "Short summary"
        # The finished job state must also be persisted in the durable store.
        stored = store.get_job(job.job_id)
        assert stored is not None
        assert stored["status"] == "completed"
    async def test_recording_fallback_uses_stt_and_updates_sink_records(self, tmp_path, monkeypatch):
        """No transcript: download the recording, transcribe via STT, fan out to Notion and Teams sinks."""
        from plugins.teams_pipeline import pipeline as pipeline_module

        monkeypatch.setattr(pipeline_module, "resolve_meeting_reference", _transcript_meeting_resolver)

        async def _no_transcript(client, meeting_ref):
            # Force the recording/STT fallback path.
            return None, None

        async def _recordings(client, meeting_ref):
            return [
                MeetingArtifact(
                    artifact_type="recording",
                    artifact_id="rec-1",
                    display_name="recording.mp4",
                    download_url="https://files.example/recording.mp4",
                )
            ]

        async def _download(client, meeting_ref, recording, destination):
            # Simulate a successful download by writing dummy bytes to the target path.
            target = Path(destination)
            target.write_bytes(b"video-bytes")
            return {"path": str(target), "size_bytes": 11, "content_type": "video/mp4"}

        async def _prepare_audio(self, recording_path):
            # Patched onto TeamsMeetingPipeline._prepare_audio_path, hence the `self` parameter.
            audio_path = recording_path.with_suffix(".wav")
            audio_path.write_bytes(b"audio-bytes")
            return audio_path

        def _transcribe(file_path, model):
            return {"success": True, "transcript": "Action: Follow up with Legal.\nRisk: Budget approval pending.", "provider": "local"}

        async def _summarize(**kwargs):
            return pipeline_module.TeamsMeetingSummaryPayload(
                meeting_ref=kwargs["resolved_meeting"],
                title="Weekly Sync",
                transcript_text=kwargs["transcript_text"],
                summary="Fallback summary",
                key_decisions=[],
                action_items=["Follow up with Legal."],
                risks=["Budget approval pending."],
                confidence="medium",
                confidence_notes="Generated from STT fallback.",
                source_artifacts=kwargs["artifacts"],
            )

        class FakeNotionWriter:
            async def write_summary(self, payload, config, existing_record=None):
                # Reuse an existing page id when present so re-runs stay idempotent.
                return {"page_id": existing_record.get("page_id") if existing_record else "page-1", "url": "https://notion.so/page-1"}

        async def _teams_sender(payload, config, existing_record=None):
            return {"message_id": existing_record.get("message_id") if existing_record else "msg-1"}

        monkeypatch.setattr(pipeline_module, "fetch_preferred_transcript_text", _no_transcript)
        monkeypatch.setattr(pipeline_module, "list_recording_artifacts", _recordings)
        monkeypatch.setattr(pipeline_module, "download_recording_artifact", _download)
        monkeypatch.setattr(pipeline_module.TeamsMeetingPipeline, "_prepare_audio_path", _prepare_audio)
        monkeypatch.setattr(pipeline_module, "enrich_meeting_with_call_record", _no_call_record)
        store = TeamsPipelineStore(tmp_path / "teams-store.json")
        pipeline = TeamsMeetingPipeline(
            graph_client=FakeGraphClient(),
            store=store,
            # Both sinks enabled so the fan-out records can be asserted below.
            config={
                "notion": {"enabled": True, "database_id": "db-1"},
                "teams_delivery": {"enabled": True, "channel_id": "channel-1"},
            },
            transcribe_fn=_transcribe,
            summarize_fn=_summarize,
            notion_writer=FakeNotionWriter(),
            teams_sender=_teams_sender,
        )
        job = await pipeline.run_notification(
            {
                "id": "notif-2",
                "changeType": "updated",
                "resource": "communications/onlineMeetings/meeting-456",
                "resourceData": {"id": "meeting-456"},
            }
        )
        assert job.status == "completed"
        assert job.selected_artifact_strategy == "recording_stt_fallback"
        assert job.summary_payload is not None
        assert job.summary_payload.summary == "Fallback summary"
        # Each sink must have a durable record keyed by sink + meeting id.
        notion_record = store.get_sink_record("notion:meeting-456")
        teams_record = store.get_sink_record("teams:meeting-456")
        assert notion_record is not None
        assert notion_record["page_id"] == "page-1"
        assert teams_record is not None
        assert teams_record["message_id"] == "msg-1"
async def test_missing_transcript_and_recording_schedules_retry(self, tmp_path, monkeypatch):
from plugins.teams_pipeline import pipeline as pipeline_module
monkeypatch.setattr(pipeline_module, "resolve_meeting_reference", _transcript_meeting_resolver)
monkeypatch.setattr(pipeline_module, "fetch_preferred_transcript_text", lambda *a, **kw: asyncio.sleep(0, result=(None, None)))
monkeypatch.setattr(pipeline_module, "list_recording_artifacts", lambda *a, **kw: asyncio.sleep(0, result=[]))
store = TeamsPipelineStore(tmp_path / "teams-store.json")
pipeline = TeamsMeetingPipeline(
graph_client=FakeGraphClient(),
store=store,
config={},
summarize_fn=lambda **kwargs: asyncio.sleep(0, result=None),
)
job = await pipeline.run_notification(
{
"id": "notif-3",
"changeType": "updated",
"resource": "communications/onlineMeetings/meeting-789",
"resourceData": {"id": "meeting-789"},
}
)
assert job.status == "retry_scheduled"
assert job.error_info["retryable"] is True
assert "Recording unavailable" in job.error_info["message"]
async def test_duplicate_notification_reuses_completed_job(self, tmp_path, monkeypatch):
from plugins.teams_pipeline import pipeline as pipeline_module
monkeypatch.setattr(pipeline_module, "resolve_meeting_reference", _transcript_meeting_resolver)
async def _fetch_transcript(client, meeting_ref):
return (
MeetingArtifact(artifact_type="transcript", artifact_id="tx-dup", display_name="meeting.vtt"),
"Decision: Keep duplicate notifications idempotent.\nAction: Verify the cached job is reused.",
)
summarize_calls = 0
async def _summarize(**kwargs):
nonlocal summarize_calls
summarize_calls += 1
return pipeline_module.TeamsMeetingSummaryPayload(
meeting_ref=kwargs["resolved_meeting"],
title="Weekly Sync",
transcript_text=kwargs["transcript_text"],
summary="Duplicate-safe summary",
key_decisions=["Keep duplicate notifications idempotent."],
action_items=["Verify the cached job is reused."],
confidence="high",
confidence_notes="Transcript available.",
source_artifacts=kwargs["artifacts"],
)
monkeypatch.setattr(pipeline_module, "fetch_preferred_transcript_text", _fetch_transcript)
monkeypatch.setattr(pipeline_module, "enrich_meeting_with_call_record", _no_call_record)
store = TeamsPipelineStore(tmp_path / "teams-store.json")
pipeline = TeamsMeetingPipeline(
graph_client=FakeGraphClient(),
store=store,
config={"transcript_min_chars": 20},
summarize_fn=_summarize,
)
notification = {
"id": "notif-dup",
"changeType": "updated",
"resource": "communications/onlineMeetings/meeting-dup",
"resourceData": {"id": "meeting-dup"},
}
first_job = await pipeline.run_notification(notification)
second_job = await pipeline.run_notification(notification)
assert first_job.status == "completed"
assert second_job.status == "completed"
assert second_job.job_id == first_job.job_id
assert summarize_calls == 1
assert len(store.list_jobs()) == 1
receipt_key = TeamsPipelineStore.build_notification_receipt_key(notification)
assert store.has_notification_receipt(receipt_key) is True