hermes-agent/plugins/platforms/teams/adapter.py
GodsBoy 93e25ceb13 feat(plugins): add standalone_sender_fn for out-of-process cron delivery
Plugin platforms (IRC, Teams, Google Chat) currently fail with
`No live adapter for platform '<name>'` when a `deliver=<plugin>` cron
job runs in a separate process from the gateway, even though the
platforms are eligible cron targets via `cron_deliver_env_var` (added
in #21306). Built-in platforms (Telegram, Discord, Slack, etc.) use
direct REST helpers in `tools/send_message_tool.py` so cron can deliver
without holding the gateway in the same process; plugin platforms
historically depended on `_gateway_runner_ref()` which returns `None`
out of process.

This change adds an optional `standalone_sender_fn` field to
`PlatformEntry` so plugins can register an ephemeral send path that
opens its own connection, sends, and closes without needing the live
adapter. The dispatch site in `_send_via_adapter` falls through to the
hook when the gateway runner is unavailable, with a descriptive error
when neither path applies. The hook is optional, so existing plugins
are unaffected.

Reference migrations land in the same change for IRC, Teams, and
Google Chat, exercising the hook across stdlib (asyncio + IRC protocol),
Bot Framework OAuth client_credentials, and Google service-account
flows respectively.

Security hardening on the new code paths:
* IRC: control-character stripping on chat_id and message body to
  block CRLF command injection; bounded nick-collision retries; JOIN
  before PRIVMSG so channels with the default `+n` mode accept the
  delivery.
* Teams: TEAMS_SERVICE_URL validated against an allowlist of known
  Bot Framework hosts (`smba.trafficmanager.net`,
  `smba.infra.gov.teams.microsoft.us`) to block SSRF; chat_id and
  tenant_id constrained to the documented Bot Framework character set;
  per-request timeouts so a slow STS endpoint cannot starve the
  activity POST.
* Google Chat: chat_id and thread_id validated against strict
  resource-name regexes; service-account refresh wrapped in
  `asyncio.wait_for` so a hung token endpoint cannot stall the
  scheduler.

Test coverage: 20 new tests covering happy path, missing-config errors,
network failure modes, and each defensive validation. Existing tests
unchanged. `bash scripts/run_tests.sh tests/tools/test_send_message_tool.py
tests/gateway/test_irc_adapter.py tests/gateway/test_teams.py
tests/gateway/test_google_chat.py` reports 341 passed, 0 regressions.

Documentation: new "Out-of-process cron delivery" section in
website/docs/developer-guide/adding-platform-adapters.md and an entry
in gateway/platforms/ADDING_A_PLATFORM.md naming the hook.
2026-05-09 02:56:29 -07:00

1177 lines
46 KiB
Python

"""
Microsoft Teams platform adapter for Hermes Agent.
Uses the microsoft-teams-apps SDK for authentication and activity processing.
Runs an aiohttp webhook server to receive messages from Teams.
Proactive messaging (send, typing) uses the SDK's App.send() method.
Requires:
pip install microsoft-teams-apps aiohttp
TEAMS_CLIENT_ID, TEAMS_CLIENT_SECRET, and TEAMS_TENANT_ID env vars
Configuration in config.yaml:
platforms:
teams:
enabled: true
extra:
client_id: "your-client-id" # or TEAMS_CLIENT_ID env var
client_secret: "your-secret" # or TEAMS_CLIENT_SECRET env var
tenant_id: "your-tenant-id" # or TEAMS_TENANT_ID env var
port: 3978 # or TEAMS_PORT env var
"""
from __future__ import annotations
import asyncio
import html
import json
import logging
import os
from typing import Any, Dict, Optional
from urllib.parse import quote
import httpx
try:
from aiohttp import web
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
web = None # type: ignore[assignment]
try:
from microsoft_teams.apps import App, ActivityContext
from microsoft_teams.common.http.client import ClientOptions
from microsoft_teams.api import MessageActivity, ConversationReference
from microsoft_teams.api.activities.typing import TypingActivityInput
from microsoft_teams.api.activities.invoke.adaptive_card import AdaptiveCardInvokeActivity
from microsoft_teams.api.models.adaptive_card import (
AdaptiveCardActionCardResponse,
AdaptiveCardActionMessageResponse,
)
from microsoft_teams.api.models.invoke_response import InvokeResponse, AdaptiveCardInvokeResponse
from microsoft_teams.apps.http.adapter import (
HttpMethod,
HttpRequest,
HttpResponse,
HttpRouteHandler,
)
from microsoft_teams.cards import AdaptiveCard, ExecuteAction, TextBlock
TEAMS_SDK_AVAILABLE = True
except ImportError:
TEAMS_SDK_AVAILABLE = False
ClientOptions = None # type: ignore[assignment,misc]
App = None # type: ignore[assignment,misc]
ActivityContext = None # type: ignore[assignment,misc]
MessageActivity = None # type: ignore[assignment,misc]
ConversationReference = None # type: ignore[assignment,misc]
TypingActivityInput = None # type: ignore[assignment,misc]
AdaptiveCardInvokeActivity = None # type: ignore[assignment,misc]
AdaptiveCardActionCardResponse = None # type: ignore[assignment,misc]
AdaptiveCardActionMessageResponse = None # type: ignore[assignment,misc]
AdaptiveCardInvokeResponse = None # type: ignore[assignment,misc,union-attr]
InvokeResponse = None # type: ignore[assignment,misc]
HttpMethod = str # type: ignore[assignment,misc]
HttpRequest = None # type: ignore[assignment,misc]
HttpResponse = None # type: ignore[assignment,misc]
HttpRouteHandler = None # type: ignore[assignment,misc]
AdaptiveCard = None # type: ignore[assignment,misc]
ExecuteAction = None # type: ignore[assignment,misc]
TextBlock = None # type: ignore[assignment,misc]
from gateway.config import Platform, PlatformConfig
from gateway.platforms.helpers import MessageDeduplicator
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
SendResult,
cache_image_from_url,
)
logger = logging.getLogger(__name__)
_DEFAULT_PORT = 3978
_WEBHOOK_PATH = "/api/messages"
def _parse_bool(value: Any, *, default: bool = False) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
return default
class _StaticAccessTokenProvider:
"""Minimal token-provider shim so outbound Graph delivery can reuse the shared client."""
def __init__(self, access_token: str):
self._access_token = str(access_token or "").strip()
async def get_access_token(self, *, force_refresh: bool = False) -> str:
del force_refresh
if not self._access_token:
raise ValueError("TEAMS_GRAPH_ACCESS_TOKEN is required for graph delivery mode.")
return self._access_token
def clear_cache(self) -> None:
return None
class TeamsSummaryWriter:
"""Pipeline-facing Teams outbound delivery surface.
This stays inside the existing Teams platform plugin so the meeting-pipeline
PR can reuse one Teams integration surface instead of introducing a second
adapter elsewhere in the gateway core.
"""
def __init__(
self,
platform_config: PlatformConfig | None = None,
*,
graph_client: Any | None = None,
transport: httpx.AsyncBaseTransport | None = None,
) -> None:
self._platform_config = platform_config
self._graph_client = graph_client
self._transport = transport
async def write_summary(
self,
payload: Any,
config: dict[str, Any] | None,
existing_record: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
merged = self._resolve_delivery_config(config)
if existing_record and not _parse_bool(merged.get("force_resend"), default=False):
return dict(existing_record)
mode = str(merged.get("delivery_mode") or merged.get("mode") or "").strip().lower()
if not mode:
if merged.get("incoming_webhook_url"):
mode = "incoming_webhook"
elif merged.get("chat_id") or (
merged.get("team_id") and merged.get("channel_id")
):
mode = "graph"
if mode == "incoming_webhook":
return await self._write_summary_via_incoming_webhook(payload, merged)
if mode == "graph":
return await self._write_summary_via_graph(payload, merged)
raise ValueError(
"Teams delivery_mode must be 'incoming_webhook' or 'graph'."
)
def _resolve_delivery_config(self, config: dict[str, Any] | None) -> dict[str, Any]:
merged: dict[str, Any] = {}
platform_cfg = self._platform_config
if platform_cfg is not None:
merged.update(dict(platform_cfg.extra or {}))
if platform_cfg.token and "access_token" not in merged:
merged["access_token"] = platform_cfg.token
if platform_cfg.home_channel:
merged.setdefault("channel_id", platform_cfg.home_channel.chat_id)
merged.update(dict(config or {}))
env_defaults = {
"delivery_mode": os.getenv("TEAMS_DELIVERY_MODE", ""),
"incoming_webhook_url": os.getenv("TEAMS_INCOMING_WEBHOOK_URL", ""),
"access_token": os.getenv("TEAMS_GRAPH_ACCESS_TOKEN", ""),
"team_id": os.getenv("TEAMS_TEAM_ID", ""),
"channel_id": os.getenv("TEAMS_CHANNEL_ID", ""),
"chat_id": os.getenv("TEAMS_CHAT_ID", ""),
}
for key, value in env_defaults.items():
if value and not merged.get(key):
merged[key] = value
return merged
async def _write_summary_via_incoming_webhook(
self,
payload: Any,
config: dict[str, Any],
) -> dict[str, Any]:
webhook_url = str(config.get("incoming_webhook_url") or "").strip()
if not webhook_url:
raise ValueError("TEAMS_INCOMING_WEBHOOK_URL is required for incoming_webhook mode.")
body = {"text": self._render_summary_markdown(payload)}
async with httpx.AsyncClient(timeout=20.0, transport=self._transport) as client:
response = await client.post(webhook_url, json=body)
response.raise_for_status()
return {
"delivery_mode": "incoming_webhook",
"webhook_url": webhook_url,
"status_code": response.status_code,
"delivered": True,
}
async def _write_summary_via_graph(
self,
payload: Any,
config: dict[str, Any],
) -> dict[str, Any]:
graph_client = self._build_graph_client(config)
chat_id = str(config.get("chat_id") or "").strip()
if chat_id:
path = f"/chats/{quote(chat_id, safe='')}/messages"
response = await graph_client.post_json(
path,
json_body={"body": {"contentType": "html", "content": self._render_summary_html(payload)}},
)
return {
"delivery_mode": "graph",
"target_type": "chat",
"chat_id": chat_id,
"message_id": (response or {}).get("id"),
"web_url": (response or {}).get("webUrl"),
}
team_id = str(config.get("team_id") or "").strip()
channel_id = str(config.get("channel_id") or "").strip()
if not team_id or not channel_id:
raise ValueError(
"Graph delivery mode requires chat_id, or both team_id and channel_id."
)
path = (
f"/teams/{quote(team_id, safe='')}/channels/"
f"{quote(channel_id, safe='')}/messages"
)
response = await graph_client.post_json(
path,
json_body={"body": {"contentType": "html", "content": self._render_summary_html(payload)}},
)
return {
"delivery_mode": "graph",
"target_type": "channel",
"team_id": team_id,
"channel_id": channel_id,
"message_id": (response or {}).get("id"),
"web_url": (response or {}).get("webUrl"),
}
def _build_graph_client(self, config: dict[str, Any]) -> Any:
if self._graph_client is not None:
return self._graph_client
from tools.microsoft_graph_auth import MicrosoftGraphTokenProvider
from tools.microsoft_graph_client import MicrosoftGraphClient
access_token = str(config.get("access_token") or "").strip()
if access_token:
return MicrosoftGraphClient(
_StaticAccessTokenProvider(access_token),
transport=self._transport,
)
return MicrosoftGraphClient(
MicrosoftGraphTokenProvider.from_env(),
transport=self._transport,
)
def _render_summary_markdown(self, payload: Any) -> str:
lines = [
f"**{self._title(payload)}**",
"",
f"Summary: {self._text(getattr(payload, 'summary', None), 'No summary available.')}",
"",
"Key decisions:",
*self._bullet_lines(getattr(payload, "key_decisions", None)),
"",
"Action items:",
*self._bullet_lines(getattr(payload, "action_items", None)),
"",
"Risks:",
*self._bullet_lines(getattr(payload, "risks", None)),
]
return "\n".join(lines)
def _render_summary_html(self, payload: Any) -> str:
sections = [
("Summary", [self._text(getattr(payload, "summary", None), "No summary available.")]),
("Key decisions", list(getattr(payload, "key_decisions", None) or [])),
("Action items", list(getattr(payload, "action_items", None) or [])),
("Risks", list(getattr(payload, "risks", None) or [])),
]
blocks = [f"<h2>{html.escape(self._title(payload))}</h2>"]
for heading, items in sections:
blocks.append(f"<h3>{html.escape(heading)}</h3>")
if len(items) == 1 and heading == "Summary":
blocks.append(f"<p>{html.escape(str(items[0]))}</p>")
continue
if items:
rendered = "".join(f"<li>{html.escape(str(item))}</li>" for item in items if str(item).strip())
blocks.append(rendered and f"<ul>{rendered}</ul>" or "<p>None</p>")
else:
blocks.append("<p>None</p>")
return "".join(blocks)
@staticmethod
def _title(payload: Any) -> str:
title = getattr(payload, "title", None)
if title:
return str(title)
meeting_ref = getattr(payload, "meeting_ref", None)
meeting_id = getattr(meeting_ref, "meeting_id", None) if meeting_ref else None
return f"Meeting {meeting_id or 'summary'}"
@staticmethod
def _text(value: Any, default: str) -> str:
text = str(value or "").strip()
return text or default
@classmethod
def _bullet_lines(cls, values: Any) -> list[str]:
items = [str(item).strip() for item in (values or []) if str(item).strip()]
return [f"- {item}" for item in items] or ["- None"]
class _AiohttpBridgeAdapter:
"""HttpServerAdapter that bridges the Teams SDK into an aiohttp server.
Without a custom adapter, ``App()`` unconditionally imports fastapi/uvicorn
and allocates a ``FastAPI()`` instance. This bridge captures the SDK's
route registrations and wires them into our own aiohttp ``Application``.
"""
def __init__(self, aiohttp_app: "web.Application"):
self._aiohttp_app = aiohttp_app
def register_route(self, method: "HttpMethod", path: str, handler: "HttpRouteHandler") -> None:
"""Register an SDK route handler as an aiohttp route."""
async def _aiohttp_handler(request: "web.Request") -> "web.Response":
body = await request.json()
headers = dict(request.headers)
result: "HttpResponse" = await handler(HttpRequest(body=body, headers=headers))
status = result.get("status", 200)
resp_body = result.get("body")
if resp_body is not None:
return web.Response(
status=status,
body=json.dumps(resp_body),
content_type="application/json",
)
return web.Response(status=status)
self._aiohttp_app.router.add_route(method, path, _aiohttp_handler)
def serve_static(self, path: str, directory: str) -> None:
pass
async def start(self, port: int) -> None:
raise NotImplementedError("aiohttp server is managed by the adapter")
async def stop(self) -> None:
pass
def check_requirements() -> bool:
"""Return True when all Teams dependencies and credentials are present."""
return TEAMS_SDK_AVAILABLE and AIOHTTP_AVAILABLE
def validate_config(config) -> bool:
"""Return True when the config has the minimum required credentials."""
extra = getattr(config, "extra", {}) or {}
client_id = os.getenv("TEAMS_CLIENT_ID") or extra.get("client_id", "")
client_secret = os.getenv("TEAMS_CLIENT_SECRET") or extra.get("client_secret", "")
tenant_id = os.getenv("TEAMS_TENANT_ID") or extra.get("tenant_id", "")
return bool(client_id and client_secret and tenant_id)
def is_connected(config) -> bool:
"""Check whether Teams is configured (env or config.yaml)."""
return validate_config(config)
def _env_enablement() -> dict | None:
"""Seed ``PlatformConfig.extra`` from env vars during gateway config load.
Called by the platform registry's env-enablement hook BEFORE adapter
construction, so ``gateway status`` and ``get_connected_platforms()``
reflect env-only configuration without instantiating the Teams SDK.
Returns ``None`` when Teams isn't minimally configured.
The special ``home_channel`` key in the returned dict becomes a proper
``HomeChannel`` dataclass on the ``PlatformConfig`` via the core hook.
"""
client_id = os.getenv("TEAMS_CLIENT_ID", "").strip()
client_secret = os.getenv("TEAMS_CLIENT_SECRET", "").strip()
tenant_id = os.getenv("TEAMS_TENANT_ID", "").strip()
if not (client_id and client_secret and tenant_id):
return None
seed: dict = {
"client_id": client_id,
"client_secret": client_secret,
"tenant_id": tenant_id,
}
port = os.getenv("TEAMS_PORT", "").strip()
if port:
try:
seed["port"] = int(port)
except ValueError:
pass
service_url = os.getenv("TEAMS_SERVICE_URL", "").strip()
if service_url:
seed["service_url"] = service_url
home = os.getenv("TEAMS_HOME_CHANNEL", "").strip()
if home:
seed["home_channel"] = {
"chat_id": home,
"name": os.getenv("TEAMS_HOME_CHANNEL_NAME", "Home"),
}
return seed
# Bot Framework default service URL for the global Teams endpoint. Some
# regional/government tenants need a different host (e.g.
# ``https://smba.infra.gov.teams.microsoft.us/``) which can be supplied via
# ``TEAMS_SERVICE_URL`` or ``extra['service_url']``.
_DEFAULT_TEAMS_SERVICE_URL = "https://smba.trafficmanager.net/teams/"
# Allowlist of Bot Framework service hosts that may receive a freshly
# minted bearer token. Operator-supplied URLs are matched against this
# allowlist to block SSRF / token-exfiltration via a tampered env var.
_ALLOWED_TEAMS_SERVICE_HOSTS = frozenset({
"smba.trafficmanager.net",
"smba.infra.gov.teams.microsoft.us",
})
# Conservative pattern for Bot Framework conversation IDs. Real values
# combine digits, colons, hyphens, dots, '@', and the ``thread.skype`` /
# ``thread.tacv2`` suffixes; reject anything outside this set so a hostile
# value cannot path-traverse out of ``/v3/conversations/<id>/activities``.
import re as _re_teams
_TEAMS_CONV_ID_RE = _re_teams.compile(r"^[A-Za-z0-9:@\-_.]+$")
def _validate_teams_service_url(raw: str) -> Optional[str]:
"""Return a normalized service URL or ``None`` if it is not allowed.
Requires ``https://`` and a host in ``_ALLOWED_TEAMS_SERVICE_HOSTS``.
The trailing slash is added if absent so callers can append
``v3/conversations/...`` without double slashes.
"""
if not raw:
return None
try:
from urllib.parse import urlparse
parsed = urlparse(raw)
except Exception:
return None
if parsed.scheme != "https":
return None
if parsed.hostname not in _ALLOWED_TEAMS_SERVICE_HOSTS:
return None
normalized = raw if raw.endswith("/") else raw + "/"
return normalized
async def _standalone_send(
pconfig,
chat_id: str,
message: str,
*,
thread_id: Optional[str] = None,
media_files: Optional[list] = None,
force_document: bool = False,
) -> Dict[str, Any]:
"""Acquire a Bot Framework bearer token and POST a single message activity.
Used by ``tools/send_message_tool._send_via_adapter`` when the gateway
runner is not in this process (e.g. ``hermes cron`` running as a
separate process from ``hermes gateway``). Without this hook,
``deliver=teams`` cron jobs fail with ``No live adapter for platform``.
Configuration: requires ``TEAMS_CLIENT_ID``, ``TEAMS_CLIENT_SECRET``,
``TEAMS_TENANT_ID``, ``TEAMS_HOME_CHANNEL`` (the conversation ID), and
optionally ``TEAMS_SERVICE_URL`` (Bot Framework service host; must be
a known Bot Framework endpoint, see ``_ALLOWED_TEAMS_SERVICE_HOSTS``).
Security: ``service_url`` is validated against an allowlist of known
Bot Framework hosts to block SSRF / token-exfiltration via a tampered
env var. ``chat_id`` is validated to match the documented Bot
Framework ID character set so it cannot escape the URL path.
``media_files`` and ``force_document`` are accepted for signature
parity but not implemented for the standalone path; messages with
attachments will send as text-only. The live adapter handles
attachments via the SDK.
"""
extra = getattr(pconfig, "extra", {}) or {}
client_id = os.getenv("TEAMS_CLIENT_ID") or extra.get("client_id", "")
client_secret = os.getenv("TEAMS_CLIENT_SECRET") or extra.get("client_secret", "")
tenant_id = os.getenv("TEAMS_TENANT_ID") or extra.get("tenant_id", "")
if not (client_id and client_secret and tenant_id):
return {"error": "Teams standalone send: TEAMS_CLIENT_ID, TEAMS_CLIENT_SECRET, and TEAMS_TENANT_ID are all required"}
raw_service_url = (
os.getenv("TEAMS_SERVICE_URL")
or extra.get("service_url", "")
or _DEFAULT_TEAMS_SERVICE_URL
)
service_url = _validate_teams_service_url(raw_service_url)
if service_url is None:
return {"error": (
f"Teams standalone send: TEAMS_SERVICE_URL host is not on the "
f"Bot Framework allowlist; expected one of "
f"{sorted(_ALLOWED_TEAMS_SERVICE_HOSTS)}"
)}
# Bot Framework conversation IDs are restricted to a known character
# set; anything else means a tampered chat_id trying to break out of
# the URL path.
if not chat_id:
return {"error": "Teams standalone send: chat_id (conversation ID) is required"}
if not _TEAMS_CONV_ID_RE.match(chat_id):
return {"error": "Teams standalone send: chat_id contains characters outside the Bot Framework conversation ID set"}
if not _TEAMS_CONV_ID_RE.match(tenant_id):
return {"error": "Teams standalone send: TEAMS_TENANT_ID contains characters outside the expected set"}
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
activities_url = f"{service_url}v3/conversations/{chat_id}/activities"
if not AIOHTTP_AVAILABLE:
return {"error": "Teams standalone send: aiohttp not installed"}
try:
import aiohttp as _aiohttp
# Per-request timeouts so a slow STS endpoint cannot starve the
# subsequent activity POST of its budget.
per_request_timeout = _aiohttp.ClientTimeout(total=15.0)
async with _aiohttp.ClientSession() as session:
async with session.post(
token_url,
data={
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
"scope": "https://api.botframework.com/.default",
},
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=per_request_timeout,
) as token_resp:
if token_resp.status >= 400:
body = await token_resp.text()
return {"error": f"Teams standalone send: token request failed ({token_resp.status}): {body[:300]}"}
token_payload = await token_resp.json()
access_token = token_payload.get("access_token")
if not access_token:
return {"error": "Teams standalone send: token response missing access_token"}
activity = {
"type": "message",
"text": message,
"textFormat": "markdown",
}
async with session.post(
activities_url,
json=activity,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
},
timeout=per_request_timeout,
) as send_resp:
if send_resp.status >= 400:
body = await send_resp.text()
return {"error": f"Teams standalone send: activity post failed ({send_resp.status}): {body[:300]}"}
send_payload = await send_resp.json()
return {
"success": True,
"message_id": send_payload.get("id"),
}
except asyncio.CancelledError:
raise
except Exception as e:
logger.debug("Teams standalone send raised", exc_info=True)
return {"error": f"Teams standalone send failed: {e}"}
# Keep the old name as an alias so existing test imports don't break.
check_teams_requirements = check_requirements
class TeamsAdapter(BasePlatformAdapter):
"""Microsoft Teams adapter using the microsoft-teams-apps SDK."""
MAX_MESSAGE_LENGTH = 28000 # Teams text message limit (~28 KB)
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform("teams"))
extra = config.extra or {}
self._client_id = extra.get("client_id") or os.getenv("TEAMS_CLIENT_ID", "")
self._client_secret = extra.get("client_secret") or os.getenv("TEAMS_CLIENT_SECRET", "")
self._tenant_id = extra.get("tenant_id") or os.getenv("TEAMS_TENANT_ID", "")
self._port = int(extra.get("port") or os.getenv("TEAMS_PORT", str(_DEFAULT_PORT)))
self._app: Optional["App"] = None
self._runner: Optional["web.AppRunner"] = None
self._dedup = MessageDeduplicator(max_size=1000)
# Maps chat_id → ConversationReference captured from incoming messages.
# Used to send cards with the correct conversation type (personal/group/channel).
self._conv_refs: Dict[str, Any] = {}
async def connect(self) -> bool:
if not TEAMS_SDK_AVAILABLE:
self._set_fatal_error(
"MISSING_SDK",
"microsoft-teams-apps not installed. Run: pip install microsoft-teams-apps",
retryable=False,
)
return False
if not AIOHTTP_AVAILABLE:
self._set_fatal_error(
"MISSING_SDK",
"aiohttp not installed. Run: pip install aiohttp",
retryable=False,
)
return False
if not self._client_id or not self._client_secret or not self._tenant_id:
self._set_fatal_error(
"MISSING_CREDENTIALS",
"TEAMS_CLIENT_ID, TEAMS_CLIENT_SECRET, and TEAMS_TENANT_ID are all required",
retryable=False,
)
return False
try:
# Set up aiohttp app first — the bridge adapter wires SDK routes into it
aiohttp_app = web.Application()
aiohttp_app.router.add_get("/health", lambda _: web.Response(text="ok"))
self._app = App(
client_id=self._client_id,
client_secret=self._client_secret,
tenant_id=self._tenant_id,
http_server_adapter=_AiohttpBridgeAdapter(aiohttp_app),
client=ClientOptions(headers={"User-Agent": "Hermes"}),
)
# Register message handler before initialize()
@self._app.on_message
async def _handle_message(ctx: ActivityContext[MessageActivity]):
await self._on_message(ctx)
@self._app.on_card_action
async def _handle_card_action(
ctx: ActivityContext[AdaptiveCardInvokeActivity],
) -> InvokeResponse[AdaptiveCardActionMessageResponse]:
return await self._on_card_action(ctx)
# initialize() calls register_route() on the bridge, which adds
# POST /api/messages to aiohttp_app automatically
await self._app.initialize()
self._runner = web.AppRunner(aiohttp_app)
await self._runner.setup()
site = web.TCPSite(self._runner, "0.0.0.0", self._port)
await site.start()
self._running = True
self._mark_connected()
logger.info(
"[teams] Webhook server listening on 0.0.0.0:%d%s",
self._port,
_WEBHOOK_PATH,
)
return True
except Exception as e:
self._set_fatal_error(
"CONNECT_FAILED",
f"Teams connection failed: {e}",
retryable=True,
)
logger.error("[teams] Failed to connect: %s", e)
return False
async def disconnect(self) -> None:
self._running = False
if self._runner:
await self._runner.cleanup()
self._runner = None
self._app = None
self._mark_disconnected()
logger.info("[teams] Disconnected")
async def _on_message(self, ctx: ActivityContext[MessageActivity]) -> None:
"""Process an incoming Teams message and dispatch to the gateway."""
activity = ctx.activity
# Self-message filter
bot_id = self._app.id if self._app else None
if bot_id and getattr(activity.from_, "id", None) == bot_id:
return
# Deduplication
msg_id = getattr(activity, "id", None)
if msg_id and self._dedup.is_duplicate(msg_id):
return
# Cache the conversation reference for proactive sends (approval cards, etc.)
conv_id = getattr(activity.conversation, "id", None)
if conv_id:
self._conv_refs[conv_id] = ctx.conversation_ref
# Extract text — strip bot @mentions
text = ""
if hasattr(activity, "text") and activity.text:
text = activity.text
# Strip <at>BotName</at> HTML tags that Teams prepends for @mentions
if "<at>" in text:
import re
text = re.sub(r"<at>[^<]*</at>\s*", "", text).strip()
# Determine chat type from conversation
conv = activity.conversation
conv_type = getattr(conv, "conversation_type", None) or ""
if conv_type == "personal":
chat_type = "dm"
elif conv_type == "groupChat":
chat_type = "group"
elif conv_type == "channel":
chat_type = "channel"
else:
chat_type = "dm"
# Build source
from_account = activity.from_
user_id = getattr(from_account, "aad_object_id", None) or getattr(from_account, "id", "")
user_name = getattr(from_account, "name", None) or ""
source = self.build_source(
chat_id=conv.id,
chat_name=getattr(conv, "name", None) or "",
chat_type=chat_type,
user_id=str(user_id),
user_name=user_name,
guild_id=getattr(conv, "tenant_id", None) or self._tenant_id,
)
# Handle image attachments
media_urls = []
media_types = []
for att in getattr(activity, "attachments", None) or []:
content_url = getattr(att, "content_url", None)
content_type = getattr(att, "content_type", None) or ""
if content_url and content_type.startswith("image/"):
try:
cached = await cache_image_from_url(content_url)
if cached:
media_urls.append(cached)
media_types.append(content_type)
except Exception as e:
logger.warning("[teams] Failed to cache image attachment: %s", e)
msg_type = MessageType.PHOTO if media_urls else MessageType.TEXT
event = MessageEvent(
text=text,
source=source,
message_type=msg_type,
media_urls=media_urls,
media_types=media_types,
message_id=msg_id,
)
await self.handle_message(event)
async def _send_card(self, chat_id: str, card: "AdaptiveCard") -> "Any":
"""Send an AdaptiveCard, using a stored ConversationReference when available."""
from microsoft_teams.api import MessageActivityInput
conv_ref = self._conv_refs.get(chat_id)
if conv_ref and self._app:
activity = MessageActivityInput().add_card(card)
return await self._app.activity_sender.send(activity, conv_ref)
elif self._app:
return await self._app.send(chat_id, card)
return None
async def _on_card_action(
self, ctx: "ActivityContext[AdaptiveCardInvokeActivity]"
) -> "InvokeResponse[AdaptiveCardActionMessageResponse]":
"""Handle an Adaptive Card Action.Execute button click."""
from tools.approval import resolve_gateway_approval, has_blocking_approval
action = ctx.activity.value.action
data = action.data or {}
hermes_action = data.get("hermes_action", "")
session_key = data.get("session_key", "")
if not hermes_action or not session_key:
return InvokeResponse(
status=200,
body=AdaptiveCardActionMessageResponse(value="Unknown action."),
)
# Only authorized users may click approval buttons.
# Default-deny: require either TEAMS_ALLOWED_USERS or an explicit
# TEAMS_ALLOW_ALL_USERS=true opt-in. Without one of these set, the
# bot silently treated every clicker as authorized — meaning any
# Teams user who could message the bot could approve dangerous commands.
allowed_csv = os.getenv("TEAMS_ALLOWED_USERS", "").strip()
allow_all = os.getenv("TEAMS_ALLOW_ALL_USERS", "").strip().lower() in ("1", "true", "yes")
if not allow_all:
if not allowed_csv:
logger.warning(
"[teams] card action rejected: TEAMS_ALLOWED_USERS not configured "
"and TEAMS_ALLOW_ALL_USERS not set — default deny"
)
return InvokeResponse(
status=200,
body=AdaptiveCardActionMessageResponse(
value="⛔ Approval buttons require TEAMS_ALLOWED_USERS to be configured."
),
)
from_account = ctx.activity.from_
clicker_id = getattr(from_account, "aad_object_id", None) or getattr(from_account, "id", "")
allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()}
if "*" not in allowed_ids and clicker_id not in allowed_ids:
logger.warning("[teams] Unauthorized card action by %s — ignoring", clicker_id)
return InvokeResponse(
status=200,
body=AdaptiveCardActionMessageResponse(value="⛔ Not authorized."),
)
choice_map = {
"approve_once": "once",
"approve_session": "session",
"approve_always": "always",
"deny": "deny",
}
choice = choice_map.get(hermes_action)
if not choice:
return InvokeResponse(
status=200,
body=AdaptiveCardActionMessageResponse(value="Unknown action."),
)
if not has_blocking_approval(session_key):
return InvokeResponse(
status=200,
body=AdaptiveCardActionCardResponse(
value=AdaptiveCard()
.with_version("1.4")
.with_body([TextBlock(text="⚠️ Approval already resolved or expired.", wrap=True)])
),
)
resolve_gateway_approval(session_key, choice)
label_map = {
"once": "✅ Allowed (once)",
"session": "✅ Allowed (session)",
"always": "✅ Always allowed",
"deny": "❌ Denied",
}
cmd = data.get("cmd", "")
desc = data.get("desc", "")
body = []
if cmd:
body.append(TextBlock(text="⚠️ Command Approval Required", wrap=True, weight="Bolder"))
body.append(TextBlock(text=f"```\n{cmd}\n```", wrap=True))
if desc:
body.append(TextBlock(text=f"Reason: {desc}", wrap=True, isSubtle=True))
body.append(TextBlock(text=label_map[choice], wrap=True, weight="Bolder"))
return InvokeResponse(
status=200,
body=AdaptiveCardActionCardResponse(
value=AdaptiveCard().with_version("1.4").with_body(body)
),
)
async def send_exec_approval(
self,
chat_id: str,
command: str,
session_key: str,
description: str = "dangerous command",
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send an Adaptive Card approval prompt with Allow/Deny buttons."""
if not self._app:
return SendResult(success=False, error="Teams app not initialized")
cmd_preview = command[:2000] + "..." if len(command) > 2000 else command
# Truncated for button data payload — just enough to reconstruct the card body.
btn_data_base = {
"session_key": session_key,
"cmd": command[:200] + "..." if len(command) > 200 else command,
"desc": description,
}
card = (
AdaptiveCard()
.with_version("1.4")
.with_body([
TextBlock(text="⚠️ Command Approval Required", wrap=True, weight="Bolder"),
TextBlock(text=f"```\n{cmd_preview}\n```", wrap=True),
TextBlock(text=f"Reason: {description}", wrap=True, isSubtle=True),
])
.with_actions([
ExecuteAction(
title="Allow Once",
verb="hermes_approve",
data={**btn_data_base, "hermes_action": "approve_once"},
style="positive",
),
ExecuteAction(
title="Allow Session",
verb="hermes_approve",
data={**btn_data_base, "hermes_action": "approve_session"},
),
ExecuteAction(
title="Always Allow",
verb="hermes_approve",
data={**btn_data_base, "hermes_action": "approve_always"},
),
ExecuteAction(
title="Deny",
verb="hermes_approve",
data={**btn_data_base, "hermes_action": "deny"},
style="destructive",
),
])
)
try:
result = await self._send_card(chat_id, card)
message_id = getattr(result, "id", None) if result else None
return SendResult(success=True, message_id=message_id)
except Exception as e:
logger.error("[teams] send_exec_approval failed: %s", e, exc_info=True)
return SendResult(success=False, error=str(e), retryable=True)
async def send(
self,
chat_id: str,
content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
if not self._app:
return SendResult(success=False, error="Teams app not initialized")
formatted = self.format_message(content)
chunks = self.truncate_message(formatted)
last_message_id = None
for chunk in chunks:
try:
if reply_to and reply_to.isdigit() and reply_to != "0":
try:
result = await self._app.reply(chat_id, reply_to, chunk)
except Exception as reply_err:
# Group chats 400 on threaded sends; the Teams SDK
# doesn't expose typed HTTP errors, so fall back on
# any exception and log for diagnostics.
logger.debug(
"Teams reply() failed, falling back to flat send: %s",
reply_err,
)
result = await self._app.send(chat_id, chunk)
else:
result = await self._app.send(chat_id, chunk)
last_message_id = getattr(result, "id", None)
except Exception as e:
return SendResult(success=False, error=str(e), retryable=True)
return SendResult(success=True, message_id=last_message_id)
async def send_typing(self, chat_id: str, metadata: Optional[Dict[str, Any]] = None) -> None:
if not self._app:
return
try:
await self._app.send(chat_id, TypingActivityInput())
except Exception:
pass
async def send_image(
self,
chat_id: str,
image_url: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
if not self._app:
return SendResult(success=False, error="Teams app not initialized")
try:
import base64
import mimetypes
from microsoft_teams.api import Attachment, MessageActivityInput
if image_url.startswith("http://") or image_url.startswith("https://"):
content_url = image_url
mime_type = "image/png"
else:
# Local path — encode as base64 data URI
path = image_url.removeprefix("file://")
mime_type = mimetypes.guess_type(path)[0] or "image/png"
with open(path, "rb") as f:
content_url = f"data:{mime_type};base64,{base64.b64encode(f.read()).decode()}"
attachment = Attachment(content_type=mime_type, content_url=content_url)
activity = MessageActivityInput().add_attachments(attachment)
if caption:
activity = activity.add_text(caption)
conv_ref = self._conv_refs.get(chat_id)
if conv_ref:
result = await self._app.activity_sender.send(activity, conv_ref)
else:
result = await self._app.send(chat_id, activity)
return SendResult(success=True, message_id=getattr(result, "id", None))
except Exception as e:
logger.error("[teams] send_image failed: %s", e, exc_info=True)
return SendResult(success=False, error=str(e), retryable=True)
async def send_image_file(
self,
chat_id: str,
image_path: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
**kwargs,
) -> SendResult:
return await self.send_image(
chat_id=chat_id,
image_url=image_path,
caption=caption,
reply_to=reply_to,
)
async def get_chat_info(self, chat_id: str) -> dict:
return {"name": chat_id, "type": "unknown", "chat_id": chat_id}
# ── Interactive setup ─────────────────────────────────────────────────────────
def interactive_setup() -> None:
"""Guide the user through Teams setup using the Teams CLI."""
from hermes_cli.config import (
get_env_value,
save_env_value,
)
from hermes_cli.cli_output import (
prompt,
prompt_yes_no,
print_info,
print_success,
print_warning,
)
existing_id = get_env_value("TEAMS_CLIENT_ID")
if existing_id:
print_info(f"Teams: already configured (app ID: {existing_id})")
if not prompt_yes_no("Reconfigure Teams?", False):
return
print_info("You'll need the Teams CLI. If you haven't already:")
print_info(" npm install -g @microsoft/teams.cli@preview")
print_info(" teams login")
print()
print_info("Then expose port 3978 publicly (devtunnel / ngrok / cloudflared),")
print_info("and create your bot:")
print_info(" teams app create --name \"Hermes\" --endpoint \"https://<tunnel>/api/messages\"")
print()
print_info("The CLI will print CLIENT_ID, CLIENT_SECRET, and TENANT_ID. Paste them below.")
print()
client_id = prompt("Client ID", default=existing_id or "")
if not client_id:
print_warning("Client ID is required — skipping Teams setup")
return
save_env_value("TEAMS_CLIENT_ID", client_id.strip())
client_secret = prompt("Client secret", default=get_env_value("TEAMS_CLIENT_SECRET") or "", password=True)
if not client_secret:
print_warning("Client secret is required — skipping Teams setup")
return
save_env_value("TEAMS_CLIENT_SECRET", client_secret.strip())
tenant_id = prompt("Tenant ID", default=get_env_value("TEAMS_TENANT_ID") or "")
if not tenant_id:
print_warning("Tenant ID is required — skipping Teams setup")
return
save_env_value("TEAMS_TENANT_ID", tenant_id.strip())
print()
print_info("To find your AAD object ID for the allowlist: teams status --verbose")
if prompt_yes_no("Restrict access to specific users? (recommended)", True):
allowed = prompt(
"Allowed AAD object IDs (comma-separated)",
default=get_env_value("TEAMS_ALLOWED_USERS") or "",
)
if allowed:
save_env_value("TEAMS_ALLOWED_USERS", allowed.replace(" ", ""))
print_success("Allowlist configured")
else:
save_env_value("TEAMS_ALLOWED_USERS", "")
else:
save_env_value("TEAMS_ALLOW_ALL_USERS", "true")
print_warning("⚠️ Open access — anyone who can message the bot can command it.")
print()
print_success("Teams configuration saved to ~/.hermes/.env")
print_info("Install the app in Teams: teams app install --id <teamsAppId>")
print_info("Restart the gateway: hermes gateway restart")
# ── Plugin entry point ────────────────────────────────────────────────────────
def register(ctx) -> None:
"""Plugin entry point — called by the Hermes plugin system."""
ctx.register_platform(
name="teams",
label="Microsoft Teams",
adapter_factory=lambda cfg: TeamsAdapter(cfg),
check_fn=check_requirements,
validate_config=validate_config,
is_connected=is_connected,
required_env=["TEAMS_CLIENT_ID", "TEAMS_CLIENT_SECRET", "TEAMS_TENANT_ID"],
install_hint="pip install microsoft-teams-apps aiohttp",
setup_fn=interactive_setup,
# Env-driven auto-configuration — seeds PlatformConfig.extra with
# client_id/secret/tenant + port + home_channel so env-only setups
# show up in gateway status without instantiating the Teams SDK.
env_enablement_fn=_env_enablement,
# Cron home-channel delivery support. Lets deliver=teams cron
# jobs route to the configured Teams chat/channel without editing
# cron/scheduler.py's hardcoded sets.
cron_deliver_env_var="TEAMS_HOME_CHANNEL",
# Out-of-process cron delivery via Bot Framework REST. Without
# this hook, deliver=teams cron jobs fail with "No live adapter"
# when cron runs separately from the gateway.
standalone_sender_fn=_standalone_send,
# Auth env vars for _is_user_authorized() integration
allowed_users_env="TEAMS_ALLOWED_USERS",
allow_all_env="TEAMS_ALLOW_ALL_USERS",
# Teams supports up to ~28 KB per message
max_message_length=28000,
# Display
emoji="💼",
allow_update_command=True,
# LLM guidance
platform_hint=(
"You are chatting via Microsoft Teams. Teams renders a subset of "
"markdown — bold (**text**), italic (*text*), and inline code "
"(`code`) work, but complex tables or raw HTML do not. Keep "
"responses clear and professional."
),
)