mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-08 03:01:47 +00:00
feat: add SSE transport support for MCP client
Add support for MCP servers using the SSE transport protocol
(SseServerTransport) alongside the existing Streamable HTTP and stdio
transports. Many MCP servers use SSE (GET /sse + POST /messages/)
which was previously unsupported -- the client silently fell back to
Streamable HTTP, causing 10s connection timeouts.
Changes:
- Import mcp.client.sse.sse_client with graceful fallback
- Check config.get('transport') == 'sse' in _run_http() to select
the SSE transport path with proper timeout handling
- Read transport type from config in get_mcp_status() instead of
hardcoding 'http' for URL-based servers
- Update docstring, example config, and feature list
This commit is contained in:
parent
c4a7992317
commit
12289c2630
1 changed files with 47 additions and 5 deletions
|
|
@ -2,9 +2,9 @@
|
|||
"""
|
||||
MCP (Model Context Protocol) Client Support
|
||||
|
||||
Connects to external MCP servers via stdio or HTTP/StreamableHTTP transport,
|
||||
discovers their tools, and registers them into the hermes-agent tool registry
|
||||
so the agent can call them like any built-in tool.
|
||||
Connects to external MCP servers via stdio, HTTP/StreamableHTTP, or SSE
|
||||
transport, discovers their tools, and registers them into the hermes-agent
|
||||
tool registry so the agent can call them like any built-in tool.
|
||||
|
||||
Configuration is read from ~/.hermes/config.yaml under the ``mcp_servers`` key.
|
||||
The ``mcp`` Python package is optional -- if not installed, this module is a
|
||||
|
|
@ -29,7 +29,11 @@ Example config::
|
|||
headers:
|
||||
Authorization: "Bearer sk-..."
|
||||
timeout: 180
|
||||
analysis:
|
||||
searxng:
|
||||
url: "http://localhost:8000/sse"
|
||||
transport: sse # use SSE transport instead of Streamable HTTP
|
||||
timeout: 180
|
||||
connect_timeout: 10
|
||||
command: "npx"
|
||||
args: ["-y", "analysis-server"]
|
||||
sampling: # server-initiated LLM requests
|
||||
|
|
@ -44,6 +48,7 @@ Example config::
|
|||
|
||||
Features:
|
||||
- Stdio transport (command + args) and HTTP/StreamableHTTP transport (url)
|
||||
- SSE transport (transport: sse) for MCP servers using the SSE protocol
|
||||
- Automatic reconnection with exponential backoff (up to 5 retries)
|
||||
- Environment variable filtering for stdio subprocesses (security)
|
||||
- Credential stripping in error messages returned to the LLM
|
||||
|
|
@ -191,6 +196,12 @@ try:
|
|||
from mcp.types import LATEST_PROTOCOL_VERSION
|
||||
except ImportError:
|
||||
logger.debug("mcp.types.LATEST_PROTOCOL_VERSION not available -- using fallback protocol version")
|
||||
# SSE transport client (for MCP servers using SSE transport instead of Streamable HTTP)
|
||||
try:
|
||||
from mcp.client.sse import sse_client
|
||||
except ImportError:
|
||||
sse_client = None
|
||||
logger.debug("mcp.client.sse.sse_client not available -- SSE transport disabled")
|
||||
# Sampling types -- separated so older SDK versions don't break MCP support
|
||||
try:
|
||||
from mcp.types import (
|
||||
|
|
@ -1210,6 +1221,37 @@ class MCPServerTask:
|
|||
if _MCP_NOTIFICATION_TYPES and _MCP_MESSAGE_HANDLER_SUPPORTED:
|
||||
sampling_kwargs["message_handler"] = self._make_message_handler()
|
||||
|
||||
# SSE transport (for MCP servers that implement the SSE transport protocol
|
||||
# rather than Streamable HTTP). Configure with ``transport: sse`` in the
|
||||
# mcp_servers entry in config.yaml.
|
||||
if config.get("transport") == "sse":
|
||||
if sse_client is None:
|
||||
raise ImportError(
|
||||
f"MCP server '{self.name}' requires SSE transport but "
|
||||
"mcp.client.sse.sse_client is not available. "
|
||||
"Upgrade the mcp package to get SSE support."
|
||||
)
|
||||
async with sse_client(
|
||||
url=url,
|
||||
headers=headers or None,
|
||||
timeout=float(connect_timeout),
|
||||
sse_read_timeout=float(config.get("timeout", _DEFAULT_TOOL_TIMEOUT)),
|
||||
) as (read_stream, write_stream):
|
||||
async with ClientSession(
|
||||
read_stream, write_stream, **sampling_kwargs
|
||||
) as session:
|
||||
await session.initialize()
|
||||
self.session = session
|
||||
await self._discover_tools()
|
||||
self._ready.set()
|
||||
reason = await self._wait_for_lifecycle_event()
|
||||
if reason == "reconnect":
|
||||
logger.info(
|
||||
"MCP server '%s': reconnect requested — "
|
||||
"tearing down SSE session", self.name,
|
||||
)
|
||||
return
|
||||
|
||||
if _MCP_NEW_HTTP:
|
||||
# New API (mcp >= 1.24.0): build an explicit httpx.AsyncClient
|
||||
# matching the SDK's own create_mcp_http_client defaults.
|
||||
|
|
@ -2965,7 +3007,7 @@ def get_mcp_status() -> List[dict]:
|
|||
active_servers = dict(_servers)
|
||||
|
||||
for name, cfg in configured.items():
|
||||
transport = "http" if "url" in cfg else "stdio"
|
||||
transport = cfg.get("transport", "http") if "url" in cfg else "stdio"
|
||||
server = active_servers.get(name)
|
||||
if server and server.session is not None:
|
||||
entry = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue