hermes-agent/plugins/teams_pipeline/store.py
Dilee 07bbd93337 feat(teams-pipeline): add plugin runtime and operator cli
Third slice of the Microsoft Teams meeting pipeline stack, salvaged
onto current main. Adds the standalone teams_pipeline plugin that
consumes Graph change notifications from the webhook listener,
resolves meeting artifacts (transcript first, recording + STT fallback
later), persists job state in a durable store, and exposes an operator
CLI for inspection, replay, subscription management, and validation.

Design choices follow maintainer review feedback on PR #19815:

- Standalone plugin rather than bolted-on core surface
  (plugins/teams_pipeline/, kind: standalone in plugin.yaml).
- Zero new model tools. The agent drives the pipeline by invoking
  the operator CLI via the terminal tool, guided by the skill that
  ships with a follow-up PR.
- Reuses the existing msgraph_webhook gateway platform for Graph
  ingress. Pipeline runtime is wired in via bind_gateway_runtime and
  gated on plugins.enabled so gateways that don't run the plugin
  boot cleanly.

Additions:

- plugins/teams_pipeline/: runtime (gateway wiring + config builder),
  pipeline core, durable JSON-backed store, subscription maintenance
  helpers, Graph artifact resolution, operator CLI (list, show,
  run/replay, fetch dry-run, subscriptions list, subscribe,
  renew-subscription, delete-subscription, maintain-subscriptions,
  token-health, validate).
- hermes_cli/main.py: second-pass plugin CLI discovery so any
  standalone plugin registered via ctx.register_cli_command()
  outside the memory-plugin convention path gets its subcommand
  wired into argparse without touching core.
- gateway/run.py: _teams_pipeline_plugin_enabled() config gate,
  _wire_teams_pipeline_runtime() binding after adapter setup, and
  the two runner attributes used by the runtime.

Credit to @dlkakbs for the entire plugin implementation.
2026-05-08 11:18:14 -07:00

193 lines
7.3 KiB
Python

