feat(mcp): banner integration, /reload-mcp command, resources & prompts

Banner integration:
- MCP Servers section in CLI startup banner between Tools and Skills
- Shows each server with transport type, tool count, connection status
- Failed servers shown in red; section hidden when no MCP configured
- Summary line includes MCP server count
- Removed raw print() calls from discovery (banner handles display)

/reload-mcp command:
- New slash command in both CLI and gateway
- Disconnects all MCP servers, re-reads config.yaml, reconnects
- Reports what changed (added/removed/reconnected servers)
- Allows adding/removing MCP servers without restarting

Resources & Prompts support:
- 4 utility tools registered per server: list_resources, read_resource,
  list_prompts, get_prompt
- Exposes MCP Resources (data sources) and Prompts (templates) as tools
- Proper parameter schemas (uri for read_resource, name for get_prompt)
- Handles text and binary resource content
- 23 new tests covering schemas, handlers, and registration

Test coverage: 74 MCP tests total, 1186 tests pass overall.
This commit is contained in:
teknium1 2026-03-02 19:15:59 -08:00
parent 60effcfc44
commit 7df14227a9
5 changed files with 869 additions and 5 deletions

View file

@ -475,6 +475,190 @@ def _make_tool_handler(server_name: str, tool_name: str, tool_timeout: float):
return _handler
def _make_list_resources_handler(server_name: str, tool_timeout: float):
"""Return a sync handler that lists resources from an MCP server."""
def _handler(args: dict, **kwargs) -> str:
with _lock:
server = _servers.get(server_name)
if not server or not server.session:
return json.dumps({
"error": f"MCP server '{server_name}' is not connected"
})
async def _call():
result = await server.session.list_resources()
resources = []
for r in (result.resources if hasattr(result, "resources") else []):
entry = {}
if hasattr(r, "uri"):
entry["uri"] = str(r.uri)
if hasattr(r, "name"):
entry["name"] = r.name
if hasattr(r, "description") and r.description:
entry["description"] = r.description
if hasattr(r, "mimeType") and r.mimeType:
entry["mimeType"] = r.mimeType
resources.append(entry)
return json.dumps({"resources": resources})
try:
return _run_on_mcp_loop(_call(), timeout=tool_timeout)
except Exception as exc:
logger.error(
"MCP %s/list_resources failed: %s", server_name, exc,
)
return json.dumps({
"error": _sanitize_error(
f"MCP call failed: {type(exc).__name__}: {exc}"
)
})
return _handler
def _make_read_resource_handler(server_name: str, tool_timeout: float):
"""Return a sync handler that reads a resource by URI from an MCP server."""
def _handler(args: dict, **kwargs) -> str:
with _lock:
server = _servers.get(server_name)
if not server or not server.session:
return json.dumps({
"error": f"MCP server '{server_name}' is not connected"
})
uri = args.get("uri")
if not uri:
return json.dumps({"error": "Missing required parameter 'uri'"})
async def _call():
result = await server.session.read_resource(uri)
# read_resource returns ReadResourceResult with .contents list
parts: List[str] = []
contents = result.contents if hasattr(result, "contents") else []
for block in contents:
if hasattr(block, "text"):
parts.append(block.text)
elif hasattr(block, "blob"):
parts.append(f"[binary data, {len(block.blob)} bytes]")
return json.dumps({"result": "\n".join(parts) if parts else ""})
try:
return _run_on_mcp_loop(_call(), timeout=tool_timeout)
except Exception as exc:
logger.error(
"MCP %s/read_resource failed: %s", server_name, exc,
)
return json.dumps({
"error": _sanitize_error(
f"MCP call failed: {type(exc).__name__}: {exc}"
)
})
return _handler
def _make_list_prompts_handler(server_name: str, tool_timeout: float):
"""Return a sync handler that lists prompts from an MCP server."""
def _handler(args: dict, **kwargs) -> str:
with _lock:
server = _servers.get(server_name)
if not server or not server.session:
return json.dumps({
"error": f"MCP server '{server_name}' is not connected"
})
async def _call():
result = await server.session.list_prompts()
prompts = []
for p in (result.prompts if hasattr(result, "prompts") else []):
entry = {}
if hasattr(p, "name"):
entry["name"] = p.name
if hasattr(p, "description") and p.description:
entry["description"] = p.description
if hasattr(p, "arguments") and p.arguments:
entry["arguments"] = [
{
"name": a.name,
**({"description": a.description} if hasattr(a, "description") and a.description else {}),
**({"required": a.required} if hasattr(a, "required") else {}),
}
for a in p.arguments
]
prompts.append(entry)
return json.dumps({"prompts": prompts})
try:
return _run_on_mcp_loop(_call(), timeout=tool_timeout)
except Exception as exc:
logger.error(
"MCP %s/list_prompts failed: %s", server_name, exc,
)
return json.dumps({
"error": _sanitize_error(
f"MCP call failed: {type(exc).__name__}: {exc}"
)
})
return _handler
def _make_get_prompt_handler(server_name: str, tool_timeout: float):
"""Return a sync handler that gets a prompt by name from an MCP server."""
def _handler(args: dict, **kwargs) -> str:
with _lock:
server = _servers.get(server_name)
if not server or not server.session:
return json.dumps({
"error": f"MCP server '{server_name}' is not connected"
})
name = args.get("name")
if not name:
return json.dumps({"error": "Missing required parameter 'name'"})
arguments = args.get("arguments", {})
async def _call():
result = await server.session.get_prompt(name, arguments=arguments)
# GetPromptResult has .messages list
messages = []
for msg in (result.messages if hasattr(result, "messages") else []):
entry = {}
if hasattr(msg, "role"):
entry["role"] = msg.role
if hasattr(msg, "content"):
content = msg.content
if hasattr(content, "text"):
entry["content"] = content.text
elif isinstance(content, str):
entry["content"] = content
else:
entry["content"] = str(content)
messages.append(entry)
resp = {"messages": messages}
if hasattr(result, "description") and result.description:
resp["description"] = result.description
return json.dumps(resp)
try:
return _run_on_mcp_loop(_call(), timeout=tool_timeout)
except Exception as exc:
logger.error(
"MCP %s/get_prompt failed: %s", server_name, exc,
)
return json.dumps({
"error": _sanitize_error(
f"MCP call failed: {type(exc).__name__}: {exc}"
)
})
return _handler
def _make_check_fn(server_name: str):
"""Return a check function that verifies the MCP connection is alive."""
@ -515,6 +699,77 @@ def _convert_mcp_schema(server_name: str, mcp_tool) -> dict:
}
def _build_utility_schemas(server_name: str) -> List[dict]:
"""Build schemas for the MCP utility tools (resources & prompts).
Returns a list of (schema, handler_factory_name) tuples encoded as dicts
with keys: schema, handler_key.
"""
safe_name = server_name.replace("-", "_").replace(".", "_")
return [
{
"schema": {
"name": f"mcp_{safe_name}_list_resources",
"description": f"List available resources from MCP server '{server_name}'",
"parameters": {
"type": "object",
"properties": {},
},
},
"handler_key": "list_resources",
},
{
"schema": {
"name": f"mcp_{safe_name}_read_resource",
"description": f"Read a resource by URI from MCP server '{server_name}'",
"parameters": {
"type": "object",
"properties": {
"uri": {
"type": "string",
"description": "URI of the resource to read",
},
},
"required": ["uri"],
},
},
"handler_key": "read_resource",
},
{
"schema": {
"name": f"mcp_{safe_name}_list_prompts",
"description": f"List available prompts from MCP server '{server_name}'",
"parameters": {
"type": "object",
"properties": {},
},
},
"handler_key": "list_prompts",
},
{
"schema": {
"name": f"mcp_{safe_name}_get_prompt",
"description": f"Get a prompt by name from MCP server '{server_name}'",
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the prompt to retrieve",
},
"arguments": {
"type": "object",
"description": "Optional arguments to pass to the prompt",
},
},
"required": ["name"],
},
},
"handler_key": "get_prompt",
},
]
def _existing_tool_names() -> List[str]:
"""Return tool names for all currently connected servers."""
names: List[str] = []
@ -522,12 +777,18 @@ def _existing_tool_names() -> List[str]:
for mcp_tool in server._tools:
schema = _convert_mcp_schema(sname, mcp_tool)
names.append(schema["name"])
# Also include utility tool names
for entry in _build_utility_schemas(sname):
names.append(entry["schema"]["name"])
return names
async def _discover_and_register_server(name: str, config: dict) -> List[str]:
"""Connect to a single MCP server, discover tools, and register them.
Also registers utility tools for MCP Resources and Prompts support
(list_resources, read_resource, list_prompts, get_prompt).
Returns list of registered tool names.
"""
from tools.registry import registry
@ -559,6 +820,30 @@ async def _discover_and_register_server(name: str, config: dict) -> List[str]:
)
registered_names.append(tool_name_prefixed)
# Register MCP Resources & Prompts utility tools
_handler_factories = {
"list_resources": _make_list_resources_handler,
"read_resource": _make_read_resource_handler,
"list_prompts": _make_list_prompts_handler,
"get_prompt": _make_get_prompt_handler,
}
check_fn = _make_check_fn(name)
for entry in _build_utility_schemas(name):
schema = entry["schema"]
handler_key = entry["handler_key"]
handler = _handler_factories[handler_key](name, server.tool_timeout)
registry.register(
name=schema["name"],
toolset=toolset_name,
schema=schema,
handler=handler,
check_fn=check_fn,
is_async=False,
description=schema["description"],
)
registered_names.append(schema["name"])
# Create a custom toolset so these tools are discoverable
if registered_names:
create_custom_toolset(
@ -620,10 +905,8 @@ def discover_mcp_tools() -> List[str]:
try:
registered = await _discover_and_register_server(name, cfg)
transport_type = "HTTP" if "url" in cfg else "stdio"
print(f" MCP: '{name}' ({transport_type}) — {len(registered)} tool(s)")
return registered
except Exception as exc:
print(f" MCP: '{name}' — FAILED: {exc}")
logger.warning(
"Failed to connect to MCP server '%s': %s",
name, exc,
@ -666,12 +949,49 @@ def discover_mcp_tools() -> List[str]:
summary = f" MCP: {len(all_tools)} tool(s) from {ok_servers} server(s)"
if failed_count:
summary += f" ({failed_count} failed)"
print(summary)
logger.info(summary)
# Return ALL registered tools (existing + newly discovered)
return _existing_tool_names()
def get_mcp_status() -> List[dict]:
"""Return status of all configured MCP servers for banner display.
Returns a list of dicts with keys: name, transport, tools, connected.
Includes both successfully connected servers and configured-but-failed ones.
"""
result: List[dict] = []
# Get configured servers from config
configured = _load_mcp_config()
if not configured:
return result
with _lock:
active_servers = dict(_servers)
for name, cfg in configured.items():
transport = "http" if "url" in cfg else "stdio"
server = active_servers.get(name)
if server and server.session is not None:
result.append({
"name": name,
"transport": transport,
"tools": len(server._tools),
"connected": True,
})
else:
result.append({
"name": name,
"transport": transport,
"tools": 0,
"connected": False,
})
return result
def shutdown_mcp_servers():
"""Close all MCP server connections and stop the background loop.