diff --git a/gateway/run.py b/gateway/run.py index 53482a7a74..69c8793f22 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -847,6 +847,15 @@ def _platform_config_key(platform: "Platform") -> str: return "cli" if platform == Platform.LOCAL else platform.value +def _teams_pipeline_plugin_enabled() -> bool: + """Return True when the standalone Teams pipeline plugin is enabled.""" + config = _load_gateway_config() + enabled = cfg_get(config, "plugins", "enabled", default=[]) + if not isinstance(enabled, list): + return False + return "teams_pipeline" in enabled or "teams-pipeline" in enabled + + def _load_gateway_config() -> dict: """Load and parse ~/.hermes/config.yaml, returning {} on any error. @@ -1154,6 +1163,9 @@ class GatewayRunner: # Per-session reasoning effort overrides from /reasoning. # Key: session_key, Value: parsed reasoning config dict. self._session_reasoning_overrides: Dict[str, Dict[str, Any]] = {} + # Teams meeting pipeline runtime (bound later when msgraph_webhook adapter exists). + self._teams_pipeline_runtime = None + self._teams_pipeline_runtime_error: Optional[str] = None # Track pending exec approvals per session # Key: session_key, Value: {"command": str, "pattern_key": str, ...} self._pending_approvals: Dict[str, Dict[str, Any]] = {} @@ -1251,6 +1263,37 @@ class GatewayRunner: self._background_tasks: set = set() + def _wire_teams_pipeline_runtime(self) -> None: + """Bind the Teams meeting pipeline runtime to Graph webhook ingress. + + No-op when the msgraph_webhook adapter isn't running or the + teams_pipeline plugin isn't enabled — lets the gateway start cleanly + whether or not the user has opted into the pipeline. 
+ """ + if Platform.MSGRAPH_WEBHOOK not in self.adapters: + return + if not _teams_pipeline_plugin_enabled(): + logger.debug("Teams pipeline plugin is disabled; skipping runtime wiring") + return + try: + from plugins.teams_pipeline.runtime import bind_gateway_runtime + except Exception as exc: + logger.warning("Teams pipeline runtime import failed: %s", exc) + return + try: + bound = bind_gateway_runtime(self) + except Exception as exc: + logger.warning("Teams pipeline runtime wiring failed: %s", exc) + return + if bound: + logger.info("Teams pipeline runtime bound to msgraph webhook ingress") + elif self._teams_pipeline_runtime_error: + logger.warning( + "Teams pipeline runtime unavailable: %s", + self._teams_pipeline_runtime_error, + ) + + def _warn_if_docker_media_delivery_is_risky(self) -> None: """Warn when Docker-backed gateways lack an explicit export mount. @@ -3304,7 +3347,8 @@ class GatewayRunner: # Update delivery router with adapters self.delivery_router.adapters = self.adapters - + self._wire_teams_pipeline_runtime() + self._running = True self._update_runtime_status("running") diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 8ac6fe3a43..a35c53bb07 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -10052,7 +10052,9 @@ Examples: # ========================================================================= try: from plugins.memory import discover_plugin_cli_commands + from hermes_cli.plugins import discover_plugins, get_plugin_manager + seen_plugin_commands = set() for cmd_info in discover_plugin_cli_commands(): plugin_parser = subparsers.add_parser( cmd_info["name"], @@ -10061,6 +10063,23 @@ Examples: formatter_class=__import__("argparse").RawDescriptionHelpFormatter, ) cmd_info["setup_fn"](plugin_parser) + if cmd_info.get("handler_fn") is not None: + plugin_parser.set_defaults(func=cmd_info["handler_fn"]) + seen_plugin_commands.add(cmd_info["name"]) + + discover_plugins() + for cmd_info in get_plugin_manager()._cli_commands.values(): 
+ if cmd_info["name"] in seen_plugin_commands: + continue + plugin_parser = subparsers.add_parser( + cmd_info["name"], + help=cmd_info["help"], + description=cmd_info.get("description", ""), + formatter_class=__import__("argparse").RawDescriptionHelpFormatter, + ) + cmd_info["setup_fn"](plugin_parser) + if cmd_info.get("handler_fn") is not None: + plugin_parser.set_defaults(func=cmd_info["handler_fn"]) except Exception as _exc: logging.getLogger(__name__).debug("Plugin CLI discovery failed: %s", _exc) diff --git a/plugins/teams_pipeline/__init__.py b/plugins/teams_pipeline/__init__.py new file mode 100644 index 0000000000..75d631fa41 --- /dev/null +++ b/plugins/teams_pipeline/__init__.py @@ -0,0 +1,23 @@ +"""Teams meeting pipeline plugin. + +Registers only operator-facing CLI surfaces. The agent should invoke these via +the terminal tool; no model tools are added by this plugin. +""" + +from __future__ import annotations + +from plugins.teams_pipeline.cli import register_cli, teams_pipeline_command + + +def register(ctx) -> None: + ctx.register_cli_command( + name="teams-pipeline", + help="Inspect and operate the Microsoft Teams meeting pipeline", + setup_fn=register_cli, + handler_fn=teams_pipeline_command, + description=( + "Operator CLI for the Microsoft Teams meeting pipeline. " + "Lists jobs, inspects stored runs, replays jobs, validates Graph " + "setup, and maintains Graph subscriptions." 
+ ), + ) diff --git a/plugins/teams_pipeline/cli.py b/plugins/teams_pipeline/cli.py new file mode 100644 index 0000000000..0e1114e3e7 --- /dev/null +++ b/plugins/teams_pipeline/cli.py @@ -0,0 +1,462 @@ +"""CLI commands for the Teams meeting pipeline plugin.""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import os +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any + +from hermes_constants import display_hermes_home +from gateway.config import Platform, load_gateway_config +from plugins.teams_pipeline.meetings import ( + enrich_meeting_with_call_record, + fetch_preferred_transcript_text, + list_recording_artifacts, + resolve_meeting_reference, +) +from plugins.teams_pipeline.models import GraphSubscription +from plugins.teams_pipeline.pipeline import TeamsMeetingPipeline +from plugins.teams_pipeline.store import TeamsPipelineStore, resolve_teams_pipeline_store_path +from plugins.teams_pipeline.subscriptions import ( + build_graph_client, + maintain_graph_subscriptions, + sync_graph_subscription_record, +) +from tools.microsoft_graph_auth import MicrosoftGraphConfigError, MicrosoftGraphTokenProvider + + +def register_cli(subparser: argparse.ArgumentParser) -> None: + subs = subparser.add_subparsers(dest="teams_pipeline_action") + + list_p = subs.add_parser("list", aliases=["ls"], help="List recent Teams pipeline jobs") + list_p.add_argument("--limit", type=int, default=20) + list_p.add_argument("--status", default="") + list_p.add_argument("--store-path", default="") + + show_p = subs.add_parser("show", help="Show a stored Teams pipeline job") + show_p.add_argument("job_id") + show_p.add_argument("--store-path", default="") + + run_p = subs.add_parser("run", aliases=["replay"], help="Replay a stored Teams pipeline job") + run_p.add_argument("job_id") + run_p.add_argument("--store-path", default="") + + fetch_p = subs.add_parser("fetch", aliases=["test"], help="Dry-run meeting 
artifact resolution") + fetch_p.add_argument("--meeting-id", default="") + fetch_p.add_argument("--join-web-url", default="") + fetch_p.add_argument("--tenant-id", default="") + fetch_p.add_argument("--call-record-id", default="") + + subs_p = subs.add_parser("subscriptions", aliases=["subs"], help="List Graph subscriptions") + subs_p.add_argument("--store-path", default="") + + sub_p = subs.add_parser("subscribe", help="Create a Microsoft Graph subscription") + sub_p.add_argument("--resource", required=True) + sub_p.add_argument("--notification-url", required=True) + sub_p.add_argument("--change-type", default="") + sub_p.add_argument("--expiration", default="") + sub_p.add_argument("--client-state", default="") + sub_p.add_argument("--lifecycle-notification-url", default="") + sub_p.add_argument("--latest-supported-tls-version", default="v1_2") + sub_p.add_argument("--store-path", default="") + + renew_p = subs.add_parser("renew-subscription", help="Renew a Microsoft Graph subscription") + renew_p.add_argument("subscription_id") + renew_p.add_argument("--expiration", required=True) + renew_p.add_argument("--store-path", default="") + + delete_p = subs.add_parser("delete-subscription", help="Delete a Microsoft Graph subscription") + delete_p.add_argument("subscription_id") + delete_p.add_argument("--store-path", default="") + + maintain_p = subs.add_parser("maintain-subscriptions", help="Renew near-expiry managed subscriptions") + maintain_p.add_argument("--renew-within-hours", type=int, default=24) + maintain_p.add_argument("--extend-hours", type=int, default=24) + maintain_p.add_argument("--dry-run", action="store_true") + maintain_p.add_argument("--store-path", default="") + maintain_p.add_argument("--client-state", default="") + + token_p = subs.add_parser("token-health", aliases=["token"], help="Inspect Graph token health") + token_p.add_argument("--force-refresh", action="store_true") + + validate_p = subs.add_parser("validate", help="Validate Teams pipeline 
configuration snapshot") + validate_p.add_argument("--store-path", default="") + + subparser.set_defaults(func=teams_pipeline_command) + + +def teams_pipeline_command(args: argparse.Namespace) -> int: + action = getattr(args, "teams_pipeline_action", None) + if not action: + print( + "Usage: hermes teams-pipeline " + "{list|show|run|fetch|subscriptions|subscribe|renew-subscription|delete-subscription|maintain-subscriptions|token-health|validate}" + ) + return 2 + + try: + if action in ("list", "ls"): + _cmd_list(args) + elif action == "show": + _cmd_show(args) + elif action in ("run", "replay"): + _cmd_run(args) + elif action in ("fetch", "test"): + _cmd_fetch(args) + elif action in ("subscriptions", "subs"): + _cmd_subscriptions(args) + elif action == "subscribe": + _cmd_subscribe(args) + elif action == "renew-subscription": + _cmd_renew_subscription(args) + elif action == "delete-subscription": + _cmd_delete_subscription(args) + elif action == "maintain-subscriptions": + _cmd_maintain_subscriptions(args) + elif action in ("token-health", "token"): + _cmd_token_health(args) + elif action == "validate": + _cmd_validate(args) + else: + print(f"Unknown teams-pipeline action: {action}") + return 2 + return 0 + except MicrosoftGraphConfigError: + print(_graph_setup_hint()) + return 1 + + +def _run_async(coro): + return asyncio.run(coro) + + +def _store_path(path_arg: str | None) -> Path: + return resolve_teams_pipeline_store_path(path_arg) + + +def _graph_setup_hint() -> str: + return f""" + Microsoft Graph is not configured. Add these to {display_hermes_home()}/.env: + + MSGRAPH_TENANT_ID=... + MSGRAPH_CLIENT_ID=... + MSGRAPH_CLIENT_SECRET=... + + Then restart the gateway or rerun this command. 
+""" + + +def _iso_utc_timestamp(hours_from_now: int) -> str: + return (datetime.now(timezone.utc) + timedelta(hours=hours_from_now)).replace( + microsecond=0 + ).isoformat().replace("+00:00", "Z") + + +def _default_change_type_for_resource(resource: str) -> str: + normalized = str(resource or "").strip().lower() + if normalized.startswith("communications/onlinemeetings/getalltranscripts"): + return "created" + if normalized.startswith("communications/onlinemeetings/getallrecordings"): + return "created" + if normalized.startswith("communications/callrecords"): + return "created" + return "updated" + + +def _compact_job(job: dict) -> dict: + payload = dict(job) + summary = dict(payload.get("summary_payload") or {}) + transcript = summary.pop("transcript_text", None) + if transcript: + summary["transcript_preview"] = str(transcript)[:240] + payload["summary_payload"] = summary or None + return payload + + +def _sync_subscription_record( + store: TeamsPipelineStore, + subscription_payload: dict[str, Any], + *, + status: str = "active", + renewed: bool = False, +) -> dict[str, Any]: + normalized = GraphSubscription.from_dict(subscription_payload).to_dict() + normalized["status"] = status + if renewed: + normalized["latest_renewal_at"] = _iso_utc_timestamp(0) + return store.upsert_subscription(normalized["subscription_id"], normalized) + + +def _validate_configuration_snapshot(store: TeamsPipelineStore) -> dict[str, Any]: + env = os.environ + issues: list[str] = [] + warnings: list[str] = [] + gateway_config = load_gateway_config() + webhook_config = gateway_config.platforms.get(Platform.MSGRAPH_WEBHOOK) + teams_config = gateway_config.platforms.get(Platform("teams")) + + graph = { + "tenant_id": bool(env.get("MSGRAPH_TENANT_ID")), + "client_id": bool(env.get("MSGRAPH_CLIENT_ID")), + "client_secret": bool(env.get("MSGRAPH_CLIENT_SECRET")), + } + webhook_enabled = bool(webhook_config and webhook_config.enabled) + teams_enabled = bool(teams_config and 
teams_config.enabled) + teams_extra = dict((teams_config.extra or {}) if teams_config else {}) + teams_mode = str(teams_extra.get("delivery_mode") or "").strip() or None + + if not all(graph.values()): + issues.append("Microsoft Graph app-only credentials are incomplete.") + if not webhook_enabled: + issues.append("MSGRAPH_WEBHOOK_ENABLED is not enabled.") + if not teams_enabled: + warnings.append("Teams outbound delivery is disabled.") + elif teams_mode == "incoming_webhook": + if not teams_extra.get("incoming_webhook_url"): + issues.append("TEAMS_INCOMING_WEBHOOK_URL is required for incoming_webhook mode.") + elif teams_mode == "graph": + missing: list[str] = [] + has_graph_delivery_token = bool( + (teams_config.token if teams_config else "") or teams_extra.get("access_token") + ) + has_graph_app_credentials = all(graph.values()) + if not has_graph_delivery_token and not has_graph_app_credentials: + missing.append( + "TEAMS_GRAPH_ACCESS_TOKEN or complete MSGRAPH_* app credentials" + ) + if not teams_extra.get("team_id"): + missing.append("TEAMS_TEAM_ID") + channel_id = teams_extra.get("channel_id") or teams_extra.get("chat_id") + if not channel_id and not (teams_config and teams_config.home_channel): + missing.append("TEAMS_CHANNEL_ID") + for key in missing: + issues.append(f"{key} is required for graph delivery mode.") + else: + warnings.append("TEAMS_DELIVERY_MODE is not set.") + + return { + "ok": not issues, + "issues": issues, + "warnings": warnings, + "graph_config": graph, + "webhook_enabled": webhook_enabled, + "teams_enabled": teams_enabled, + "teams_delivery_mode": teams_mode, + "store_path": str(store.path), + "store_stats": store.stats(), + } + + +def _cmd_list(args) -> None: + store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None))) + jobs = list(store.list_jobs().values()) + status = str(getattr(args, "status", "") or "").strip().lower() + if status: + jobs = [job for job in jobs if str(job.get("status") or "").lower() == status] + 
jobs.sort(key=lambda item: str((item or {}).get("updated_at") or ""), reverse=True) + limit = max(1, min(int(getattr(args, "limit", 20) or 20), 100)) + jobs = jobs[:limit] + + if not jobs: + print("No Teams meeting pipeline jobs found.") + return + + print(f"\n{len(jobs)} Teams pipeline job(s):\n") + for job in jobs: + meeting_id = ((job.get("meeting_ref") or {}).get("meeting_id") or "unknown") + print(f" ◆ {job.get('job_id')}") + print(f" status: {job.get('status')}") + print(f" meeting: {meeting_id}") + if job.get("selected_artifact_strategy"): + print(f" strategy: {job.get('selected_artifact_strategy')}") + if job.get("updated_at"): + print(f" updated: {job.get('updated_at')}") + if job.get("error_info"): + print(f" error: {job.get('error_info')}") + print() + + +def _cmd_show(args) -> None: + job_id = str(getattr(args, "job_id", "") or "").strip() + if not job_id: + print("job_id is required") + return + store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None))) + job = store.get_job(job_id) + if not job: + print(f"Unknown job: {job_id}") + return + print(json.dumps(_compact_job(job), indent=2, sort_keys=True)) + + +def _cmd_run(args) -> None: + job_id = str(getattr(args, "job_id", "") or "").strip() + if not job_id: + print("job_id is required") + return + store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None))) + pipeline = TeamsMeetingPipeline(graph_client=build_graph_client(), store=store, config={}) + result = _run_async(pipeline.run_job(job_id)) + print(json.dumps(_compact_job(result.to_dict()), indent=2, sort_keys=True)) + + +def _cmd_fetch(args) -> None: + meeting_id = str(getattr(args, "meeting_id", "") or "").strip() or None + join_web_url = str(getattr(args, "join_web_url", "") or "").strip() or None + tenant_id = str(getattr(args, "tenant_id", "") or "").strip() or None + call_record_id = str(getattr(args, "call_record_id", "") or "").strip() or None + if not meeting_id and not join_web_url: + print("meeting_id or 
join_web_url is required") + return + + client = build_graph_client() + meeting_ref = _run_async( + resolve_meeting_reference( + client, + meeting_id=meeting_id, + join_web_url=join_web_url, + tenant_id=tenant_id, + ) + ) + transcript_artifact, transcript_text = _run_async(fetch_preferred_transcript_text(client, meeting_ref)) + recordings = _run_async(list_recording_artifacts(client, meeting_ref)) + call_record = _run_async( + enrich_meeting_with_call_record(client, meeting_ref, call_record_id=call_record_id) + ) + print( + json.dumps( + { + "meeting_ref": meeting_ref.to_dict(), + "transcript_available": bool(transcript_artifact and transcript_text), + "transcript_artifact": transcript_artifact.to_dict() if transcript_artifact else None, + "transcript_preview": (transcript_text or "")[:240] or None, + "recording_count": len(recordings), + "recordings": [recording.to_dict() for recording in recordings[:5]], + "call_record": call_record.to_dict() if call_record else None, + }, + indent=2, + sort_keys=True, + ) + ) + + +def _cmd_subscriptions(args) -> None: + store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None))) + client = build_graph_client() + subscriptions = _run_async(client.collect_paginated("/subscriptions")) + for sub in subscriptions: + try: + _sync_subscription_record(store, sub, status="active") + except Exception: + continue + if not subscriptions: + print("No Microsoft Graph subscriptions found.") + return + + print(f"\n{len(subscriptions)} Microsoft Graph subscription(s):\n") + for sub in subscriptions: + print(f" ◆ {sub.get('id') or 'unknown'}") + print(f" resource: {sub.get('resource') or 'unknown'}") + print(f" changeType: {sub.get('changeType') or 'unknown'}") + if sub.get("expirationDateTime"): + print(f" expires: {sub.get('expirationDateTime')}") + if sub.get("notificationUrl"): + print(f" notify: {sub.get('notificationUrl')}") + print() + + +def _cmd_subscribe(args) -> None: + store = 
TeamsPipelineStore(_store_path(getattr(args, "store_path", None))) + resource = str(getattr(args, "resource", "") or "").strip() + notification_url = str(getattr(args, "notification_url", "") or "").strip() + change_type = str(getattr(args, "change_type", "") or "").strip() or _default_change_type_for_resource(resource) + expiration = str(getattr(args, "expiration", "") or "").strip() or _iso_utc_timestamp(1) + client_state = str(getattr(args, "client_state", "") or "").strip() + lifecycle_url = str(getattr(args, "lifecycle_notification_url", "") or "").strip() + tls_version = str(getattr(args, "latest_supported_tls_version", "") or "").strip() or "v1_2" + + payload = { + "changeType": change_type, + "notificationUrl": notification_url, + "resource": resource, + "expirationDateTime": expiration, + "latestSupportedTlsVersion": tls_version, + } + if client_state: + payload["clientState"] = client_state + if lifecycle_url: + payload["lifecycleNotificationUrl"] = lifecycle_url + + result = _run_async(build_graph_client().post_json("/subscriptions", json_body=payload)) + _sync_subscription_record(store, result, status="active") + print(json.dumps(result, indent=2, sort_keys=True)) + + +def _cmd_renew_subscription(args) -> None: + subscription_id = str(getattr(args, "subscription_id", "") or "").strip() + expiration = str(getattr(args, "expiration", "") or "").strip() + if not subscription_id or not expiration: + print("subscription_id and --expiration are required") + return + + store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None))) + result = _run_async( + build_graph_client().patch_json( + f"/subscriptions/{subscription_id}", + json_body={"expirationDateTime": expiration}, + ) + ) + merged = {"id": subscription_id, **(result or {}), "expirationDateTime": expiration} + _sync_subscription_record(store, merged, status="active", renewed=True) + print(json.dumps(merged, indent=2, sort_keys=True)) + + +def _cmd_delete_subscription(args) -> None: + 
subscription_id = str(getattr(args, "subscription_id", "") or "").strip() + if not subscription_id: + print("subscription_id is required") + return + store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None))) + result = _run_async(build_graph_client().delete(f"/subscriptions/{subscription_id}")) + store.delete_subscription(subscription_id) + print(json.dumps({"subscription_id": subscription_id, "result": result}, indent=2, sort_keys=True)) + + +def _cmd_maintain_subscriptions(args) -> None: + store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None))) + result = _run_async( + maintain_graph_subscriptions( + client=build_graph_client(), + store=store, + renew_within_hours=int(getattr(args, "renew_within_hours", 24) or 24), + extend_hours=int(getattr(args, "extend_hours", 24) or 24), + dry_run=bool(getattr(args, "dry_run", False)), + client_state=str(getattr(args, "client_state", "") or "").strip() or None, + ) + ) + print(json.dumps(result, indent=2, sort_keys=True)) + + +def _cmd_token_health(args) -> None: + provider = MicrosoftGraphTokenProvider.from_env() + health = provider.inspect_token_health() + payload = dict(health) + if getattr(args, "force_refresh", False): + try: + token = _run_async(provider.get_access_token(force_refresh=True)) + payload["last_refresh_succeeded"] = True + payload["access_token_length"] = len(token or "") + except Exception as exc: + payload["last_refresh_succeeded"] = False + payload["refresh_error"] = str(exc) + print(json.dumps(payload, indent=2, sort_keys=True)) + + +def _cmd_validate(args) -> None: + store = TeamsPipelineStore(_store_path(getattr(args, "store_path", None))) + snapshot = _validate_configuration_snapshot(store) + print(json.dumps(snapshot, indent=2, sort_keys=True)) diff --git a/plugins/teams_pipeline/meetings.py b/plugins/teams_pipeline/meetings.py new file mode 100644 index 0000000000..6d2648abd5 --- /dev/null +++ b/plugins/teams_pipeline/meetings.py @@ -0,0 +1,333 @@ 
+"""Graph-backed Teams meeting helpers for the plugin runtime.""" + +from __future__ import annotations + +import tempfile +from pathlib import Path +from typing import Any +from urllib.parse import quote + +from plugins.teams_pipeline.models import MeetingArtifact, TeamsMeetingRef +from tools.microsoft_graph_client import MicrosoftGraphAPIError, MicrosoftGraphClient + + +class TeamsMeetingError(RuntimeError): + """Base class for Teams meeting pipeline failures.""" + + +class TeamsMeetingNotFoundError(TeamsMeetingError): + """Raised when the meeting cannot be resolved from Graph.""" + + +class TeamsMeetingArtifactNotFoundError(TeamsMeetingError): + """Raised when a transcript or recording cannot be found.""" + + +class TeamsMeetingPermissionError(TeamsMeetingError): + """Raised when Graph access is denied for the requested resource.""" + + +def _meeting_path(meeting_ref: TeamsMeetingRef | str) -> str: + meeting_id = meeting_ref.meeting_id if isinstance(meeting_ref, TeamsMeetingRef) else str(meeting_ref) + return f"/communications/onlineMeetings/{quote(meeting_id, safe='')}" + + +def _wrap_graph_error(exc: MicrosoftGraphAPIError, *, missing_message: str) -> TeamsMeetingError: + if exc.status_code in (401, 403): + return TeamsMeetingPermissionError(str(exc)) + if exc.status_code == 404: + return TeamsMeetingNotFoundError(missing_message) + return TeamsMeetingError(str(exc)) + + +def _parse_organizer_user_id(payload: dict[str, Any]) -> str | None: + organizer = payload.get("organizer") + if not isinstance(organizer, dict): + return None + identity = organizer.get("identity") + if not isinstance(identity, dict): + return None + user = identity.get("user") + if not isinstance(user, dict): + return None + return user.get("id") + + +def _parse_thread_id(payload: dict[str, Any]) -> str | None: + chat = payload.get("chatInfo") + if isinstance(chat, dict): + thread_id = chat.get("threadId") + if thread_id: + return str(thread_id) + return payload.get("threadId") + + +def 
_normalize_meeting_ref(payload: dict[str, Any], *, tenant_id: str | None = None) -> TeamsMeetingRef: + metadata = { + key: payload.get(key) + for key in ("subject", "startDateTime", "endDateTime", "createdDateTime") + if payload.get(key) is not None + } + participants = payload.get("participants") + if participants is not None: + metadata["participants"] = participants + return TeamsMeetingRef( + meeting_id=str(payload.get("id") or "").strip(), + organizer_user_id=_parse_organizer_user_id(payload), + join_web_url=payload.get("joinWebUrl"), + calendar_event_id=payload.get("calendarEventId"), + thread_id=_parse_thread_id(payload), + tenant_id=tenant_id or payload.get("tenantId"), + metadata=metadata, + ) + + +def _normalize_artifact( + artifact_type: str, + payload: dict[str, Any], + *, + default_source_url: str | None = None, +) -> MeetingArtifact: + metadata = dict(payload) + download_url = ( + payload.get("@microsoft.graph.downloadUrl") + or payload.get("downloadUrl") + or payload.get("recordingContentUrl") + or payload.get("transcriptContentUrl") + ) + source_url = payload.get("webUrl") or payload.get("contentUrl") or default_source_url + return MeetingArtifact( + artifact_type=artifact_type, # type: ignore[arg-type] + artifact_id=str(payload.get("id") or "").strip(), + display_name=payload.get("displayName") or payload.get("name"), + content_type=payload.get("contentType") or payload.get("fileMimeType"), + source_url=source_url, + download_url=download_url, + created_at=payload.get("createdDateTime"), + available_at=payload.get("lastModifiedDateTime") or payload.get("meetingEndDateTime"), + size_bytes=payload.get("size"), + metadata=metadata, + ) + + +def _transcript_sort_key(artifact: MeetingArtifact) -> tuple[int, int, str]: + status = str(artifact.metadata.get("status") or "").lower() + has_download = int(bool(artifact.download_url or artifact.source_url)) + is_completed = int(status in {"available", "completed", "succeeded"}) + timestamp = "" + if 
artifact.available_at is not None: + timestamp = artifact.available_at.isoformat() + elif artifact.created_at is not None: + timestamp = artifact.created_at.isoformat() + return (is_completed, has_download, timestamp) + + +def _recording_download_path(meeting_ref: TeamsMeetingRef, artifact: MeetingArtifact) -> str: + if artifact.download_url: + return artifact.download_url + return f"{_meeting_path(meeting_ref)}/recordings/{quote(artifact.artifact_id, safe='')}/content" + + +def _transcript_download_path(meeting_ref: TeamsMeetingRef, artifact: MeetingArtifact) -> str: + if artifact.download_url: + return artifact.download_url + return f"{_meeting_path(meeting_ref)}/transcripts/{quote(artifact.artifact_id, safe='')}/content" + + +async def resolve_meeting_reference( + client: MicrosoftGraphClient, + *, + meeting_id: str | None = None, + join_web_url: str | None = None, + tenant_id: str | None = None, +) -> TeamsMeetingRef: + if meeting_id: + try: + payload = await client.get_json(_meeting_path(meeting_id)) + except MicrosoftGraphAPIError as exc: + raise _wrap_graph_error(exc, missing_message=f"Teams meeting not found: {meeting_id}") from exc + if not isinstance(payload, dict) or not payload.get("id"): + raise TeamsMeetingNotFoundError(f"Teams meeting not found: {meeting_id}") + return _normalize_meeting_ref(payload, tenant_id=tenant_id) + + if join_web_url: + escaped_join_url = join_web_url.replace("'", "''") + try: + payload = await client.get_json( + "/communications/onlineMeetings", + params={"$filter": f"JoinWebUrl eq '{escaped_join_url}'"}, + ) + except MicrosoftGraphAPIError as exc: + raise _wrap_graph_error( + exc, + missing_message=f"Teams meeting not found for join URL: {join_web_url}", + ) from exc + candidates = payload.get("value") if isinstance(payload, dict) else None + if not isinstance(candidates, list) or not candidates: + raise TeamsMeetingNotFoundError(f"Teams meeting not found for join URL: {join_web_url}") + return 
async def list_transcript_artifacts(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
) -> list[MeetingArtifact]:
    """Return every transcript artifact Graph reports for *meeting_ref*.

    Raises a wrapped error (via ``_wrap_graph_error``) when the Graph call
    fails; non-dict page entries are silently ignored.
    """
    try:
        pages = await client.collect_paginated(f"{_meeting_path(meeting_ref)}/transcripts")
    except MicrosoftGraphAPIError as exc:
        raise _wrap_graph_error(
            exc,
            missing_message=f"No transcripts found for Teams meeting {meeting_ref.meeting_id}",
        ) from exc
    normalized: list[MeetingArtifact] = []
    for entry in pages:
        if isinstance(entry, dict):
            normalized.append(_normalize_artifact("transcript", entry))
    return normalized


def select_preferred_transcript(candidates: list[MeetingArtifact]) -> MeetingArtifact | None:
    """Pick the best transcript among *candidates*, or None when none qualify."""
    transcripts = [item for item in candidates if item.artifact_type == "transcript"]
    if not transcripts:
        return None
    # max() returns the first element with the highest key, which matches
    # sorted(..., reverse=True)[0] because Python's sort is stable.
    return max(transcripts, key=_transcript_sort_key)


async def download_transcript_text(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
    transcript: MeetingArtifact,
    *,
    encoding: str = "utf-8",
) -> str:
    """Download *transcript* into a temp file and return its stripped text.

    Raises:
        TeamsMeetingArtifactNotFoundError: when Graph reports the transcript
            missing or the downloaded body is empty after stripping.
    """
    suffix = Path(transcript.display_name or "transcript.vtt").suffix or ".txt"
    # delete=False so the path outlives the handle; the with-block closes the
    # handle immediately, letting the Graph client write to the path safely.
    with tempfile.NamedTemporaryFile(prefix="teams-transcript-", suffix=suffix, delete=False) as handle:
        scratch = Path(handle.name)
    try:
        await client.download_to_file(_transcript_download_path(meeting_ref, transcript), scratch)
        text = scratch.read_text(encoding=encoding).strip()
    except MicrosoftGraphAPIError as exc:
        raise _wrap_graph_error(
            exc,
            missing_message=(
                f"Transcript {transcript.artifact_id} not found for meeting {meeting_ref.meeting_id}"
            ),
        ) from exc
    finally:
        # Best-effort cleanup; a leftover temp file is not worth failing over.
        try:
            scratch.unlink(missing_ok=True)
        except OSError:
            pass

    if not text:
        raise TeamsMeetingArtifactNotFoundError(
            f"Transcript {transcript.artifact_id} for meeting {meeting_ref.meeting_id} was empty."
        )
    return text


async def fetch_preferred_transcript_text(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
) -> tuple[MeetingArtifact | None, str | None]:
    """Return ``(artifact, text)`` for the preferred transcript, else ``(None, None)``."""
    preferred = select_preferred_transcript(await list_transcript_artifacts(client, meeting_ref))
    if preferred is None:
        return None, None
    try:
        text = await download_transcript_text(client, meeting_ref, preferred)
    except TeamsMeetingArtifactNotFoundError:
        # Treat a missing/empty body the same as "no transcript yet".
        return None, None
    return preferred, text


async def list_recording_artifacts(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
) -> list[MeetingArtifact]:
    """Return every recording artifact Graph reports for *meeting_ref*."""
    try:
        pages = await client.collect_paginated(f"{_meeting_path(meeting_ref)}/recordings")
    except MicrosoftGraphAPIError as exc:
        raise _wrap_graph_error(
            exc,
            missing_message=f"No recordings found for Teams meeting {meeting_ref.meeting_id}",
        ) from exc
    normalized: list[MeetingArtifact] = []
    for entry in pages:
        if isinstance(entry, dict):
            normalized.append(_normalize_artifact("recording", entry))
    return normalized


async def download_recording_artifact(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
    recording: MeetingArtifact,
    destination: str | Path,
) -> dict[str, Any]:
    """Download *recording* to *destination* and describe the result.

    Returns a dict with ``artifact``, ``path``, ``size_bytes`` and
    ``content_type`` keys; the latter two fall back to the artifact's own
    metadata when the download result omits them.
    """
    target = Path(destination)
    try:
        outcome = await client.download_to_file(
            _recording_download_path(meeting_ref, recording),
            target,
        )
    except MicrosoftGraphAPIError as exc:
        raise _wrap_graph_error(
            exc,
            missing_message=f"Recording {recording.artifact_id} not found for meeting {meeting_ref.meeting_id}",
        ) from exc
    return {
        "artifact": recording.to_dict(),
        "path": str(target),
        "size_bytes": outcome.get("size_bytes") or recording.size_bytes,
        "content_type": outcome.get("content_type") or recording.content_type,
    }


async def fetch_call_record_artifact(
    client: MicrosoftGraphClient,
    *,
    call_record_id: str,
    allow_permission_errors: bool = True,
) -> MeetingArtifact | None:
    """Fetch a Graph callRecord and wrap it as a ``call_record`` artifact.

    Returns None for 404s, for 401/403 when *allow_permission_errors* is set,
    and for payloads without an ``id``; other Graph failures are re-raised
    wrapped.
    """
    try:
        record = await client.get_json(f"/communications/callRecords/{quote(call_record_id, safe='')}")
    except MicrosoftGraphAPIError as exc:
        if exc.status_code in (401, 403) and allow_permission_errors:
            return None
        if exc.status_code == 404:
            return None
        raise _wrap_graph_error(exc, missing_message=f"Call record not found: {call_record_id}") from exc

    if not isinstance(record, dict) or not record.get("id"):
        return None

    # Summarize the interesting numbers so downstream consumers don't have to
    # dig through the raw callRecord payload.
    metrics: dict[str, Any] = {
        "version": record.get("version"),
        "modalities": record.get("modalities"),
        "participant_count": len(record.get("participants") or []),
        "organizer": _parse_organizer_user_id(record),
    }
    sessions = record.get("sessions") or []
    if sessions:
        metrics["session_count"] = len(sessions)

    return MeetingArtifact(
        artifact_type="call_record",
        artifact_id=str(record["id"]),
        display_name=record.get("type") or "call_record",
        source_url=record.get("webUrl"),
        created_at=record.get("startDateTime"),
        available_at=record.get("endDateTime"),
        metadata={"call_record": record, "metrics": metrics},
    )


async def enrich_meeting_with_call_record(
    client: MicrosoftGraphClient,
    meeting_ref: TeamsMeetingRef,
    *,
    call_record_id: str | None = None,
    allow_permission_errors: bool = True,
) -> MeetingArtifact | None:
    """Resolve a call-record id (explicit or from metadata) and fetch it."""
    record_id = call_record_id or meeting_ref.metadata.get("call_record_id")
    if not record_id:
        return None
    return await fetch_call_record_artifact(
        client,
        call_record_id=str(record_id),
        allow_permission_errors=allow_permission_errors,
    )
ArtifactType = Literal["transcript", "recording", "call_record"]


def _parse_datetime(value: Any) -> datetime | None:
    """Coerce *value* to a timezone-aware datetime.

    ``None`` and ``datetime`` values pass through untouched; ISO-8601 strings
    (including a trailing ``Z``) are parsed, and naive results are assumed UTC.
    """
    if value is None or isinstance(value, datetime):
        return value
    text = str(value).strip()
    if not text:
        return None
    if text.endswith("Z"):
        # datetime.fromisoformat() cannot parse a literal "Z" suffix.
        text = f"{text[:-1]}+00:00"
    parsed = datetime.fromisoformat(text)
    return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)


def _serialize_datetime(value: datetime | None) -> str | None:
    """Render an aware datetime as a UTC ISO-8601 string with a ``Z`` suffix."""
    if value is None:
        return None
    return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")


def _clean_dict(values: dict[str, Any]) -> dict[str, Any]:
    """Drop ``None``-valued entries so serialized payloads stay compact."""
    return {key: value for key, value in values.items() if value is not None}


def _first_truthy(payload: dict[str, Any], *keys: str) -> Any:
    """Mimic chained ``payload.get(a) or payload.get(b) …`` lookups.

    Returns the first truthy value; if all are falsy, returns the last looked-up
    value (exactly like an ``or`` chain would).
    """
    value = None
    for key in keys:
        value = payload.get(key)
        if value:
            break
    return value


@dataclass
class GraphSubscription:
    """A Microsoft Graph change-notification subscription record."""

    subscription_id: str
    resource: str
    change_type: str
    notification_url: str
    expiration_datetime: datetime
    client_state: str | None = None
    latest_renewal_at: datetime | None = None
    status: str | None = None

    def __post_init__(self) -> None:
        # Validate required string fields in declaration order, then normalize
        # the datetime fields to aware datetimes.
        for name, value in (
            ("subscription_id", self.subscription_id),
            ("resource", self.resource),
            ("change_type", self.change_type),
            ("notification_url", self.notification_url),
        ):
            if not value.strip():
                raise ValueError(f"GraphSubscription.{name} is required.")
        self.expiration_datetime = _parse_datetime(self.expiration_datetime)
        self.latest_renewal_at = _parse_datetime(self.latest_renewal_at)
        if self.expiration_datetime is None:
            raise ValueError("GraphSubscription.expiration_datetime is required.")

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> "GraphSubscription":
        """Build a subscription from snake_case or Graph camelCase keys."""
        return cls(
            subscription_id=str(_first_truthy(payload, "subscription_id", "id") or "").strip(),
            resource=str(payload.get("resource") or "").strip(),
            change_type=str(_first_truthy(payload, "change_type", "changeType") or "").strip(),
            notification_url=str(
                _first_truthy(payload, "notification_url", "notificationUrl") or ""
            ).strip(),
            expiration_datetime=_first_truthy(payload, "expiration_datetime", "expirationDateTime"),
            client_state=_first_truthy(payload, "client_state", "clientState"),
            latest_renewal_at=_first_truthy(payload, "latest_renewal_at", "latestRenewalAt"),
            status=payload.get("status"),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a snake_case dict, omitting ``None`` fields."""
        return _clean_dict(
            {
                "subscription_id": self.subscription_id,
                "resource": self.resource,
                "change_type": self.change_type,
                "notification_url": self.notification_url,
                "expiration_datetime": _serialize_datetime(self.expiration_datetime),
                "client_state": self.client_state,
                "latest_renewal_at": _serialize_datetime(self.latest_renewal_at),
                "status": self.status,
            }
        )