"""Durable local state for the Teams pipeline plugin."""
from __future__ import annotations
import hashlib
import json
import os
import threading
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional
from hermes_constants import get_hermes_home
# Filename joined onto get_hermes_home() when neither an explicit store path
# nor the MSGRAPH_WEBHOOK_STORE_PATH environment variable is supplied.
DEFAULT_TEAMS_PIPELINE_STORE_FILENAME = "teams_pipeline_store.json"
def _utc_now_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def resolve_teams_pipeline_store_path(path: str | Path | None = None) -> Path:
if path is not None:
explicit = str(path).strip()
if explicit:
return Path(explicit)
env_path = os.getenv("MSGRAPH_WEBHOOK_STORE_PATH", "").strip()
if env_path:
return Path(env_path)
return get_hermes_home() / DEFAULT_TEAMS_PIPELINE_STORE_FILENAME
class TeamsPipelineStore:
"""JSON-backed durable store for Teams pipeline state."""
def __init__(self, path: str | Path):
self.path = Path(path)
self._lock = threading.RLock()
self._state: Dict[str, Dict[str, Any]] = {
"subscriptions": {},
"notification_receipts": {},
"event_timestamps": {},
"jobs": {},
"sink_records": {},
}
self._load()
def _load(self) -> None:
with self._lock:
if not self.path.exists():
return
data = json.loads(self.path.read_text(encoding="utf-8") or "{}")
if not isinstance(data, dict):
return
self._state["subscriptions"] = dict(data.get("subscriptions") or {})
self._state["notification_receipts"] = dict(data.get("notification_receipts") or {})
self._state["event_timestamps"] = dict(data.get("event_timestamps") or {})
self._state["jobs"] = dict(data.get("jobs") or {})
self._state["sink_records"] = dict(data.get("sink_records") or {})
def _persist(self) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
with NamedTemporaryFile(
"w",
encoding="utf-8",
dir=str(self.path.parent),
delete=False,
) as tmp:
json.dump(self._state, tmp, indent=2, sort_keys=True)
tmp.flush()
tmp_path = Path(tmp.name)
tmp_path.replace(self.path)
def list_subscriptions(self) -> Dict[str, Dict[str, Any]]:
with self._lock:
return deepcopy(self._state["subscriptions"])
def get_subscription(self, subscription_id: str) -> Optional[Dict[str, Any]]:
with self._lock:
record = self._state["subscriptions"].get(subscription_id)
return deepcopy(record) if isinstance(record, dict) else None
def upsert_subscription(self, subscription_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
with self._lock:
existing = self._state["subscriptions"].get(subscription_id, {})
merged = {**existing, **deepcopy(payload)}
merged["subscription_id"] = subscription_id
merged.setdefault("created_at", existing.get("created_at") or _utc_now_iso())
merged["updated_at"] = _utc_now_iso()
self._state["subscriptions"][subscription_id] = merged
self._persist()
return deepcopy(merged)
def delete_subscription(self, subscription_id: str) -> bool:
with self._lock:
removed = self._state["subscriptions"].pop(subscription_id, None)
if removed is None:
return False
self._persist()
return True
@classmethod
def build_notification_receipt_key(cls, notification: Dict[str, Any]) -> str:
explicit_id = notification.get("id")
if explicit_id:
return f"id:{explicit_id}"
canonical = json.dumps(notification, sort_keys=True, separators=(",", ":"))
digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
return f"sha256:{digest}"
def has_notification_receipt(self, receipt_key: str) -> bool:
with self._lock:
return receipt_key in self._state["notification_receipts"]
def record_notification_receipt(
self,
receipt_key: str,
payload: Optional[Dict[str, Any]] = None,
*,
received_at: Optional[str] = None,
) -> bool:
with self._lock:
if receipt_key in self._state["notification_receipts"]:
return False
self._state["notification_receipts"][receipt_key] = {
"received_at": received_at or _utc_now_iso(),
"payload": deepcopy(payload) if isinstance(payload, dict) else payload,
}
self._persist()
return True
def record_event_timestamp(self, event_key: str, timestamp: Optional[str] = None) -> str:
with self._lock:
value = timestamp or _utc_now_iso()
self._state["event_timestamps"][event_key] = value
self._persist()
return value
def get_event_timestamp(self, event_key: str) -> Optional[str]:
with self._lock:
value = self._state["event_timestamps"].get(event_key)
return str(value) if value is not None else None
def stats(self) -> Dict[str, int]:
with self._lock:
return {
"subscriptions": len(self._state["subscriptions"]),
"notification_receipts": len(self._state["notification_receipts"]),
"event_timestamps": len(self._state["event_timestamps"]),
"jobs": len(self._state["jobs"]),
"sink_records": len(self._state["sink_records"]),
}
def upsert_job(self, job_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
with self._lock:
existing = self._state["jobs"].get(job_id, {})
merged = {**existing, **deepcopy(payload)}
merged["job_id"] = job_id
merged.setdefault("created_at", existing.get("created_at") or _utc_now_iso())
merged["updated_at"] = _utc_now_iso()
self._state["jobs"][job_id] = merged
self._persist()
return deepcopy(merged)
def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
with self._lock:
record = self._state["jobs"].get(job_id)
return deepcopy(record) if isinstance(record, dict) else None
def list_jobs(self) -> Dict[str, Dict[str, Any]]:
with self._lock:
return deepcopy(self._state["jobs"])
def upsert_sink_record(self, sink_key: str, payload: Dict[str, Any]) -> Dict[str, Any]:
with self._lock:
existing = self._state["sink_records"].get(sink_key, {})
merged = {**existing, **deepcopy(payload)}
merged["sink_key"] = sink_key
merged.setdefault("created_at", existing.get("created_at") or _utc_now_iso())
merged["updated_at"] = _utc_now_iso()
self._state["sink_records"][sink_key] = merged
self._persist()
return deepcopy(merged)
def get_sink_record(self, sink_key: str) -> Optional[Dict[str, Any]]:
with self._lock:
record = self._state["sink_records"].get(sink_key)
return deepcopy(record) if isinstance(record, dict) else None