"""Thin OAuth adapter for MCP HTTP servers. Wraps the MCP SDK's built-in ``OAuthClientProvider`` (which implements ``httpx.Auth``) with Hermes-specific token storage and browser-based authorization. The SDK handles all of the heavy lifting: PKCE generation, metadata discovery, dynamic client registration, token exchange, and refresh. Usage in mcp_tool.py:: from tools.mcp_oauth import build_oauth_auth auth = build_oauth_auth(server_name, server_url) # pass ``auth`` as the httpx auth parameter """ from __future__ import annotations import asyncio import json import logging import os import socket import threading import webbrowser from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path from typing import Any from urllib.parse import parse_qs, urlparse logger = logging.getLogger(__name__) _TOKEN_DIR_NAME = "mcp-tokens" # --------------------------------------------------------------------------- # Token storage — persists tokens + client info to ~/.hermes/mcp-tokens/ # --------------------------------------------------------------------------- class HermesTokenStorage: """File-backed token storage implementing the MCP SDK's TokenStorage protocol.""" def __init__(self, server_name: str): self._server_name = server_name def _base_dir(self) -> Path: home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) d = home / _TOKEN_DIR_NAME d.mkdir(parents=True, exist_ok=True) return d def _tokens_path(self) -> Path: return self._base_dir() / f"{self._server_name}.json" def _client_path(self) -> Path: return self._base_dir() / f"{self._server_name}.client.json" # -- TokenStorage protocol (async) -- async def get_tokens(self): data = self._read_json(self._tokens_path()) if not data: return None try: from mcp.shared.auth import OAuthToken return OAuthToken(**data) except Exception: return None async def set_tokens(self, tokens) -> None: self._write_json(self._tokens_path(), tokens.model_dump(exclude_none=True)) async def get_client_info(self): data = self._read_json(self._client_path()) if not data: return None try: from mcp.shared.auth import OAuthClientInformationFull return OAuthClientInformationFull(**data) except Exception: return None async def set_client_info(self, client_info) -> None: self._write_json(self._client_path(), client_info.model_dump(exclude_none=True)) # -- helpers -- @staticmethod def _read_json(path: Path) -> dict | None: if not path.exists(): return None try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return None @staticmethod def _write_json(path: Path, data: dict) -> None: path.write_text(json.dumps(data, indent=2), encoding="utf-8") try: path.chmod(0o600) except OSError: pass def remove(self) -> None: """Delete stored tokens and client info for this server.""" for p in (self._tokens_path(), self._client_path()): try: p.unlink(missing_ok=True) except OSError: pass # --------------------------------------------------------------------------- # Browser-based callback handler # --------------------------------------------------------------------------- def _find_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) return s.getsockname()[1] class _CallbackHandler(BaseHTTPRequestHandler): auth_code: str | None = None state: str | None = None def do_GET(self): qs = parse_qs(urlparse(self.path).query) _CallbackHandler.auth_code = (qs.get("code") or [None])[0] _CallbackHandler.state = (qs.get("state") or [None])[0] self.send_response(200) self.send_header("Content-Type", "text/html") self.end_headers() self.wfile.write(b"