@dataclass
class TeamsMeetingRef:
    """A lightweight reference identifying a Teams meeting."""

    meeting_id: str
    organizer_user_id: str | None = None
    join_web_url: str | None = None
    calendar_event_id: str | None = None
    thread_id: str | None = None
    tenant_id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if not self.meeting_id.strip():
            raise ValueError("TeamsMeetingRef.meeting_id is required.")

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> "TeamsMeetingRef":
        """Build a reference from snake_case or Graph camelCase keys."""
        return cls(
            meeting_id=str(_first_truthy(payload, "meeting_id", "id") or "").strip(),
            organizer_user_id=_first_truthy(payload, "organizer_user_id", "organizerUserId"),
            join_web_url=_first_truthy(payload, "join_web_url", "joinWebUrl"),
            calendar_event_id=_first_truthy(payload, "calendar_event_id", "calendarEventId"),
            thread_id=_first_truthy(payload, "thread_id", "threadId"),
            tenant_id=_first_truthy(payload, "tenant_id", "tenantId"),
            metadata=dict(payload.get("metadata") or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a snake_case dict; empty metadata is dropped."""
        return _clean_dict(
            {
                "meeting_id": self.meeting_id,
                "organizer_user_id": self.organizer_user_id,
                "join_web_url": self.join_web_url,
                "calendar_event_id": self.calendar_event_id,
                "thread_id": self.thread_id,
                "tenant_id": self.tenant_id,
                "metadata": self.metadata or None,
            }
        )


@dataclass
class MeetingArtifact:
    """A normalized transcript / recording / callRecord artifact."""

    artifact_type: ArtifactType
    artifact_id: str
    display_name: str | None = None
    content_type: str | None = None
    source_url: str | None = None
    download_url: str | None = None
    created_at: datetime | None = None
    available_at: datetime | None = None
    size_bytes: int | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if self.artifact_type not in ("transcript", "recording", "call_record"):
            raise ValueError(
                "MeetingArtifact.artifact_type must be transcript, recording, or call_record."
            )
        if not self.artifact_id.strip():
            raise ValueError("MeetingArtifact.artifact_id is required.")
        self.created_at = _parse_datetime(self.created_at)
        self.available_at = _parse_datetime(self.available_at)
        if self.size_bytes is not None:
            self.size_bytes = int(self.size_bytes)

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> "MeetingArtifact":
        """Build an artifact from snake_case or raw Graph camelCase keys."""
        return cls(
            artifact_type=_first_truthy(payload, "artifact_type", "artifactType"),
            artifact_id=str(_first_truthy(payload, "artifact_id", "id") or "").strip(),
            display_name=_first_truthy(payload, "display_name", "displayName", "name"),
            content_type=_first_truthy(payload, "content_type", "contentType"),
            source_url=_first_truthy(payload, "source_url", "sourceUrl", "webUrl"),
            download_url=_first_truthy(
                payload, "download_url", "downloadUrl", "@microsoft.graph.downloadUrl"
            ),
            created_at=_first_truthy(payload, "created_at", "createdDateTime"),
            available_at=_first_truthy(
                payload, "available_at", "availableDateTime", "lastModifiedDateTime"
            ),
            size_bytes=_first_truthy(payload, "size_bytes", "size"),
            metadata=dict(payload.get("metadata") or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a snake_case dict, omitting ``None`` fields."""
        return _clean_dict(
            {
                "artifact_type": self.artifact_type,
                "artifact_id": self.artifact_id,
                "display_name": self.display_name,
                "content_type": self.content_type,
                "source_url": self.source_url,
                "download_url": self.download_url,
                "created_at": _serialize_datetime(self.created_at),
                "available_at": _serialize_datetime(self.available_at),
                "size_bytes": self.size_bytes,
                "metadata": self.metadata or None,
            }
        )


@dataclass
class TeamsMeetingSummaryPayload:
    """The fully assembled meeting summary ready for the output sinks."""

    meeting_ref: TeamsMeetingRef
    title: str | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
    participants: list[str] = field(default_factory=list)
    transcript_text: str | None = None
    summary: str | None = None
    key_decisions: list[str] = field(default_factory=list)
    action_items: list[str] = field(default_factory=list)
    risks: list[str] = field(default_factory=list)
    call_metrics: dict[str, Any] = field(default_factory=dict)
    source_artifacts: list[MeetingArtifact] = field(default_factory=list)
    confidence: str | None = None
    confidence_notes: str | None = None
    notion_target: str | None = None
    linear_target: str | None = None
    teams_target: str | None = None

    def __post_init__(self) -> None:
        self.start_time = _parse_datetime(self.start_time)
        self.end_time = _parse_datetime(self.end_time)

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> "TeamsMeetingSummaryPayload":
        """Build a payload from snake_case or camelCase keys.

        Raises KeyError when ``meeting_ref`` is absent — a summary without its
        meeting reference is meaningless.
        """
        return cls(
            meeting_ref=TeamsMeetingRef.from_dict(payload["meeting_ref"]),
            title=payload.get("title"),
            start_time=_first_truthy(payload, "start_time", "startTime"),
            end_time=_first_truthy(payload, "end_time", "endTime"),
            participants=list(payload.get("participants") or []),
            transcript_text=_first_truthy(payload, "transcript_text", "transcriptText"),
            summary=payload.get("summary"),
            key_decisions=list(_first_truthy(payload, "key_decisions", "keyDecisions") or []),
            action_items=list(_first_truthy(payload, "action_items", "actionItems") or []),
            risks=list(payload.get("risks") or []),
            call_metrics=dict(_first_truthy(payload, "call_metrics", "callMetrics") or {}),
            source_artifacts=[
                MeetingArtifact.from_dict(item) for item in payload.get("source_artifacts", [])
            ],
            confidence=payload.get("confidence"),
            confidence_notes=_first_truthy(payload, "confidence_notes", "confidenceNotes"),
            notion_target=_first_truthy(payload, "notion_target", "notionTarget"),
            linear_target=_first_truthy(payload, "linear_target", "linearTarget"),
            teams_target=_first_truthy(payload, "teams_target", "teamsTarget"),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a snake_case dict; empty lists/dicts are dropped."""
        return _clean_dict(
            {
                "meeting_ref": self.meeting_ref.to_dict(),
                "title": self.title,
                "start_time": _serialize_datetime(self.start_time),
                "end_time": _serialize_datetime(self.end_time),
                "participants": self.participants or None,
                "transcript_text": self.transcript_text,
                "summary": self.summary,
                "key_decisions": self.key_decisions or None,
                "action_items": self.action_items or None,
                "risks": self.risks or None,
                "call_metrics": self.call_metrics or None,
                "source_artifacts": [artifact.to_dict() for artifact in self.source_artifacts]
                or None,
                "confidence": self.confidence,
                "confidence_notes": self.confidence_notes,
                "notion_target": self.notion_target,
                "linear_target": self.linear_target,
                "teams_target": self.teams_target,
            }
        )


@dataclass
class TeamsMeetingPipelineJob:
    """Durable lifecycle record for one pipeline run of one notification."""

    job_id: str
    event_id: str
    source_event_type: str
    dedupe_key: str
    status: str
    retry_count: int = 0
    created_at: datetime | None = None
    updated_at: datetime | None = None
    meeting_ref: TeamsMeetingRef | None = None
    selected_artifact_strategy: str | None = None
    summary_payload: TeamsMeetingSummaryPayload | None = None
    error_info: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Validate required string fields in declaration order.
        for name, value in (
            ("job_id", self.job_id),
            ("event_id", self.event_id),
            ("source_event_type", self.source_event_type),
            ("dedupe_key", self.dedupe_key),
            ("status", self.status),
        ):
            if not value.strip():
                raise ValueError(f"TeamsMeetingPipelineJob.{name} is required.")
        self.retry_count = int(self.retry_count)
        self.created_at = _parse_datetime(self.created_at)
        self.updated_at = _parse_datetime(self.updated_at)

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> "TeamsMeetingPipelineJob":
        """Build a job from snake_case or camelCase keys; nested models recurse."""
        ref_payload = _first_truthy(payload, "meeting_ref", "meetingRef")
        summary = _first_truthy(payload, "summary_payload", "summaryPayload")
        return cls(
            job_id=str(_first_truthy(payload, "job_id", "jobId") or "").strip(),
            event_id=str(_first_truthy(payload, "event_id", "eventId") or "").strip(),
            source_event_type=str(
                _first_truthy(payload, "source_event_type", "sourceEventType") or ""
            ).strip(),
            dedupe_key=str(_first_truthy(payload, "dedupe_key", "dedupeKey") or "").strip(),
            status=str(payload.get("status") or "").strip(),
            retry_count=_first_truthy(payload, "retry_count", "retryCount") or 0,
            created_at=_first_truthy(payload, "created_at", "createdAt"),
            updated_at=_first_truthy(payload, "updated_at", "updatedAt"),
            meeting_ref=TeamsMeetingRef.from_dict(ref_payload) if ref_payload else None,
            selected_artifact_strategy=_first_truthy(
                payload, "selected_artifact_strategy", "selectedArtifactStrategy"
            ),
            summary_payload=TeamsMeetingSummaryPayload.from_dict(summary) if summary else None,
            error_info=dict(_first_truthy(payload, "error_info", "errorInfo") or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a snake_case dict, omitting ``None`` fields."""
        return _clean_dict(
            {
                "job_id": self.job_id,
                "event_id": self.event_id,
                "source_event_type": self.source_event_type,
                "dedupe_key": self.dedupe_key,
                "status": self.status,
                "retry_count": self.retry_count,
                "created_at": _serialize_datetime(self.created_at),
                "updated_at": _serialize_datetime(self.updated_at),
                "meeting_ref": self.meeting_ref.to_dict() if self.meeting_ref else None,
                "selected_artifact_strategy": self.selected_artifact_strategy,
                "summary_payload": self.summary_payload.to_dict() if self.summary_payload else None,
                "error_info": self.error_info or None,
            }
        )


__all__ = [
    "ArtifactType",
    "GraphSubscription",
    "MeetingArtifact",
    "TeamsMeetingPipelineJob",
    "TeamsMeetingRef",
    "TeamsMeetingSummaryPayload",
]
logger = logging.getLogger(__name__)

# Job lifecycle buckets: terminal states never re-run; active states mean a
# worker is (or was) mid-flight.
TERMINAL_PIPELINE_STATES = {"completed", "failed", "retry_scheduled"}
ACTIVE_PIPELINE_STATES = {
    "received",
    "resolving_meeting",
    "fetching_transcript",
    "downloading_recording",
    "transcribing_audio",
    "summarizing",
    "writing_notion",
    "writing_linear",
    "sending_teams",
}


class TeamsPipelineError(RuntimeError):
    """Base class for Teams meeting pipeline failures."""


class TeamsPipelineRetryableError(TeamsPipelineError):
    """Raised when the pipeline should be retried later."""


class TeamsPipelineSinkError(TeamsPipelineError):
    """Raised when an output sink fails."""


class TeamsPipelineArtifactNotFoundError(TeamsPipelineRetryableError):
    """Raised when meeting artifacts are not yet available."""


# Callable contracts for the pluggable pipeline stages.
TranscribeFn = Callable[[str, Optional[str]], dict[str, Any]]
SummarizeFn = Callable[..., Awaitable[dict[str, Any] | TeamsMeetingSummaryPayload]]
SinkFn = Callable[
    [TeamsMeetingSummaryPayload, dict[str, Any], Optional[dict[str, Any]]],
    Awaitable[dict[str, Any]],
]


@dataclass
class TeamsPipelineConfig:
    """User-facing knobs for the transcript-first meeting pipeline."""

    # Prefer Graph transcripts over recording+STT when available.
    transcript_preferred: bool = True
    # Fail (retryable) instead of falling back when no transcript exists.
    transcript_required: bool = False
    # Allow the recording-download + STT fallback path.
    transcription_fallback: bool = True
    stt_model: str | None = None
    ffmpeg_extract_audio: bool = True
    # Transcripts shorter than this are treated as unusable.
    transcript_min_chars: int = 80
    tmp_dir: Path | None = None
    notion: dict[str, Any] | None = None
    linear: dict[str, Any] | None = None
    teams_delivery: dict[str, Any] | None = None

    @classmethod
    def from_dict(cls, payload: Optional[dict[str, Any]]) -> "TeamsPipelineConfig":
        """Build a config from a raw mapping.

        Accepts snake_case and camelCase spellings for every key (previously
        only ``sttModel``/``tmpDir``/``teamsDelivery`` had camelCase
        fallbacks, which was inconsistent for config coming from the same
        source). snake_case wins when both spellings are present.
        """
        data = dict(payload or {})

        def pick(snake: str, camel: str, default: Any = None) -> Any:
            # Membership-based lookup so an explicit falsy value (e.g.
            # transcript_preferred=False) is respected rather than skipped.
            if snake in data:
                return data[snake]
            if camel in data:
                return data[camel]
            return default

        tmp_dir = data.get("tmp_dir") or data.get("tmpDir")
        return cls(
            transcript_preferred=bool(pick("transcript_preferred", "transcriptPreferred", True)),
            transcript_required=bool(pick("transcript_required", "transcriptRequired", False)),
            transcription_fallback=bool(pick("transcription_fallback", "transcriptionFallback", True)),
            stt_model=data.get("stt_model") or data.get("sttModel"),
            ffmpeg_extract_audio=bool(pick("ffmpeg_extract_audio", "ffmpegExtractAudio", True)),
            transcript_min_chars=int(pick("transcript_min_chars", "transcriptMinChars", 80)),
            tmp_dir=Path(tmp_dir) if tmp_dir else None,
            notion=data.get("notion"),
            linear=data.get("linear"),
            teams_delivery=data.get("teams_delivery") or data.get("teamsDelivery"),
        )


class NotionWriter:
    """Writes (or updates) a meeting-summary page in a Notion database."""

    API_BASE = "https://api.notion.com/v1"
    API_VERSION = "2025-09-03"

    def __init__(self, *, api_key: str | None = None, transport: httpx.AsyncBaseTransport | None = None) -> None:
        # transport is injectable for tests (httpx MockTransport).
        self.api_key = (api_key or os.getenv("NOTION_API_KEY", "")).strip()
        self._transport = transport

    async def write_summary(
        self,
        payload: TeamsMeetingSummaryPayload,
        config: dict[str, Any],
        existing_record: Optional[dict[str, Any]] = None,
    ) -> dict[str, Any]:
        """Create a page (or patch the page in *existing_record*) and return its id/url.

        Raises:
            TeamsPipelineSinkError: when the API key or target is missing.
            httpx.HTTPStatusError: when Notion rejects the request.
        """
        if not self.api_key:
            raise TeamsPipelineSinkError("NOTION_API_KEY is not configured.")

        database_id = str(config.get("database_id") or config.get("databaseId") or "").strip()
        page_id = (existing_record or {}).get("page_id")
        if not database_id and not page_id:
            raise TeamsPipelineSinkError("Notion sink requires database_id or an existing page_id.")

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Notion-Version": self.API_VERSION,
            "Content-Type": "application/json",
        }
        async with httpx.AsyncClient(timeout=30.0, transport=self._transport) as client:
            if page_id:
                # Re-run of the same meeting: update properties in place rather
                # than creating a duplicate page.
                response = await client.patch(
                    f"{self.API_BASE}/pages/{page_id}",
                    headers=headers,
                    json={"properties": self._build_properties(payload, config)},
                )
                response.raise_for_status()
                record = response.json()
            else:
                response = await client.post(
                    f"{self.API_BASE}/pages",
                    headers=headers,
                    json={
                        "parent": {"database_id": database_id},
                        "properties": self._build_properties(payload, config),
                        "children": self._build_blocks(payload),
                    },
                )
                response.raise_for_status()
                record = response.json()

        return {"page_id": record["id"], "url": record.get("url")}

    def _build_properties(self, payload: TeamsMeetingSummaryPayload, config: dict[str, Any]) -> dict[str, Any]:
        """Map the summary onto the configured Notion database properties."""
        title_property = config.get("title_property", "Name")
        summary_property = config.get("summary_property")
        meeting_id_property = config.get("meeting_id_property")

        properties: dict[str, Any] = {
            title_property: {
                "title": [{"text": {"content": payload.title or f"Meeting {payload.meeting_ref.meeting_id}"}}]
            }
        }
        if summary_property:
            # Notion rich_text content is capped at 2000 chars; stay under it.
            properties[summary_property] = {
                "rich_text": [{"text": {"content": (payload.summary or "")[:1900]}}]
            }
        if meeting_id_property:
            properties[meeting_id_property] = {
                "rich_text": [{"text": {"content": payload.meeting_ref.meeting_id}}]
            }
        return properties

    def _build_blocks(self, payload: TeamsMeetingSummaryPayload) -> list[dict[str, Any]]:
        """Render the summary sections as Notion heading + paragraph blocks."""
        sections = [
            ("Summary", payload.summary or ""),
            ("Key Decisions", "\n".join(f"- {item}" for item in payload.key_decisions)),
            ("Action Items", "\n".join(f"- {item}" for item in payload.action_items)),
            ("Risks", "\n".join(f"- {item}" for item in payload.risks)),
        ]
        blocks: list[dict[str, Any]] = []
        for heading, body in sections:
            blocks.append(
                {
                    "object": "block",
                    "type": "heading_2",
                    "heading_2": {"rich_text": [{"text": {"content": heading}}]},
                }
            )
            blocks.append(
                {
                    "object": "block",
                    "type": "paragraph",
                    "paragraph": {"rich_text": [{"text": {"content": body or "None"}}]},
                }
            )
        return blocks


class LinearWriter:
    """Creates (or updates) a Linear issue carrying the meeting summary."""

    API_URL = "https://api.linear.app/graphql"

    def __init__(self, *, api_key: str | None = None, transport: httpx.AsyncBaseTransport | None = None) -> None:
        # transport is injectable for tests (httpx MockTransport).
        self.api_key = (api_key or os.getenv("LINEAR_API_KEY", "")).strip()
        self._transport = transport

    async def write_summary(
        self,
        payload: TeamsMeetingSummaryPayload,
        config: dict[str, Any],
        existing_record: Optional[dict[str, Any]] = None,
    ) -> dict[str, Any]:
        """Create or update the issue and return its id/identifier/url.

        Raises:
            TeamsPipelineSinkError: missing API key / team_id, or a GraphQL
                response without an issue.
            httpx.HTTPStatusError: when the HTTP layer rejects the request.
        """
        if not self.api_key:
            raise TeamsPipelineSinkError("LINEAR_API_KEY is not configured.")

        headers = {"Authorization": self.api_key, "Content-Type": "application/json"}
        team_id = str(config.get("team_id") or config.get("teamId") or "").strip()
        title = payload.title or f"Meeting Summary: {payload.meeting_ref.meeting_id}"
        description = _render_summary_markdown(payload)
        existing_issue_id = (existing_record or {}).get("issue_id")

        async with httpx.AsyncClient(timeout=30.0, transport=self._transport) as client:
            if existing_issue_id:
                # Idempotent re-run: update the issue we created last time.
                response = await client.post(
                    self.API_URL,
                    headers=headers,
                    json={
                        "query": (
                            "mutation($id: String!, $input: IssueUpdateInput!) "
                            "{ issueUpdate(id: $id, input: $input) { success issue { id identifier url } } }"
                        ),
                        "variables": {
                            "id": existing_issue_id,
                            "input": {"title": title, "description": description},
                        },
                    },
                )
            else:
                if not team_id:
                    raise TeamsPipelineSinkError("Linear sink requires team_id when creating a new issue.")
                response = await client.post(
                    self.API_URL,
                    headers=headers,
                    json={
                        "query": (
                            "mutation($input: IssueCreateInput!) "
                            "{ issueCreate(input: $input) { success issue { id identifier url } } }"
                        ),
                        "variables": {"input": {"teamId": team_id, "title": title, "description": description}},
                    },
                )
            response.raise_for_status()
            payload_json = response.json()

        # Whichever mutation ran, extract the issue node from its envelope.
        issue = (
            (((payload_json.get("data") or {}).get("issueUpdate") or {}).get("issue"))
            or (((payload_json.get("data") or {}).get("issueCreate") or {}).get("issue"))
        )
        if not isinstance(issue, dict) or not issue.get("id"):
            raise TeamsPipelineSinkError(f"Linear write failed: {payload_json}")

        return {"issue_id": issue["id"], "identifier": issue.get("identifier"), "url": issue.get("url")}
            # (continuation of create_job_from_notification — the opening lines,
            # including `meeting_id = (resource_data.get("id") or`, sit in the
            # previous hunk; meeting_id falls back through notification fields.)
            notification.get("meetingId")
            or _extract_meeting_id_from_resource(str(notification.get("resource") or ""))
            or notification.get("resource")
            or event_id
        )
        job = TeamsMeetingPipelineJob(
            job_id=f"teams-job-{uuid.uuid4().hex[:12]}",
            event_id=event_id,
            source_event_type=str(notification.get("changeType") or "graph.notification"),
            dedupe_key=event_id,
            status="received",
            meeting_ref=TeamsMeetingRef(
                meeting_id=str(meeting_id),
                tenant_id=resource_data.get("tenantId") or notification.get("tenantId"),
                metadata={
                    # Keep the raw notification so run_job can recover
                    # joinWebUrl / callRecordId later without refetching.
                    "notification": dict(notification),
                    "join_web_url": resource_data.get("joinWebUrl"),
                    "call_record_id": resource_data.get("callRecordId") or notification.get("callRecordId"),
                },
            ),
        )
        self.store.upsert_job(job.job_id, job.to_dict())
        return job

    async def run_notification(self, notification: dict[str, Any]) -> TeamsMeetingPipelineJob:
        """Create (or dedupe) a job for *notification* and run it if fresh.

        Jobs that are already terminal or mid-flight are returned untouched so
        repeated Graph webhook deliveries stay idempotent.
        """
        job = self.create_job_from_notification(notification)
        if job.status in TERMINAL_PIPELINE_STATES or job.status in ACTIVE_PIPELINE_STATES - {"received"}:
            return job
        return await self.run_job(job.job_id)

    async def run_job(self, job_or_id: TeamsMeetingPipelineJob | str) -> TeamsMeetingPipelineJob:
        """Drive one job through the whole pipeline, persisting every status.

        Flow: resolve meeting -> transcript (preferred) -> recording+STT
        fallback -> callRecord enrichment -> summarize -> output sinks.
        Retryable failures end in "retry_scheduled", everything else in
        "failed"; this method returns the job rather than re-raising.
        """
        job = self._coerce_job(job_or_id)
        meeting_ref = job.meeting_ref
        if meeting_ref is None:
            raise TeamsPipelineError(f"Job {job.job_id} has no meeting_ref.")

        artifacts: list[MeetingArtifact] = []

        try:
            job = self._persist_job(job, status="resolving_meeting")
            # NOTE(review): metadata.get("notification") can be None when the
            # key is absent (dict.get default); the .get() calls on it below
            # would then raise AttributeError and land in the broad handler —
            # confirm every job creation path always stores the notification.
            notification = meeting_ref.metadata.get("notification") if isinstance(meeting_ref.metadata, dict) else {}
            resolved_meeting = await resolve_meeting_reference(
                self.graph_client,
                meeting_id=meeting_ref.meeting_id,
                join_web_url=meeting_ref.join_web_url or meeting_ref.metadata.get("join_web_url"),
                tenant_id=meeting_ref.tenant_id,
            )
            job.meeting_ref = resolved_meeting
            job = self._persist_job(job, meeting_ref=resolved_meeting.to_dict())

            transcript_text: str | None = None
            if self.config.transcript_preferred:
                job = self._persist_job(job, status="fetching_transcript")
                transcript_artifact, transcript_text = await fetch_preferred_transcript_text(
                    self.graph_client, resolved_meeting
                )
                if transcript_artifact and transcript_text:
                    artifacts.append(transcript_artifact)
                    # Too-short transcripts are discarded so the recording
                    # fallback below can produce something usable instead.
                    if len(transcript_text.strip()) < self.config.transcript_min_chars:
                        transcript_text = None

            if not transcript_text:
                if self.config.transcript_required:
                    raise TeamsPipelineRetryableError(
                        f"Transcript unavailable for meeting {resolved_meeting.meeting_id}."
                    )
                if not self.config.transcription_fallback:
                    raise TeamsPipelineArtifactNotFoundError(
                        "No transcript available and transcription fallback disabled "
                        f"for {resolved_meeting.meeting_id}."
                    )
                job = self._persist_job(job, status="downloading_recording")
                recordings = await list_recording_artifacts(self.graph_client, resolved_meeting)
                if not recordings:
                    raise TeamsPipelineRetryableError(
                        f"Recording unavailable for meeting {resolved_meeting.meeting_id}."
                    )
                # First recording wins; STT runs on its extracted audio.
                recording = recordings[0]
                artifacts.append(recording)
                transcript_text = await self._transcribe_recording(job, resolved_meeting, recording)
                job = self._persist_job(job, selected_artifact_strategy="recording_stt_fallback")
            else:
                job = self._persist_job(job, selected_artifact_strategy="transcript_first")

            # callRecord enrichment is best-effort metadata, not a hard dependency.
            call_record_id = notification.get("callRecordId") or (meeting_ref.metadata or {}).get("call_record_id")
            call_record = await enrich_meeting_with_call_record(
                self.graph_client,
                resolved_meeting,
                call_record_id=call_record_id,
            )
            if call_record is not None:
                artifacts.append(call_record)

            job = self._persist_job(job, status="summarizing")
            generated = await self.summarize_fn(
                resolved_meeting=resolved_meeting,
                transcript_text=transcript_text or "",
                artifacts=artifacts,
            )
            # summarize_fn may return either the model object or a raw dict.
            summary_payload = (
                generated
                if isinstance(generated, TeamsMeetingSummaryPayload)
                else TeamsMeetingSummaryPayload.from_dict(generated)
            )
            job.summary_payload = summary_payload
            job = self._persist_job(job, summary_payload=summary_payload.to_dict())

            await self._write_sinks(job, summary_payload)
            job = self._persist_job(job, status="completed")
            return job
        except TeamsPipelineRetryableError as exc:
            job = self._persist_job(
                job,
                status="retry_scheduled",
                error_info={"message": str(exc), "retryable": True},
            )
            return job
        except Exception as exc:
            # Terminal failure: record the error class and message on the job.
            job = self._persist_job(
                job,
                status="failed",
                error_info={"message": str(exc), "type": type(exc).__name__},
            )
            return job

    def _coerce_job(self, job_or_id: TeamsMeetingPipelineJob | str) -> TeamsMeetingPipelineJob:
        """Accept either a job object or a job id and return the job object."""
        if isinstance(job_or_id, TeamsMeetingPipelineJob):
            return job_or_id
        payload = self.store.get_job(str(job_or_id))
        if not payload:
            raise TeamsPipelineError(f"Unknown Teams pipeline job: {job_or_id}")
        return TeamsMeetingPipelineJob.from_dict(payload)

    def _find_job_by_dedupe_key(self, dedupe_key: str) -> TeamsMeetingPipelineJob | None:
        """Linear scan of stored jobs for one matching *dedupe_key*, else None."""
        for payload in self.store.list_jobs().values():
            if not isinstance(payload, dict):
                continue
            if str(payload.get("dedupe_key") or "") != dedupe_key:
                continue
            return TeamsMeetingPipelineJob.from_dict(payload)
        return None

    def _persist_job(self, job: TeamsMeetingPipelineJob, **updates: Any) -> TeamsMeetingPipelineJob:
        """Merge *updates* into the job's dict form, store it, and return the
        re-hydrated job (so callers always hold the stored state)."""
        payload = job.to_dict()
        payload.update(updates)
        stored = self.store.upsert_job(job.job_id, payload)
        return TeamsMeetingPipelineJob.from_dict(stored)

    async def _transcribe_recording(
        self,
        job: TeamsMeetingPipelineJob,
        meeting_ref: TeamsMeetingRef,
        recording: MeetingArtifact,
    ) -> str:
        """Download *recording*, extract audio if needed, and run STT on it.

        The temp directory is removed when the with-block exits, so nothing
        large is left behind. Raises TeamsPipelineRetryableError on STT
        failure or an empty transcript.
        """
        temp_root = self.config.tmp_dir or (get_hermes_home() / "tmp" / "teams_pipeline")
        temp_root.mkdir(parents=True, exist_ok=True)
        with tempfile.TemporaryDirectory(dir=str(temp_root), prefix="teams-recording-") as tmp_dir:
            recording_name = recording.display_name or f"{recording.artifact_id}.mp4"
            recording_path = Path(tmp_dir) / recording_name
            await download_recording_artifact(
                self.graph_client,
                meeting_ref,
                recording,
                recording_path,
            )
            audio_path = await self._prepare_audio_path(recording_path)
            job = self._persist_job(job, status="transcribing_audio")
            # transcribe_fn is synchronous; run it off the event loop.
            result = await asyncio.to_thread(self.transcribe_fn, str(audio_path), self.config.stt_model)
            if not result.get("success"):
                raise TeamsPipelineRetryableError(str(result.get("error") or "Unknown STT failure"))
            transcript = str(result.get("transcript") or "").strip()
            if not transcript:
                raise TeamsPipelineRetryableError("STT returned an empty transcript.")
            return transcript

    async def _prepare_audio_path(self, recording_path: Path) -> Path:
        """Return an audio file for STT, extracting with ffmpeg when needed.

        Audio containers pass through unchanged; video files are converted to
        a sibling .wav. Raises TeamsPipelineRetryableError when ffmpeg is
        missing or the extraction fails.
        """
        if recording_path.suffix.lower() in {".wav", ".mp3", ".m4a", ".ogg", ".flac", ".aac", ".webm"}:
            return recording_path
        if not self.config.ffmpeg_extract_audio:
            # Caller opted out of extraction; hand the raw file to STT as-is.
            return recording_path
        ffmpeg = shutil.which("ffmpeg")
        if not ffmpeg:
            raise TeamsPipelineRetryableError(
                "Recording fallback requires ffmpeg for audio extraction, but ffmpeg was not found."
            )
        audio_path = recording_path.with_suffix(".wav")
        proc = await asyncio.create_subprocess_exec(
            ffmpeg,
            "-y",
            "-i",
            str(recording_path),
            str(audio_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            detail = stderr.decode("utf-8", errors="replace").strip()
            raise TeamsPipelineRetryableError(f"ffmpeg audio extraction failed: {detail}")
        return audio_path

    async def _generate_summary_payload(
        self,
        *,
        resolved_meeting: TeamsMeetingRef,
        transcript_text: str,
        artifacts: list[MeetingArtifact],
    ) -> TeamsMeetingSummaryPayload:
        """Default summarize_fn: LLM summary with a heuristic fallback.

        Any LLM/parsing failure downgrades to _heuristic_summary rather than
        failing the whole job.
        """
        prompt = _build_summary_prompt(resolved_meeting, transcript_text, artifacts)
        try:
            response = await async_call_llm(
                task="call",
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You summarize meeting transcripts. Return only valid JSON with keys: "
                            "summary, key_decisions, action_items, risks, confidence, confidence_notes."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                temperature=0.2,
                max_tokens=900,
            )
            content = extract_content_or_reasoning(response)
            parsed = _parse_summary_json(content)
        except Exception as exc:
            # Deliberate best-effort: a summary is still produced without the LLM.
            logger.info("Teams pipeline LLM summary unavailable, using heuristic summary: %s", exc)
            parsed = _heuristic_summary(transcript_text)

        metrics = _collect_call_metrics(artifacts)
        # NOTE(review): subject/startDateTime/endDateTime are read from the
        # resolved meeting's metadata — presumably populated by
        # resolve_meeting_reference; confirm against that helper.
        return TeamsMeetingSummaryPayload(
            meeting_ref=resolved_meeting,
            title=str(resolved_meeting.metadata.get("subject") or f"Meeting {resolved_meeting.meeting_id}"),
            start_time=resolved_meeting.metadata.get("startDateTime"),
            end_time=resolved_meeting.metadata.get("endDateTime"),
            participants=_collect_participants(resolved_meeting),
            transcript_text=transcript_text,
            summary=parsed.get("summary"),
            key_decisions=list(parsed.get("key_decisions") or []),
            action_items=list(parsed.get("action_items") or []),
            risks=list(parsed.get("risks") or []),
            call_metrics=metrics,
            source_artifacts=artifacts,
            confidence=parsed.get("confidence"),
            confidence_notes=parsed.get("confidence_notes"),
            notion_target=(self.config.notion or {}).get("database_id"),
            linear_target=(self.config.linear or {}).get("team_id"),
            teams_target=(self.config.teams_delivery or {}).get("channel_id"),
        )

    async def _write_sinks(self, job: TeamsMeetingPipelineJob, payload: TeamsMeetingSummaryPayload) -> None:
        """Deliver the summary to each enabled, configured sink.

        Sink records keyed by meeting id make re-runs update-in-place rather
        than duplicating pages/issues. (This method continues past this hunk.)
        """
        if self.config.notion and self.config.notion.get("enabled") and self.notion_writer:
            job = self._persist_job(job, status="writing_notion")
            sink_key = f"notion:{payload.meeting_ref.meeting_id}"
            existing = self.store.get_sink_record(sink_key)
            result = await self.notion_writer.write_summary(payload, self.config.notion, existing)
            self.store.upsert_sink_record(sink_key, result)

        if self.config.linear and self.config.linear.get("enabled") and self.linear_writer:
            job = self._persist_job(job, status="writing_linear")
            sink_key = f"linear:{payload.meeting_ref.meeting_id}"
existing = self.store.get_sink_record(sink_key) + result = await self.linear_writer.write_summary(payload, self.config.linear, existing) + self.store.upsert_sink_record(sink_key, result) + + if self.config.teams_delivery and self.config.teams_delivery.get("enabled") and self.teams_sender: + job = self._persist_job(job, status="sending_teams") + sink_key = f"teams:{payload.meeting_ref.meeting_id}" + existing = self.store.get_sink_record(sink_key) + if hasattr(self.teams_sender, "write_summary"): + result = await self.teams_sender.write_summary(payload, self.config.teams_delivery, existing) + else: + result = await self.teams_sender(payload, self.config.teams_delivery, existing) + self.store.upsert_sink_record(sink_key, result) + + +def _collect_call_metrics(artifacts: list[MeetingArtifact]) -> dict[str, Any]: + metrics: dict[str, Any] = {} + for artifact in artifacts: + if artifact.artifact_type == "call_record": + metrics.update(dict(artifact.metadata.get("metrics") or {})) + metrics["artifact_count"] = len(artifacts) + return metrics + + +def _collect_participants(meeting_ref: TeamsMeetingRef) -> list[str]: + participants = meeting_ref.metadata.get("participants") or [] + result: list[str] = [] + if isinstance(participants, list): + for item in participants: + if isinstance(item, dict): + name = item.get("displayName") or (((item.get("identity") or {}).get("user") or {}).get("displayName")) + if name: + result.append(str(name)) + return result + + +def _extract_meeting_id_from_resource(resource: str) -> str | None: + if not resource: + return None + parts = [part for part in resource.split("/") if part] + if not parts: + return None + if "onlineMeetings" in parts: + index = parts.index("onlineMeetings") + if index + 1 < len(parts): + return parts[index + 1] + return parts[-1] + + +def _build_summary_prompt( + meeting_ref: TeamsMeetingRef, + transcript_text: str, + artifacts: list[MeetingArtifact], +) -> str: + artifact_lines = [f"- 
{artifact.artifact_type}:{artifact.artifact_id}:{artifact.display_name or ''}" for artifact in artifacts] + return ( + f"Meeting ID: {meeting_ref.meeting_id}\n" + f"Title: {meeting_ref.metadata.get('subject') or 'Unknown'}\n" + f"Artifacts:\n{chr(10).join(artifact_lines) or '- none'}\n\n" + "Transcript:\n" + f"{transcript_text[:18000]}" + ) + + +def _parse_summary_json(content: str) -> dict[str, Any]: + text = (content or "").strip() + if not text: + return _heuristic_summary("") + start = text.find("{") + end = text.rfind("}") + if start >= 0 and end > start: + text = text[start : end + 1] + payload = json.loads(text) + return { + "summary": str(payload.get("summary") or "").strip(), + "key_decisions": [str(item).strip() for item in payload.get("key_decisions", []) if str(item).strip()], + "action_items": [str(item).strip() for item in payload.get("action_items", []) if str(item).strip()], + "risks": [str(item).strip() for item in payload.get("risks", []) if str(item).strip()], + "confidence": str(payload.get("confidence") or "medium").strip(), + "confidence_notes": str(payload.get("confidence_notes") or "").strip(), + } + + +def _heuristic_summary(transcript_text: str) -> dict[str, Any]: + lines = [line.strip(" -*\t") for line in transcript_text.splitlines() if line.strip()] + summary = " ".join(lines[:3])[:1200] or "Transcript unavailable or too sparse for a confident summary." 
+ action_items = [ + line for line in lines if line.lower().startswith(("action:", "todo:", "next step:", "follow up:")) + ][:8] + risks = [line for line in lines if "risk" in line.lower() or "blocker" in line.lower()][:6] + decisions = [line for line in lines if "decide" in line.lower() or "decision" in line.lower()][:6] + confidence = "low" if len(transcript_text.strip()) < 300 else "medium" + return { + "summary": summary, + "key_decisions": decisions, + "action_items": action_items, + "risks": risks, + "confidence": confidence, + "confidence_notes": "Generated with heuristic fallback because no LLM summary response was available.", + } + + +def _render_summary_markdown(payload: TeamsMeetingSummaryPayload) -> str: + lines = [ + f"# {payload.title or f'Meeting {payload.meeting_ref.meeting_id}'}", + "", + "## Summary", + payload.summary or "No summary available.", + "", + "## Key Decisions", + *([f"- {item}" for item in payload.key_decisions] or ["- None"]), + "", + "## Action Items", + *([f"- {item}" for item in payload.action_items] or ["- None"]), + "", + "## Risks", + *([f"- {item}" for item in payload.risks] or ["- None"]), + "", + f"Confidence: {payload.confidence or 'unknown'}", + payload.confidence_notes or "", + ] + return "\n".join(lines).strip() diff --git a/plugins/teams_pipeline/plugin.yaml b/plugins/teams_pipeline/plugin.yaml new file mode 100644 index 0000000000..c9287ac083 --- /dev/null +++ b/plugins/teams_pipeline/plugin.yaml @@ -0,0 +1,9 @@ +name: teams_pipeline +version: 0.1.0 +description: "Microsoft Teams meeting pipeline plugin with durable runtime state and operator CLI flows for Graph-backed transcript-first meeting summaries." 
+author: NousResearch +kind: standalone +platforms: + - linux + - macos + - windows diff --git a/plugins/teams_pipeline/runtime.py b/plugins/teams_pipeline/runtime.py new file mode 100644 index 0000000000..0163ccaaae --- /dev/null +++ b/plugins/teams_pipeline/runtime.py @@ -0,0 +1,125 @@ +"""Gateway runtime wiring for the Teams meeting pipeline plugin.""" + +from __future__ import annotations + +import logging +from typing import Any + +from gateway.config import Platform +from plugins.teams_pipeline.pipeline import TeamsMeetingPipeline +from plugins.teams_pipeline.store import TeamsPipelineStore, resolve_teams_pipeline_store_path +from plugins.teams_pipeline.subscriptions import build_graph_client + +logger = logging.getLogger(__name__) + + +def _teams_delivery_is_configured(teams_extra: dict[str, Any], teams_delivery: dict[str, Any]) -> bool: + delivery_mode = str( + teams_delivery.get("mode") + or teams_delivery.get("delivery_mode") + or teams_extra.get("delivery_mode") + or "" + ).strip().lower() + + if delivery_mode == "incoming_webhook": + return bool( + teams_delivery.get("incoming_webhook_url") + or teams_extra.get("incoming_webhook_url") + ) + if delivery_mode == "graph": + chat_id = teams_delivery.get("chat_id") or teams_extra.get("chat_id") + team_id = teams_delivery.get("team_id") or teams_extra.get("team_id") + channel_id = teams_delivery.get("channel_id") or teams_extra.get("channel_id") + return bool(chat_id or (team_id and channel_id)) + + return False + + +def build_pipeline_runtime_config(gateway_config: Any) -> dict[str, Any]: + """Build pipeline config from gateway platform config. + + Pipeline-specific knobs live under ``teams.extra.meeting_pipeline`` while + Teams delivery continues to source its target details from the existing + Teams platform config. 
+ """ + + teams_config = gateway_config.platforms.get(Platform("teams")) + teams_extra = dict((teams_config.extra or {}) if teams_config else {}) + pipeline_config = dict(teams_extra.get("meeting_pipeline") or {}) + + if teams_config and teams_config.enabled: + teams_delivery = dict(pipeline_config.get("teams_delivery") or {}) + + delivery_mode = str(teams_extra.get("delivery_mode") or "").strip() + if delivery_mode: + teams_delivery["mode"] = delivery_mode + + for key in ( + "incoming_webhook_url", + "access_token", + "team_id", + "channel_id", + "chat_id", + ): + value = teams_extra.get(key) + if value not in (None, ""): + teams_delivery[key] = value + + if teams_delivery: + teams_delivery["enabled"] = _teams_delivery_is_configured(teams_extra, teams_delivery) + pipeline_config["teams_delivery"] = teams_delivery + + return pipeline_config + + +def build_pipeline_runtime(gateway: Any) -> TeamsMeetingPipeline: + teams_sender = None + teams_config = gateway.config.platforms.get(Platform("teams")) + pipeline_config = build_pipeline_runtime_config(gateway.config) + teams_delivery = dict(pipeline_config.get("teams_delivery") or {}) + if teams_config and teams_config.enabled and teams_delivery.get("enabled"): + try: + from plugins.platforms.teams.adapter import TeamsSummaryWriter + except ImportError: + logger.debug( + "TeamsSummaryWriter unavailable; Teams outbound delivery remains disabled until the adapter layer is present." 
+ ) + else: + teams_sender = TeamsSummaryWriter(platform_config=teams_config) + + return TeamsMeetingPipeline( + graph_client=build_graph_client(), + store=TeamsPipelineStore(resolve_teams_pipeline_store_path()), + config=pipeline_config, + teams_sender=teams_sender, + ) + + +def bind_gateway_runtime(gateway: Any) -> bool: + """Attach the Teams pipeline runtime to the msgraph webhook adapter.""" + + adapter = gateway.adapters.get(Platform.MSGRAPH_WEBHOOK) + if adapter is None: + return False + + if getattr(gateway, "_teams_pipeline_runtime", None) is not None: + return True + + try: + runtime = build_pipeline_runtime(gateway) + except Exception as exc: + error_message = str(exc) + gateway._teams_pipeline_runtime_error = error_message + logger.warning( + "Teams pipeline runtime unavailable; leaving webhook scheduler unchanged: %s", + error_message, + ) + return False + + async def _schedule(notification: dict[str, Any], event: Any) -> None: + await runtime.run_notification(notification) + + adapter.set_notification_scheduler(_schedule) + gateway._teams_pipeline_runtime = runtime + gateway._teams_pipeline_runtime_error = None + return True diff --git a/plugins/teams_pipeline/store.py b/plugins/teams_pipeline/store.py new file mode 100644 index 0000000000..ceab28cb7e --- /dev/null +++ b/plugins/teams_pipeline/store.py @@ -0,0 +1,193 @@ +"""Durable local state for the Teams pipeline plugin.""" + +from __future__ import annotations + +import hashlib +import json +import os +import threading +from copy import deepcopy +from datetime import datetime, timezone +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Any, Dict, Optional + +from hermes_constants import get_hermes_home + + +DEFAULT_TEAMS_PIPELINE_STORE_FILENAME = "teams_pipeline_store.json" + + +def _utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def resolve_teams_pipeline_store_path(path: str | Path | None = None) -> Path: + if path is not None: + 
def _utc_now_iso() -> str:
    """Current UTC time as an ISO-8601 string (store record timestamps)."""
    return datetime.now(timezone.utc).isoformat()


class TeamsPipelineStore:
    """JSON-backed durable store for Teams pipeline state.

    Reads return deep copies so callers cannot mutate internal state; every
    mutation rewrites the backing file atomically (temp sibling + replace).
    A single re-entrant lock serializes all access.
    """

    # The five top-level buckets persisted to disk.
    _BUCKETS = (
        "subscriptions",
        "notification_receipts",
        "event_timestamps",
        "jobs",
        "sink_records",
    )

    def __init__(self, path: str | Path):
        self.path = Path(path)
        self._lock = threading.RLock()
        self._state: Dict[str, Dict[str, Any]] = {bucket: {} for bucket in self._BUCKETS}
        self._load()

    def _load(self) -> None:
        """Hydrate state from disk; a missing file or non-dict payload is ignored."""
        with self._lock:
            if not self.path.exists():
                return
            raw = json.loads(self.path.read_text(encoding="utf-8") or "{}")
            if not isinstance(raw, dict):
                return
            for bucket in self._BUCKETS:
                self._state[bucket] = dict(raw.get(bucket) or {})

    def _persist(self) -> None:
        """Atomically rewrite the backing file (write a temp sibling, then replace)."""
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with NamedTemporaryFile(
            "w",
            encoding="utf-8",
            dir=str(self.path.parent),
            delete=False,
        ) as tmp:
            json.dump(self._state, tmp, indent=2, sort_keys=True)
            tmp.flush()
            tmp_path = Path(tmp.name)
        tmp_path.replace(self.path)

    # ------------------------------------------------------------------
    # Shared record helpers
    # ------------------------------------------------------------------

    def _upsert(self, bucket: str, key_field: str, key: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Merge *payload* over any existing record, stamp timestamps, persist."""
        with self._lock:
            existing = self._state[bucket].get(key, {})
            merged = {**existing, **deepcopy(payload)}
            merged[key_field] = key
            merged.setdefault("created_at", existing.get("created_at") or _utc_now_iso())
            merged["updated_at"] = _utc_now_iso()
            self._state[bucket][key] = merged
            self._persist()
            return deepcopy(merged)

    def _get_record(self, bucket: str, key: str) -> Optional[Dict[str, Any]]:
        """Deep-copied record lookup; non-dict entries read as missing."""
        with self._lock:
            record = self._state[bucket].get(key)
            return deepcopy(record) if isinstance(record, dict) else None

    # ------------------------------------------------------------------
    # Subscriptions
    # ------------------------------------------------------------------

    def list_subscriptions(self) -> Dict[str, Dict[str, Any]]:
        with self._lock:
            return deepcopy(self._state["subscriptions"])

    def get_subscription(self, subscription_id: str) -> Optional[Dict[str, Any]]:
        return self._get_record("subscriptions", subscription_id)

    def upsert_subscription(self, subscription_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        return self._upsert("subscriptions", "subscription_id", subscription_id, payload)

    def delete_subscription(self, subscription_id: str) -> bool:
        """Remove a subscription; returns False when it was not present."""
        with self._lock:
            if self._state["subscriptions"].pop(subscription_id, None) is None:
                return False
            self._persist()
            return True

    # ------------------------------------------------------------------
    # Notification receipts (dedupe)
    # ------------------------------------------------------------------

    @classmethod
    def build_notification_receipt_key(cls, notification: Dict[str, Any]) -> str:
        """Stable dedupe key: the Graph id when present, else a sha256 of the payload."""
        notification_id = notification.get("id")
        if notification_id:
            return f"id:{notification_id}"
        canonical = json.dumps(notification, sort_keys=True, separators=(",", ":"))
        return f"sha256:{hashlib.sha256(canonical.encode('utf-8')).hexdigest()}"

    def has_notification_receipt(self, receipt_key: str) -> bool:
        with self._lock:
            return receipt_key in self._state["notification_receipts"]

    def record_notification_receipt(
        self,
        receipt_key: str,
        payload: Optional[Dict[str, Any]] = None,
        *,
        received_at: Optional[str] = None,
    ) -> bool:
        """Record a receipt exactly once; returns False for an already-seen key."""
        with self._lock:
            if receipt_key in self._state["notification_receipts"]:
                return False
            self._state["notification_receipts"][receipt_key] = {
                "received_at": received_at or _utc_now_iso(),
                "payload": deepcopy(payload) if isinstance(payload, dict) else payload,
            }
            self._persist()
            return True

    # ------------------------------------------------------------------
    # Event timestamps
    # ------------------------------------------------------------------

    def record_event_timestamp(self, event_key: str, timestamp: Optional[str] = None) -> str:
        with self._lock:
            stamp = timestamp or _utc_now_iso()
            self._state["event_timestamps"][event_key] = stamp
            self._persist()
            return stamp

    def get_event_timestamp(self, event_key: str) -> Optional[str]:
        with self._lock:
            value = self._state["event_timestamps"].get(event_key)
            return None if value is None else str(value)

    # ------------------------------------------------------------------
    # Jobs / sink records / stats
    # ------------------------------------------------------------------

    def stats(self) -> Dict[str, int]:
        """Per-bucket record counts."""
        with self._lock:
            return {bucket: len(self._state[bucket]) for bucket in self._BUCKETS}

    def upsert_job(self, job_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        return self._upsert("jobs", "job_id", job_id, payload)

    def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        return self._get_record("jobs", job_id)

    def list_jobs(self) -> Dict[str, Dict[str, Any]]:
        with self._lock:
            return deepcopy(self._state["jobs"])

    def upsert_sink_record(self, sink_key: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        return self._upsert("sink_records", "sink_key", sink_key, payload)

    def get_sink_record(self, sink_key: str) -> Optional[Dict[str, Any]]:
        return self._get_record("sink_records", sink_key)
for the Teams pipeline plugin.""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from typing import Any + +from plugins.teams_pipeline.models import GraphSubscription +from plugins.teams_pipeline.store import TeamsPipelineStore, resolve_teams_pipeline_store_path +from tools.microsoft_graph_auth import MicrosoftGraphTokenProvider +from tools.microsoft_graph_client import MicrosoftGraphClient + + +def build_graph_client() -> MicrosoftGraphClient: + provider = MicrosoftGraphTokenProvider.from_env() + return MicrosoftGraphClient(provider) + + +def _parse_bool(value: Any, *, default: bool = False) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, str): + lowered = value.strip().lower() + if lowered in {"1", "true", "yes", "on"}: + return True + if lowered in {"0", "false", "no", "off"}: + return False + return default + + +def _parse_int(value: Any, default: int) -> int: + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _utc_now() -> datetime: + return datetime.now(timezone.utc) + + +def _utc_now_iso() -> str: + return _utc_now().replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def _parse_datetime(value: Any) -> datetime | None: + if value is None: + return None + text = str(value).strip() + if not text: + return None + if text.endswith("Z"): + text = f"{text[:-1]}+00:00" + parsed = datetime.fromisoformat(text) + if parsed.tzinfo is None: + return parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + + +def resolve_store_path(path: str | None) -> str: + return str(resolve_teams_pipeline_store_path(path)) + + +def build_store(path: str | None = None) -> TeamsPipelineStore: + return TeamsPipelineStore(resolve_store_path(path)) + + +def sync_graph_subscription_record( + store: TeamsPipelineStore, + subscription_payload: dict[str, Any], + *, + status: str | None = None, + renewed: bool = False, +) -> dict[str, Any]: + 
def expected_client_state(raw: str | None = None) -> str | None:
    """Return the clientState expected on managed subscriptions, or None.

    Falls back to the MSGRAPH_WEBHOOK_CLIENT_STATE environment variable when no
    explicit value is supplied; blank values normalize to None.
    """
    if raw is None:
        from os import getenv

        raw = getenv("MSGRAPH_WEBHOOK_CLIENT_STATE", "")
    value = str(raw or "").strip()
    return value or None


def is_managed_subscription(
    store: TeamsPipelineStore,
    subscription_payload: dict[str, Any],
    *,
    expected_client_state_value: str | None,
) -> bool:
    """Return True when this Graph subscription belongs to the Teams pipeline.

    A subscription is managed when its id is already tracked in the local
    store, or when its clientState matches the expected value.
    """
    subscription_id = str(
        subscription_payload.get("subscription_id") or subscription_payload.get("id") or ""
    ).strip()
    if subscription_id and store.get_subscription(subscription_id):
        return True

    if expected_client_state_value:
        # Graph returns camelCase; tolerate both key spellings.
        candidate_state = str(
            subscription_payload.get("client_state") or subscription_payload.get("clientState") or ""
        ).strip()
        if candidate_state and candidate_state == expected_client_state_value:
            return True

    return False


async def maintain_graph_subscriptions(
    *,
    client: MicrosoftGraphClient,
    store: TeamsPipelineStore,
    renew_within_hours: int = 24,
    extend_hours: int = 24,
    dry_run: bool = False,
    client_state: str | None = None,
) -> dict[str, Any]:
    """Sync managed Graph subscriptions into the local store and renew those near expiry.

    For each remote subscription: unmanaged entries are skipped; managed ones
    are mirrored locally, marked expired when already lapsed, skipped when not
    yet inside the renewal window, and otherwise PATCHed with an extended
    expiration (collected as candidates only when *dry_run*). Locally tracked
    subscriptions absent from the remote list are flagged missing_remote.
    Returns a summary dict of counts and per-subscription outcomes.
    """
    # Clamp operator-supplied windows to at least one hour.
    threshold_hours = max(1, int(renew_within_hours))
    extend_hours = max(1, int(extend_hours))
    managed_client_state = expected_client_state(client_state)
    now = _utc_now()

    remote_subscriptions = await client.collect_paginated("/subscriptions")
    remote_ids: set[str] = set()
    synced = 0
    renewed: list[dict[str, Any]] = []
    candidates: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []

    for raw in remote_subscriptions:
        if not isinstance(raw, dict):
            continue
        subscription_id = str(raw.get("id") or "").strip()
        if not subscription_id:
            continue
        managed = is_managed_subscription(
            store,
            raw,
            expected_client_state_value=managed_client_state,
        )
        if not managed:
            # Never touch subscriptions owned by other tooling.
            skipped.append(
                {
                    "subscription_id": subscription_id,
                    "reason": "not_managed_by_teams_pipeline",
                }
            )
            continue

        remote_ids.add(subscription_id)
        try:
            sync_graph_subscription_record(store, raw)
            synced += 1
        except Exception as exc:
            # A bad local sync should not abort the sweep for other subscriptions.
            skipped.append(
                {
                    "subscription_id": subscription_id,
                    "reason": f"failed_to_sync_local_store: {exc}",
                }
            )
            continue

        expiration = _parse_datetime(raw.get("expirationDateTime"))
        if expiration is None:
            skipped.append({"subscription_id": subscription_id, "reason": "missing_expiration"})
            continue

        seconds_until_expiry = int((expiration - now).total_seconds())
        if seconds_until_expiry < 0:
            # Already lapsed: record the expired status locally; nothing to renew.
            store.upsert_subscription(
                subscription_id,
                {
                    "status": "expired",
                    "expiration_datetime": expiration.isoformat().replace("+00:00", "Z"),
                },
            )
            skipped.append(
                {
                    "subscription_id": subscription_id,
                    "reason": "already_expired",
                    "expiration_datetime": expiration.isoformat().replace("+00:00", "Z"),
                }
            )
            continue

        if seconds_until_expiry > threshold_hours * 3600:
            # Healthy and outside the renewal window.
            skipped.append(
                {
                    "subscription_id": subscription_id,
                    "reason": "not_due",
                    "expires_in_seconds": seconds_until_expiry,
                }
            )
            continue

        # Extend from whichever is later: now or the current expiration.
        new_expiration = (max(now, expiration) + timedelta(hours=extend_hours)).replace(
            microsecond=0
        ).isoformat().replace("+00:00", "Z")
        candidate = {
            "subscription_id": subscription_id,
            "resource": raw.get("resource"),
            "current_expiration": expiration.isoformat().replace("+00:00", "Z"),
            "new_expiration": new_expiration,
        }
        candidates.append(candidate)
        if dry_run:
            continue

        patched = await client.patch_json(
            f"/subscriptions/{subscription_id}",
            json_body={"expirationDateTime": new_expiration},
        )
        # Prefer the PATCH response fields, but pin the id and new expiration.
        merged = {**raw, **(patched or {}), "id": subscription_id, "expirationDateTime": new_expiration}
        sync_graph_subscription_record(store, merged, status="active", renewed=True)
        renewed.append({**candidate, "result": patched})

    # Locally tracked subscriptions that no longer exist remotely.
    for subscription_id in store.list_subscriptions():
        if subscription_id in remote_ids:
            continue
        store.upsert_subscription(
            subscription_id,
            {
                "status": "missing_remote",
                "last_seen_missing_remote_at": _utc_now_iso(),
            },
        )

    return {
        "success": True,
        "dry_run": bool(dry_run),
        "store_path": str(store.path),
        "remote_subscription_count": len(remote_subscriptions),
        "synced_subscription_count": synced,
        "candidate_count": len(candidates),
        "renewed_count": len(renewed),
        "threshold_hours": threshold_hours,
        "extend_hours": extend_hours,
        "candidates": candidates,
        "renewed": renewed,
        "skipped": skipped,
    }
def test_gateway_runner_skips_wiring_without_msgraph_adapter(monkeypatch):
    """No msgraph_webhook adapter registered -> bind is never attempted."""
    runner = GatewayRunner.__new__(GatewayRunner)
    runner.adapters = {Platform.TELEGRAM: MagicMock()}
    runner._teams_pipeline_runtime_error = None

    called = False

    def _bind(_gateway_runner):
        nonlocal called
        called = True
        return True

    monkeypatch.setattr("plugins.teams_pipeline.runtime.bind_gateway_runtime", _bind)

    GatewayRunner._wire_teams_pipeline_runtime(runner)

    assert called is False


def test_gateway_runner_skips_wiring_when_teams_pipeline_plugin_disabled(monkeypatch):
    """Adapter present but plugin absent from plugins.enabled -> bind is skipped."""
    runner = GatewayRunner.__new__(GatewayRunner)
    runner.adapters = {Platform.MSGRAPH_WEBHOOK: object()}
    runner._teams_pipeline_runtime_error = None

    called = False

    def _bind(_gateway_runner):
        nonlocal called
        called = True
        return True

    monkeypatch.setattr("plugins.teams_pipeline.runtime.bind_gateway_runtime", _bind)
    # Gateway config with an empty enabled-plugins list.
    monkeypatch.setattr(
        "gateway.run._load_gateway_config",
        lambda: {"plugins": {"enabled": []}},
    )

    GatewayRunner._wire_teams_pipeline_runtime(runner)

    assert called is False


def test_runtime_config_disables_teams_delivery_without_target():
    """Enabled Teams platform with no delivery target yields no teams_delivery block."""
    gateway_config = SimpleNamespace(
        platforms={
            Platform("teams"): PlatformConfig(enabled=True, extra={}),
        }
    )

    config = build_pipeline_runtime_config(gateway_config)

    assert "teams_delivery" not in config


def test_build_pipeline_runtime_only_wires_sender_when_delivery_configured(monkeypatch):
    """With no delivery target configured, the runtime gets no Teams sender."""
    gateway = SimpleNamespace(
        config=SimpleNamespace(
            platforms={
                Platform("teams"): PlatformConfig(enabled=True, extra={}),
            }
        )
    )

    # Stub out the Graph client and store so no real I/O happens.
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.build_graph_client",
        lambda: object(),
    )
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.resolve_teams_pipeline_store_path",
        lambda: "/tmp/teams-pipeline-store.json",
    )
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.TeamsPipelineStore",
        lambda path: {"path": path},
    )

    runtime = build_pipeline_runtime(gateway)

    assert runtime.teams_sender is None


def test_build_pipeline_runtime_skips_sender_when_adapter_layer_is_unavailable(monkeypatch):
    """Delivery is configured, but the Teams adapter module lacks TeamsSummaryWriter.

    An empty stand-in module makes `from plugins.platforms.teams.adapter import
    TeamsSummaryWriter` raise ImportError, so the runtime must fall back to no
    sender instead of failing.
    """
    gateway = SimpleNamespace(
        config=SimpleNamespace(
            platforms={
                Platform("teams"): PlatformConfig(
                    enabled=True,
                    extra={
                        "delivery_mode": "graph",
                        "team_id": "team-1",
                        "channel_id": "channel-1",
                    },
                ),
            }
        )
    )

    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.build_graph_client",
        lambda: object(),
    )
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.resolve_teams_pipeline_store_path",
        lambda: "/tmp/teams-pipeline-store.json",
    )
    monkeypatch.setattr(
        "plugins.teams_pipeline.runtime.TeamsPipelineStore",
        lambda path: {"path": path},
    )
    # Empty module: the TeamsSummaryWriter attribute import will fail.
    monkeypatch.setitem(
        sys.modules,
        "plugins.platforms.teams.adapter",
        ModuleType("plugins.platforms.teams.adapter"),
    )

    runtime = build_pipeline_runtime(gateway)

    assert runtime.teams_sender is None
"""Tests for the teams_pipeline plugin CLI."""

from __future__ import annotations

import json
from argparse import ArgumentParser, Namespace
from types import SimpleNamespace

import pytest

from plugins.teams_pipeline.cli import register_cli, teams_pipeline_command
from plugins.teams_pipeline.store import TeamsPipelineStore


@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
    # Point HERMES_HOME at a throwaway directory so tests never touch real state.
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))


def _make_args(**kwargs):
    """Build a Namespace with every CLI flag defaulted; kwargs override."""
    defaults = {
        "teams_pipeline_action": None,
        "store_path": "",
        "status": "",
        "limit": 20,
        "job_id": "",
        "meeting_id": "",
        "join_web_url": "",
        "tenant_id": "",
        "call_record_id": "",
        "resource": "",
        "notification_url": "",
        "change_type": "updated",
        "expiration": "",
        "client_state": "",
        "lifecycle_notification_url": "",
        "latest_supported_tls_version": "v1_2",
        "subscription_id": "",
        "force_refresh": False,
        "renew_within_hours": 24,
        "extend_hours": 24,
        "dry_run": False,
    }
    defaults.update(kwargs)
    return Namespace(**defaults)


def test_register_cli_builds_tree():
    """register_cli wires subcommands so `list` parses to its action name."""
    parser = ArgumentParser()
    register_cli(parser)
    args = parser.parse_args(["list"])
    assert args.teams_pipeline_action == "list"


def test_list_prints_recent_jobs(capsys, tmp_path):
    """`list` prints the job id and meeting id of stored jobs."""
    store = TeamsPipelineStore(tmp_path / "teams_pipeline_store.json")
    store.upsert_job(
        "job-1",
        {
            "event_id": "evt-1",
            "source_event_type": "updated",
            "dedupe_key": "evt-1",
            "status": "completed",
            "meeting_ref": {"meeting_id": "meeting-1"},
        },
    )

    teams_pipeline_command(
        _make_args(
            teams_pipeline_action="list",
            store_path=str(tmp_path / "teams_pipeline_store.json"),
        )
    )
    out = capsys.readouterr().out
    assert "job-1" in out
    assert "meeting-1" in out


def test_show_prints_job_json(capsys, tmp_path):
    """`show` emits the full job record as JSON."""
    store = TeamsPipelineStore(tmp_path / "teams_pipeline_store.json")
    store.upsert_job(
        "job-1",
        {
            "event_id": "evt-1",
            "source_event_type": "updated",
            "dedupe_key": "evt-1",
            "status": "completed",
            "meeting_ref": {"meeting_id": "meeting-1"},
        },
    )

    teams_pipeline_command(
        _make_args(
            teams_pipeline_action="show",
            job_id="job-1",
            store_path=str(tmp_path / "teams_pipeline_store.json"),
        )
    )
    out = capsys.readouterr().out
    payload = json.loads(out)
    assert payload["job_id"] == "job-1"
    assert payload["meeting_ref"]["meeting_id"] == "meeting-1"


def test_fetch_requires_meeting_identifier(capsys):
    """`fetch` without meeting_id/join_web_url reports the missing argument."""
    teams_pipeline_command(_make_args(teams_pipeline_action="fetch"))
    out = capsys.readouterr().out
    assert "meeting_id or join_web_url is required" in out


def test_subscriptions_lists_graph_subscriptions(monkeypatch, capsys):
    """`subscriptions` renders the subscriptions returned by the Graph client."""

    class FakeClient:
        async def collect_paginated(self, path):
            assert path == "/subscriptions"
            return [
                {
                    "id": "sub-1",
                    "resource": "communications/onlineMeetings/getAllTranscripts",
                    "changeType": "updated",
                    "expirationDateTime": "2026-05-05T00:00:00Z",
                }
            ]

    monkeypatch.setattr("plugins.teams_pipeline.cli.build_graph_client", lambda: FakeClient())
    teams_pipeline_command(_make_args(teams_pipeline_action="subscriptions"))
    out = capsys.readouterr().out
    assert "sub-1" in out
    assert "getAllTranscripts" in out


def test_subscribe_defaults_to_created_for_transcript_resources(monkeypatch, capsys):
    """An empty change_type defaults to "created" for transcript resources."""
    captured = {}

    class FakeClient:
        async def post_json(self, path, json_body=None, headers=None):
            # Echo the request back so the test can assert on what was sent.
            captured["path"] = path
            captured["json_body"] = json_body
            return {
                "id": "sub-transcript",
                "resource": json_body["resource"],
                "changeType": json_body["changeType"],
                "notificationUrl": json_body["notificationUrl"],
                "expirationDateTime": json_body["expirationDateTime"],
            }

    monkeypatch.setattr("plugins.teams_pipeline.cli.build_graph_client", lambda: FakeClient())
    teams_pipeline_command(
        _make_args(
            teams_pipeline_action="subscribe",
            resource="communications/onlineMeetings/getAllTranscripts",
            notification_url="https://example.com/webhooks/msgraph",
            change_type="",
        )
    )
    payload = json.loads(capsys.readouterr().out)
    assert captured["path"] == "/subscriptions"
    assert captured["json_body"]["changeType"] == "created"
    assert payload["changeType"] == "created"


def test_token_health_force_refresh(monkeypatch, capsys):
    """`token-health --force-refresh` forces a token fetch and reports its result."""

    class FakeProvider:
        def inspect_token_health(self):
            return {"configured": True, "cache_state": "warm"}

        async def get_access_token(self, force_refresh=False):
            # The CLI must pass force_refresh through.
            assert force_refresh is True
            return "token-123"

    monkeypatch.setattr(
        "plugins.teams_pipeline.cli.MicrosoftGraphTokenProvider",
        SimpleNamespace(from_env=lambda: FakeProvider()),
    )
    teams_pipeline_command(_make_args(teams_pipeline_action="token-health", force_refresh=True))
    payload = json.loads(capsys.readouterr().out)
    assert payload["configured"] is True
    assert payload["last_refresh_succeeded"] is True
    assert payload["access_token_length"] == len("token-123")
teams_pipeline_command( + _make_args( + teams_pipeline_action="validate", + store_path=str(tmp_path / "teams_pipeline_store.json"), + ) + ) + payload = json.loads(capsys.readouterr().out) + assert payload["ok"] is True + assert payload["issues"] == [] diff --git a/tests/plugins/test_teams_pipeline_plugin.py b/tests/plugins/test_teams_pipeline_plugin.py new file mode 100644 index 0000000000..3fb4728d23 --- /dev/null +++ b/tests/plugins/test_teams_pipeline_plugin.py @@ -0,0 +1,437 @@ +"""Tests for the Teams pipeline plugin package.""" + +from __future__ import annotations + +import asyncio +from types import SimpleNamespace +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from hermes_cli.plugins import PluginContext, PluginManager, PluginManifest +from gateway.config import GatewayConfig, Platform, PlatformConfig +from plugins.teams_pipeline import register +from plugins.teams_pipeline.pipeline import TeamsMeetingPipeline +from plugins.teams_pipeline.store import TeamsPipelineStore +from plugins.teams_pipeline.models import MeetingArtifact + + +class FakeGraphClient: + def __init__(self) -> None: + self.downloaded = False + + +async def _transcript_meeting_resolver(client, *, meeting_id=None, join_web_url=None, tenant_id=None): + from plugins.teams_pipeline.models import TeamsMeetingRef + + return TeamsMeetingRef( + meeting_id=str(meeting_id), + tenant_id=tenant_id, + metadata={"subject": "Weekly Sync", "participants": [{"displayName": "Ada"}]}, + ) + + +async def _no_call_record(*args, **kwargs): + return None + + +def test_register_adds_cli_only(): + mgr = PluginManager() + manifest = PluginManifest(name="teams_pipeline") + ctx = PluginContext(manifest, mgr) + + register(ctx) + + assert "teams-pipeline" in mgr._cli_commands + entry = mgr._cli_commands["teams-pipeline"] + assert entry["plugin"] == "teams_pipeline" + assert callable(entry["setup_fn"]) + assert callable(entry["handler_fn"]) + + +def 
test_runtime_config_uses_existing_teams_platform_settings(): + from plugins.teams_pipeline.runtime import build_pipeline_runtime_config + + gateway_config = GatewayConfig( + platforms={ + Platform("teams"): PlatformConfig( + enabled=True, + extra={ + "delivery_mode": "graph", + "team_id": "team-1", + "channel_id": "channel-1", + "meeting_pipeline": { + "transcript_min_chars": 120, + "notion": {"enabled": True, "database_id": "db-1"}, + }, + }, + ) + } + ) + + runtime_config = build_pipeline_runtime_config(gateway_config) + + assert runtime_config["transcript_min_chars"] == 120 + assert runtime_config["notion"]["database_id"] == "db-1" + assert runtime_config["teams_delivery"] == { + "enabled": True, + "mode": "graph", + "team_id": "team-1", + "channel_id": "channel-1", + } + + +@pytest.mark.anyio +async def test_bind_gateway_runtime_attaches_scheduler(monkeypatch, tmp_path): + from plugins.teams_pipeline import runtime as runtime_module + + class FakeAdapter: + def __init__(self) -> None: + self.scheduler = None + + def set_notification_scheduler(self, scheduler) -> None: + self.scheduler = scheduler + + class FakePipeline: + def __init__(self) -> None: + self.notifications = [] + + async def run_notification(self, notification): + self.notifications.append(notification) + + adapter = FakeAdapter() + pipeline = FakePipeline() + gateway = SimpleNamespace( + adapters={Platform.MSGRAPH_WEBHOOK: adapter}, + config=GatewayConfig(platforms={}), + _teams_pipeline_runtime=None, + _teams_pipeline_runtime_error=None, + ) + + monkeypatch.setattr(runtime_module, "build_pipeline_runtime", lambda gateway_runner: pipeline) + + bound = runtime_module.bind_gateway_runtime(gateway) + + assert bound is True + assert gateway._teams_pipeline_runtime is pipeline + assert callable(adapter.scheduler) + + notification = {"id": "notif-1"} + await adapter.scheduler(notification, object()) + assert pipeline.notifications == [notification] + + +@pytest.mark.anyio +async def 
test_bind_gateway_runtime_drops_notifications_when_unavailable(monkeypatch): + from plugins.teams_pipeline import runtime as runtime_module + from tools.microsoft_graph_auth import MicrosoftGraphConfigError + + class FakeAdapter: + def __init__(self) -> None: + self.scheduler = None + + def set_notification_scheduler(self, scheduler) -> None: + self.scheduler = scheduler + + adapter = FakeAdapter() + gateway = SimpleNamespace( + adapters={Platform.MSGRAPH_WEBHOOK: adapter}, + config=GatewayConfig(platforms={}), + _teams_pipeline_runtime=None, + _teams_pipeline_runtime_error=None, + ) + + def _raise(_gateway_runner): + raise MicrosoftGraphConfigError("missing graph env") + + monkeypatch.setattr(runtime_module, "build_pipeline_runtime", _raise) + + bound = runtime_module.bind_gateway_runtime(gateway) + + assert bound is False + assert "missing graph env" in gateway._teams_pipeline_runtime_error + assert callable(adapter.scheduler) + await adapter.scheduler({"id": "notif-2"}, object()) + + +def test_store_persists_subscription_event_and_job_state(tmp_path): + store_path = tmp_path / "teams-store.json" + store = TeamsPipelineStore(store_path) + store.upsert_subscription( + "sub-1", + {"client_state": "abc", "resource": "communications/onlineMeetings"}, + ) + store.record_event_timestamp("evt-1", "2026-05-03T19:30:00Z") + store.upsert_job("job-1", {"status": "received", "event_id": "evt-1"}) + store.upsert_sink_record("notion:meeting-1", {"page_id": "page-1"}) + + reloaded = TeamsPipelineStore(store_path) + subscription = reloaded.get_subscription("sub-1") + job = reloaded.get_job("job-1") + sink = reloaded.get_sink_record("notion:meeting-1") + + assert subscription is not None + assert subscription["subscription_id"] == "sub-1" + assert subscription["client_state"] == "abc" + assert reloaded.get_event_timestamp("evt-1") == "2026-05-03T19:30:00Z" + assert job is not None + assert job["status"] == "received" + assert sink is not None + assert sink["page_id"] == "page-1" 
+ + +def test_store_notification_receipts_are_idempotent(tmp_path): + store = TeamsPipelineStore(tmp_path / "teams-store.json") + notification = { + "subscriptionId": "sub-1", + "resource": "communications/onlineMeetings/meeting-1", + "changeType": "updated", + } + receipt_key = TeamsPipelineStore.build_notification_receipt_key(notification) + + assert store.record_notification_receipt(receipt_key, notification) is True + assert store.record_notification_receipt(receipt_key, notification) is False + assert store.has_notification_receipt(receipt_key) is True + + reloaded = TeamsPipelineStore(tmp_path / "teams-store.json") + assert reloaded.has_notification_receipt(receipt_key) is True + + +@pytest.mark.anyio +class TestTeamsMeetingPipeline: + async def test_transcript_first_path_persists_state_and_skips_recording(self, tmp_path, monkeypatch): + from plugins.teams_pipeline import pipeline as pipeline_module + + monkeypatch.setattr(pipeline_module, "resolve_meeting_reference", _transcript_meeting_resolver) + + async def _fetch_transcript(client, meeting_ref): + return ( + MeetingArtifact(artifact_type="transcript", artifact_id="tx-1", display_name="meeting.vtt"), + "Action: Send draft by Friday.\nDecision: Ship the transcript-first path.\nDetailed transcript content.", + ) + + async def _call_record(client, meeting_ref, *, call_record_id=None, allow_permission_errors=True): + return MeetingArtifact( + artifact_type="call_record", + artifact_id="call-1", + metadata={"metrics": {"participant_count": 4}}, + ) + + async def _summarize(**kwargs): + return pipeline_module.TeamsMeetingSummaryPayload( + meeting_ref=kwargs["resolved_meeting"], + title="Weekly Sync", + transcript_text=kwargs["transcript_text"], + summary="Short summary", + key_decisions=["Ship the transcript-first path."], + action_items=["Send draft by Friday."], + risks=["Timeline risk."], + confidence="high", + confidence_notes="Transcript available.", + source_artifacts=kwargs["artifacts"], + ) + + 
monkeypatch.setattr(pipeline_module, "fetch_preferred_transcript_text", _fetch_transcript) + monkeypatch.setattr(pipeline_module, "enrich_meeting_with_call_record", _call_record) + + store = TeamsPipelineStore(tmp_path / "teams-store.json") + pipeline = TeamsMeetingPipeline( + graph_client=FakeGraphClient(), + store=store, + config={"transcript_min_chars": 20}, + summarize_fn=_summarize, + ) + + job = await pipeline.run_notification( + { + "id": "notif-1", + "changeType": "updated", + "resource": "communications/onlineMeetings/meeting-123", + "resourceData": {"id": "meeting-123"}, + } + ) + + assert job.status == "completed" + assert job.selected_artifact_strategy == "transcript_first" + assert job.summary_payload is not None + assert job.summary_payload.summary == "Short summary" + stored = store.get_job(job.job_id) + assert stored is not None + assert stored["status"] == "completed" + + async def test_recording_fallback_uses_stt_and_updates_sink_records(self, tmp_path, monkeypatch): + from plugins.teams_pipeline import pipeline as pipeline_module + + monkeypatch.setattr(pipeline_module, "resolve_meeting_reference", _transcript_meeting_resolver) + + async def _no_transcript(client, meeting_ref): + return None, None + + async def _recordings(client, meeting_ref): + return [ + MeetingArtifact( + artifact_type="recording", + artifact_id="rec-1", + display_name="recording.mp4", + download_url="https://files.example/recording.mp4", + ) + ] + + async def _download(client, meeting_ref, recording, destination): + target = Path(destination) + target.write_bytes(b"video-bytes") + return {"path": str(target), "size_bytes": 11, "content_type": "video/mp4"} + + async def _prepare_audio(self, recording_path): + audio_path = recording_path.with_suffix(".wav") + audio_path.write_bytes(b"audio-bytes") + return audio_path + + def _transcribe(file_path, model): + return {"success": True, "transcript": "Action: Follow up with Legal.\nRisk: Budget approval pending.", "provider": 
"local"} + + async def _summarize(**kwargs): + return pipeline_module.TeamsMeetingSummaryPayload( + meeting_ref=kwargs["resolved_meeting"], + title="Weekly Sync", + transcript_text=kwargs["transcript_text"], + summary="Fallback summary", + key_decisions=[], + action_items=["Follow up with Legal."], + risks=["Budget approval pending."], + confidence="medium", + confidence_notes="Generated from STT fallback.", + source_artifacts=kwargs["artifacts"], + ) + + class FakeNotionWriter: + async def write_summary(self, payload, config, existing_record=None): + return {"page_id": existing_record.get("page_id") if existing_record else "page-1", "url": "https://notion.so/page-1"} + + async def _teams_sender(payload, config, existing_record=None): + return {"message_id": existing_record.get("message_id") if existing_record else "msg-1"} + + monkeypatch.setattr(pipeline_module, "fetch_preferred_transcript_text", _no_transcript) + monkeypatch.setattr(pipeline_module, "list_recording_artifacts", _recordings) + monkeypatch.setattr(pipeline_module, "download_recording_artifact", _download) + monkeypatch.setattr(pipeline_module.TeamsMeetingPipeline, "_prepare_audio_path", _prepare_audio) + monkeypatch.setattr(pipeline_module, "enrich_meeting_with_call_record", _no_call_record) + + store = TeamsPipelineStore(tmp_path / "teams-store.json") + pipeline = TeamsMeetingPipeline( + graph_client=FakeGraphClient(), + store=store, + config={ + "notion": {"enabled": True, "database_id": "db-1"}, + "teams_delivery": {"enabled": True, "channel_id": "channel-1"}, + }, + transcribe_fn=_transcribe, + summarize_fn=_summarize, + notion_writer=FakeNotionWriter(), + teams_sender=_teams_sender, + ) + + job = await pipeline.run_notification( + { + "id": "notif-2", + "changeType": "updated", + "resource": "communications/onlineMeetings/meeting-456", + "resourceData": {"id": "meeting-456"}, + } + ) + + assert job.status == "completed" + assert job.selected_artifact_strategy == "recording_stt_fallback" + 
assert job.summary_payload is not None + assert job.summary_payload.summary == "Fallback summary" + notion_record = store.get_sink_record("notion:meeting-456") + teams_record = store.get_sink_record("teams:meeting-456") + assert notion_record is not None + assert notion_record["page_id"] == "page-1" + assert teams_record is not None + assert teams_record["message_id"] == "msg-1" + + async def test_missing_transcript_and_recording_schedules_retry(self, tmp_path, monkeypatch): + from plugins.teams_pipeline import pipeline as pipeline_module + + monkeypatch.setattr(pipeline_module, "resolve_meeting_reference", _transcript_meeting_resolver) + monkeypatch.setattr(pipeline_module, "fetch_preferred_transcript_text", lambda *a, **kw: asyncio.sleep(0, result=(None, None))) + monkeypatch.setattr(pipeline_module, "list_recording_artifacts", lambda *a, **kw: asyncio.sleep(0, result=[])) + + store = TeamsPipelineStore(tmp_path / "teams-store.json") + pipeline = TeamsMeetingPipeline( + graph_client=FakeGraphClient(), + store=store, + config={}, + summarize_fn=lambda **kwargs: asyncio.sleep(0, result=None), + ) + + job = await pipeline.run_notification( + { + "id": "notif-3", + "changeType": "updated", + "resource": "communications/onlineMeetings/meeting-789", + "resourceData": {"id": "meeting-789"}, + } + ) + + assert job.status == "retry_scheduled" + assert job.error_info["retryable"] is True + assert "Recording unavailable" in job.error_info["message"] + + async def test_duplicate_notification_reuses_completed_job(self, tmp_path, monkeypatch): + from plugins.teams_pipeline import pipeline as pipeline_module + + monkeypatch.setattr(pipeline_module, "resolve_meeting_reference", _transcript_meeting_resolver) + + async def _fetch_transcript(client, meeting_ref): + return ( + MeetingArtifact(artifact_type="transcript", artifact_id="tx-dup", display_name="meeting.vtt"), + "Decision: Keep duplicate notifications idempotent.\nAction: Verify the cached job is reused.", + ) + + 
summarize_calls = 0 + + async def _summarize(**kwargs): + nonlocal summarize_calls + summarize_calls += 1 + return pipeline_module.TeamsMeetingSummaryPayload( + meeting_ref=kwargs["resolved_meeting"], + title="Weekly Sync", + transcript_text=kwargs["transcript_text"], + summary="Duplicate-safe summary", + key_decisions=["Keep duplicate notifications idempotent."], + action_items=["Verify the cached job is reused."], + confidence="high", + confidence_notes="Transcript available.", + source_artifacts=kwargs["artifacts"], + ) + + monkeypatch.setattr(pipeline_module, "fetch_preferred_transcript_text", _fetch_transcript) + monkeypatch.setattr(pipeline_module, "enrich_meeting_with_call_record", _no_call_record) + + store = TeamsPipelineStore(tmp_path / "teams-store.json") + pipeline = TeamsMeetingPipeline( + graph_client=FakeGraphClient(), + store=store, + config={"transcript_min_chars": 20}, + summarize_fn=_summarize, + ) + notification = { + "id": "notif-dup", + "changeType": "updated", + "resource": "communications/onlineMeetings/meeting-dup", + "resourceData": {"id": "meeting-dup"}, + } + + first_job = await pipeline.run_notification(notification) + second_job = await pipeline.run_notification(notification) + + assert first_job.status == "completed" + assert second_job.status == "completed" + assert second_job.job_id == first_job.job_id + assert summarize_calls == 1 + assert len(store.list_jobs()) == 1 + receipt_key = TeamsPipelineStore.build_notification_receipt_key(notification) + assert store.has_notification_receipt(receipt_key) is True