From 12289c2630548b35575e289ba215a4541dd8ec72 Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Sun, 3 May 2026 01:33:20 -0700 Subject: [PATCH] feat: add SSE transport support for MCP client Add support for MCP servers using the SSE transport protocol (SseServerTransport) alongside the existing Streamable HTTP and stdio transports. Many MCP servers use SSE (GET /sse + POST /messages/) which was previously unsupported -- the client silently fell back to Streamable HTTP, causing 10s connection timeouts. Changes: - Import mcp.client.sse.sse_client with graceful fallback - Check config.get('transport') == 'sse' in _run_http() to select the SSE transport path with proper timeout handling - Read transport type from config in get_mcp_status() instead of hardcoding 'http' for URL-based servers - Update docstring, example config, and feature list --- tools/mcp_tool.py | 52 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/tools/mcp_tool.py b/tools/mcp_tool.py index 9ed8ac75d0..c3d88475f5 100644 --- a/tools/mcp_tool.py +++ b/tools/mcp_tool.py @@ -2,9 +2,9 @@ """ MCP (Model Context Protocol) Client Support -Connects to external MCP servers via stdio or HTTP/StreamableHTTP transport, -discovers their tools, and registers them into the hermes-agent tool registry -so the agent can call them like any built-in tool. +Connects to external MCP servers via stdio, HTTP/StreamableHTTP, or SSE +transport, discovers their tools, and registers them into the hermes-agent +tool registry so the agent can call them like any built-in tool. Configuration is read from ~/.hermes/config.yaml under the ``mcp_servers`` key. The ``mcp`` Python package is optional -- if not installed, this module is a @@ -29,7 +29,11 @@ Example config:: headers: Authorization: "Bearer sk-..." timeout: 180 - analysis: + searxng: + url: "http://localhost:8000/sse" + transport: sse # use SSE transport instead of Streamable HTTP + timeout: 180 + connect_timeout: 10 command: "npx" args: ["-y", "analysis-server"] sampling: # server-initiated LLM requests @@ -44,6 +48,7 @@ Example config:: Features: - Stdio transport (command + args) and HTTP/StreamableHTTP transport (url) + - SSE transport (transport: sse) for MCP servers using the SSE protocol - Automatic reconnection with exponential backoff (up to 5 retries) - Environment variable filtering for stdio subprocesses (security) - Credential stripping in error messages returned to the LLM @@ -191,6 +196,12 @@ try: from mcp.types import LATEST_PROTOCOL_VERSION except ImportError: logger.debug("mcp.types.LATEST_PROTOCOL_VERSION not available -- using fallback protocol version") + # SSE transport client (for MCP servers using SSE transport instead of Streamable HTTP) + try: + from mcp.client.sse import sse_client + except ImportError: + sse_client = None + logger.debug("mcp.client.sse.sse_client not available -- SSE transport disabled") # Sampling types -- separated so older SDK versions don't break MCP support try: from mcp.types import ( @@ -1210,6 +1221,37 @@ class MCPServerTask: if _MCP_NOTIFICATION_TYPES and _MCP_MESSAGE_HANDLER_SUPPORTED: sampling_kwargs["message_handler"] = self._make_message_handler() + # SSE transport (for MCP servers that implement the SSE transport protocol + # rather than Streamable HTTP). Configure with ``transport: sse`` in the + # mcp_servers entry in config.yaml. + if config.get("transport") == "sse": + if sse_client is None: + raise ImportError( + f"MCP server '{self.name}' requires SSE transport but " + "mcp.client.sse.sse_client is not available. " + "Upgrade the mcp package to get SSE support." + ) + async with sse_client( + url=url, + headers=headers or None, + timeout=float(connect_timeout), + sse_read_timeout=float(config.get("timeout", _DEFAULT_TOOL_TIMEOUT)), + ) as (read_stream, write_stream): + async with ClientSession( + read_stream, write_stream, **sampling_kwargs + ) as session: + await session.initialize() + self.session = session + await self._discover_tools() + self._ready.set() + reason = await self._wait_for_lifecycle_event() + if reason == "reconnect": + logger.info( + "MCP server '%s': reconnect requested — " + "tearing down SSE session", self.name, + ) + return + if _MCP_NEW_HTTP: # New API (mcp >= 1.24.0): build an explicit httpx.AsyncClient # matching the SDK's own create_mcp_http_client defaults. @@ -2965,7 +3007,7 @@ def get_mcp_status() -> List[dict]: active_servers = dict(_servers) for name, cfg in configured.items(): - transport = "http" if "url" in cfg else "stdio" + transport = cfg.get("transport", "http") if "url" in cfg else "stdio" server = active_servers.get(name) if server and server.session is not None: entry = {