hermes-agent/plugins/web/parallel/provider.py
Teknium 0a5762c78d fix(web): genericize free-MCP client identity per telemetry policy
Replace the hermes-identifying clientInfo/User-Agent/session-id prefix on
the keyless Parallel Search MCP path with a neutral 'mcp-web-client'
identity. Project policy forbids third-party usage attribution without an
explicit user opt-in (see telemetry PR policy); MCP requires a clientInfo,
so a generic one satisfies the spec without attributing traffic.

Also adds the contributor AUTHOR_MAP entry and refreshes uv.lock against
current main (parallel-web 0.6.0).
2026-06-10 19:54:38 -07:00

696 lines
26 KiB
Python

"""Parallel.ai web search + content extraction — plugin form.

Subclasses :class:`agent.web_search_provider.WebSearchProvider`.

Search runs on one of two transports, picked by credential:

- **No key →** the free hosted Search MCP at ``https://search.parallel.ai/mcp``
  (anonymous Streamable-HTTP JSON-RPC). This makes ``web_search`` work out of
  the box with zero setup, which is why ``parallel`` is the keyless default
  backend in :func:`tools.web_tools._get_backend`.
- **``PARALLEL_API_KEY`` →** the ``parallel`` SDK's v1 ``search`` / ``extract``
  REST endpoints (objective-tuned, mode-selectable, higher rate limits).

Extract mirrors search: keyed uses the async SDK (``AsyncParallel``) v1
``extract``; keyless uses the free MCP's ``web_fetch``. :meth:`extract` is
declared ``async def`` and the dispatcher in
:func:`tools.web_tools.web_extract_tool` detects coroutines via
:func:`inspect.iscoroutinefunction` and awaits.

Config keys this provider responds to::

    web:
      search_backend: "parallel"     # explicit per-capability
      extract_backend: "parallel"    # explicit per-capability
      backend: "parallel"            # shared fallback
      # Optional: search mode (default "advanced"; also "basic")
      # via the PARALLEL_SEARCH_MODE env var. REST path only.

Env vars::

    PARALLEL_API_KEY=...             # https://parallel.ai (optional — unlocks
                                     # the v1 REST Search API; without it,
                                     # search and extract use the free MCP)
    PARALLEL_SEARCH_MODE=advanced    # optional: basic|advanced (legacy
                                     # fast/one-shot map to basic, agentic to
                                     # advanced). REST path only.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import uuid
from typing import Any, Dict, List
import httpx
from agent.web_search_provider import WebSearchProvider
logger = logging.getLogger(__name__)
# Free hosted Search MCP — anonymous-friendly, used when no PARALLEL_API_KEY is
# configured. Docs: https://docs.parallel.ai/integrations/mcp/search-mcp
_MCP_SEARCH_URL = "https://search.parallel.ai/mcp"
_MCP_PROTOCOL_VERSION = "2025-06-18"
# Deliberately generic client identity. Project policy (see the telemetry PR
# policy in AGENTS.md) forbids third-party usage attribution without an
# explicit user opt-in, so neither clientInfo nor the User-Agent names
# hermes. MCP requires *a* clientInfo; a neutral one satisfies the spec
# without attributing traffic.
_MCP_CLIENT_NAME = "mcp-web-client"
_MCP_CLIENT_VERSION = "1.0.0"
_MCP_USER_AGENT = f"{_MCP_CLIENT_NAME}/{_MCP_CLIENT_VERSION}"
_MCP_TIMEOUT_SECONDS = 30.0
# Free-tier attribution. The hosted Search MCP is free to use; surfacing this
# on keyless results credits Parallel and matches the free-tier terms
# (https://parallel.ai/customer-terms).
_FREE_MCP_ATTRIBUTION = (
"Search powered by the free Parallel Web Search MCP (https://parallel.ai)."
)
def _new_session_id() -> str:
    """Return a brand-new Parallel ``session_id`` for one tool call.

    Ids are minted per call instead of once per process: a single process
    serves many unrelated chats in the gateway/batch runners, and a shared id
    would pool all of their searches into one Parallel session. The prefix is
    the generic client name (no hermes attribution — telemetry policy).
    """
    random_part = uuid.uuid4().hex
    return f"{_MCP_CLIENT_NAME}-{random_part}"
# Module-level note: the canonical cache slots ``_parallel_client`` and
# ``_async_parallel_client`` live on :mod:`tools.web_tools` so tests that do
# ``tools.web_tools._parallel_client = None`` between cases see fresh state.
# The plugin reads/writes through that public module (see
# :func:`_get_sync_client` / :func:`_get_async_client`).
def _ensure_parallel_sdk_installed() -> None:
"""Trigger lazy install of the parallel SDK if it isn't present.
Mirrors the lazy-deps pattern used by the legacy implementation.
Swallows benign ImportError from the lazy_deps helper itself; if the
SDK is genuinely missing the subsequent ``from parallel import ...``
raises ImportError that the caller can handle.
"""
try:
from tools.lazy_deps import ensure as _lazy_ensure
_lazy_ensure("search.parallel", prompt=False)
except ImportError:
pass
except Exception as exc: # noqa: BLE001 — surface install hint as ImportError
raise ImportError(str(exc))
def _get_sync_client() -> Any:
    """Lazy-load and cache the sync Parallel client.

    The cache slot lives on :mod:`tools.web_tools` (as ``_parallel_client``)
    so unit tests that reset that name between cases keep working.

    Returns:
        The cached client if one exists, otherwise a newly constructed
        ``parallel.Parallel`` instance (which is then cached).

    Raises:
        ValueError: if ``PARALLEL_API_KEY`` is unset or blank.
        ImportError: if the ``parallel`` SDK cannot be installed/imported.
    """
    import tools.web_tools as _wt

    cached = getattr(_wt, "_parallel_client", None)
    if cached is not None:
        return cached

    # Strip the key exactly like ParallelWebSearchProvider's own key checks
    # do, so a value with stray surrounding whitespace that passes those
    # checks is not handed to the SDK with the whitespace still attached.
    api_key = os.getenv("PARALLEL_API_KEY", "").strip()
    if not api_key:
        raise ValueError(
            "PARALLEL_API_KEY environment variable not set. "
            "Get your API key at https://parallel.ai"
        )

    _ensure_parallel_sdk_installed()
    from parallel import Parallel  # noqa: WPS433 — deliberately lazy

    client = Parallel(api_key=api_key)
    _wt._parallel_client = client
    return client
def _get_async_client() -> Any:
    """Lazy-load and cache the async Parallel client.

    The cache slot lives on :mod:`tools.web_tools` (as
    ``_async_parallel_client``).

    Returns:
        The cached client if one exists, otherwise a newly constructed
        ``parallel.AsyncParallel`` instance (which is then cached).

    Raises:
        ValueError: if ``PARALLEL_API_KEY`` is unset or blank.
        ImportError: if the ``parallel`` SDK cannot be installed/imported.
    """
    import tools.web_tools as _wt

    cached = getattr(_wt, "_async_parallel_client", None)
    if cached is not None:
        return cached

    # Strip the key exactly like ParallelWebSearchProvider's own key checks
    # do, so a value with stray surrounding whitespace that passes those
    # checks is not handed to the SDK with the whitespace still attached.
    api_key = os.getenv("PARALLEL_API_KEY", "").strip()
    if not api_key:
        raise ValueError(
            "PARALLEL_API_KEY environment variable not set. "
            "Get your API key at https://parallel.ai"
        )

    _ensure_parallel_sdk_installed()
    from parallel import AsyncParallel  # noqa: WPS433 — deliberately lazy

    client = AsyncParallel(api_key=api_key)
    _wt._async_parallel_client = client
    return client
def _reset_clients_for_tests() -> None:
    """Forget both cached Parallel clients so tests start from a clean slate.

    The slots being cleared are the canonical ones on :mod:`tools.web_tools`,
    i.e. the same attributes :func:`_get_sync_client` and
    :func:`_get_async_client` read from and write to.
    """
    import tools.web_tools as _wt

    for slot in ("_parallel_client", "_async_parallel_client"):
        setattr(_wt, slot, None)
# Backward-compatible aliases. These are the names the two client getters
# had while they lived in tools.web_tools, before the migration into this
# plugin; they are kept so existing tests and external callers that still use
# the old names keep resolving to the same functions.
_get_parallel_client = _get_sync_client
_get_async_parallel_client = _get_async_client
def _resolve_search_mode() -> str:
"""Return the validated v1 search mode (default "advanced").
V1 collapses the three Beta modes into two. We accept the v1 values
directly and map the legacy Beta values for back-compat with anyone who
still sets ``PARALLEL_SEARCH_MODE=fast|one-shot|agentic``:
- ``fast`` / ``one-shot`` → ``basic`` (lower latency)
- ``agentic`` → ``advanced`` (higher quality, the v1 default)
"""
mode = os.getenv("PARALLEL_SEARCH_MODE", "advanced").lower().strip()
if mode == "basic" or mode in {"fast", "one-shot"}:
return "basic"
# advanced, legacy "agentic", and anything unrecognized → the v1 default.
return "advanced"
# ---------------------------------------------------------------------------
# Free Search MCP transport (keyless path)
# ---------------------------------------------------------------------------
#
# A small hand-rolled Streamable-HTTP JSON-RPC client for the hosted Search
# MCP, rather than the full MCP-client subsystem: we only call two tools
# (``web_search`` / ``web_fetch``), so keeping it inline lets web_search and
# web_extract stay ordinary tools with the MCP endpoint as just their wire
# protocol.
def _mcp_headers(
    session_id: str | None,
    api_key: str | None,
    protocol_version: str | None = None,
) -> Dict[str, str]:
    """Build the HTTP headers for one MCP request.

    The content-type, accept and user-agent headers are always present. The
    remaining three are added only when their value is truthy:

    - ``Mcp-Session-Id`` from *session_id*.
    - ``MCP-Protocol-Version`` from *protocol_version* — after ``initialize``
      the Streamable-HTTP spec expects the negotiated version on every
      follow-up request, so it is echoed once known.
    - ``Authorization: Bearer …`` from *api_key* — the free endpoint is
      anonymous, and an empty/garbage token would make it answer 401 instead
      of serving the anonymous tier.
    """
    headers: Dict[str, str] = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
        "User-Agent": _MCP_USER_AGENT,
    }
    conditional = (
        ("Mcp-Session-Id", session_id),
        ("MCP-Protocol-Version", protocol_version),
        ("Authorization", f"Bearer {api_key}" if api_key else None),
    )
    headers.update({name: value for name, value in conditional if value})
    return headers
def _iter_mcp_messages(text: str):
"""Yield JSON-RPC message dicts from a plain-JSON or SSE response body.
Handles ``application/json`` (a single object) and ``text/event-stream``
(SSE: events separated by blank lines; an event's one-or-more ``data:``
lines concatenate into a single JSON payload). Unparseable chunks and
non-``data`` SSE fields (``event:``/``id:``/comments) are skipped.
"""
def _emit(payload):
# Streamable HTTP allows batching responses/notifications into a JSON
# array — flatten so callers always see individual message dicts.
if isinstance(payload, list):
yield from payload
elif payload is not None:
yield payload
body = (text or "").strip()
if not body:
return
if body.startswith("{") or body.startswith("["):
try:
parsed = json.loads(body)
except json.JSONDecodeError:
return
yield from _emit(parsed)
return
data_lines: List[str] = []
def _flush():
if not data_lines:
return None
try:
return json.loads("\n".join(data_lines))
except json.JSONDecodeError:
return None
for raw in body.split("\n"):
line = raw.rstrip("\r")
if line.startswith("data:"):
data_lines.append(line[len("data:"):].lstrip())
elif line.strip() == "": # event boundary
yield from _emit(_flush())
data_lines = []
yield from _emit(_flush())
def _mcp_response_envelope(text: str, request_id: str) -> Dict[str, Any]:
    """Pick the JSON-RPC response answering *request_id* out of a response body.

    Streamable-HTTP servers may emit progress/log notifications ahead of the
    final result, so the whole stream is scanned. Only dict messages carrying
    a ``result`` or ``error`` key are considered. The first one whose ``id``
    equals *request_id* is returned immediately; if none matches, the last
    result/error-bearing message seen is returned, or ``{}`` when there was
    none at all.
    """
    last_response: Dict[str, Any] = {}
    for message in _iter_mcp_messages(text):
        is_response = isinstance(message, dict) and (
            "result" in message or "error" in message
        )
        if not is_response:
            continue
        if message.get("id") == request_id:
            return message
        last_response = message
    return last_response
def _mcp_payload(envelope: Dict[str, Any]) -> Dict[str, Any]:
"""Extract the tool result payload from a ``tools/call`` envelope.
Prefers ``structuredContent`` (authoritative machine-readable form);
otherwise scans text blocks for the first JSON-parseable one. Raises on a
JSON-RPC error or a tool-level ``isError``.
"""
if "error" in envelope:
raise RuntimeError(f"Parallel MCP error: {str(envelope['error'])[:500]}")
result = envelope.get("result") or {}
if result.get("isError"):
raise RuntimeError(f"Parallel MCP tool error: {str(result)[:500]}")
structured = result.get("structuredContent")
if isinstance(structured, dict):
return structured
for block in result.get("content", []) or []:
if isinstance(block, dict) and block.get("type") == "text":
text = str(block.get("text") or "")
if not text:
continue
try:
return json.loads(text)
except json.JSONDecodeError:
continue
raise RuntimeError(
f"Parallel MCP returned no parseable content: {str(result)[:500]}"
)
def _mcp_call(
    tool_name: str, arguments: Dict[str, Any], api_key: str | None
) -> Dict[str, Any]:
    """Perform the MCP handshake, then one ``tools/call``, and return its payload.

    Sequence: ``initialize`` (capturing ``Mcp-Session-Id`` and the negotiated
    protocol version) → ``notifications/initialized`` → ``tools/call`` for
    *tool_name* with *arguments*. The return value is the parsed tool payload
    (see :func:`_mcp_payload`). A Bearer token is sent only when *api_key* is
    set. HTTP error statuses on ``initialize`` and ``tools/call`` raise via
    ``raise_for_status``; the notification's response is not inspected.
    """
    with httpx.Client(timeout=_MCP_TIMEOUT_SECONDS) as http:

        def _post(message: Dict[str, Any], session=None, version=None):
            # All three requests hit the same endpoint; only headers and the
            # JSON-RPC body differ.
            return http.post(
                _MCP_SEARCH_URL,
                headers=_mcp_headers(session, api_key, version),
                json=message,
            )

        # Step 1: initialize.
        init_id = str(uuid.uuid4())
        init_response = _post(
            {
                "jsonrpc": "2.0",
                "id": init_id,
                "method": "initialize",
                "params": {
                    "protocolVersion": _MCP_PROTOCOL_VERSION,
                    "capabilities": {},
                    "clientInfo": {
                        "name": _MCP_CLIENT_NAME,
                        "version": _MCP_CLIENT_VERSION,
                    },
                },
            }
        )
        init_response.raise_for_status()

        # Echo a session id only if the server actually issued one. Stateless
        # Streamable-HTTP servers may omit it, and sending an invented id on
        # follow-up requests can get them rejected (the server never created
        # that session). When it is absent the Mcp-Session-Id header is simply
        # left out (see _mcp_headers). This is unrelated to the ``session_id``
        # tool argument, which is a client-minted rate-limit/grouping id.
        server_session = init_response.headers.get("mcp-session-id")

        # The negotiated protocol version must accompany every post-init
        # request per the Streamable-HTTP spec (servers may enforce it); fall
        # back to the version we asked for if the reply doesn't state one.
        init_envelope = _mcp_response_envelope(init_response.text, init_id)
        init_result = init_envelope.get("result") or {}
        version = init_result.get("protocolVersion") or _MCP_PROTOCOL_VERSION

        # Step 2: notifications/initialized — the required handshake ack.
        _post(
            {"jsonrpc": "2.0", "method": "notifications/initialized"},
            server_session,
            version,
        )

        # Step 3: the actual tool invocation.
        call_id = str(uuid.uuid4())
        call_response = _post(
            {
                "jsonrpc": "2.0",
                "id": call_id,
                "method": "tools/call",
                "params": {"name": tool_name, "arguments": arguments},
            },
            server_session,
            version,
        )
        call_response.raise_for_status()
        return _mcp_payload(_mcp_response_envelope(call_response.text, call_id))
def _mcp_web_search(query: str, limit: int, api_key: str | None) -> Dict[str, Any]:
    """Run a ``web_search`` tool call against the hosted Search MCP.

    The MCP serves a fixed result count, so *limit* is applied client-side
    (at least one result is always kept). The MCP requires ``objective``
    (REST treats it as optional), so the query is mirrored into it.

    Malformed (non-dict) result entries are discarded *before* the limit is
    applied, so they neither consume one of the caller's *limit* slots nor
    leave gaps in the 1-based ``position`` numbering.

    Args:
        query: The search query.
        limit: Maximum number of results to return.
        api_key: Optional key forwarded to the MCP call; ``None`` for the
            anonymous tier.

    Returns:
        The standard provider search shape
        (``{"success": True, "data": {"web": [...]}}``) plus ``provider`` and
        the free-tier ``attribution`` line.

    Raises:
        Whatever :func:`_mcp_call` raises (HTTP errors, ``RuntimeError`` for
        MCP-level failures).
    """
    payload = _mcp_call(
        "web_search",
        {
            "objective": query,
            "search_queries": [query],
            "session_id": _new_session_id(),
        },
        api_key,
    )

    # Filter first, then trim and number, so positions are contiguous.
    usable = [hit for hit in (payload.get("results") or []) if isinstance(hit, dict)]
    web_results: List[Dict[str, Any]] = [
        {
            "url": hit.get("url") or "",
            "title": hit.get("title") or "",
            "description": " ".join(hit.get("excerpts") or []),
            "position": rank,
        }
        for rank, hit in enumerate(usable[: max(limit, 1)], start=1)
    ]

    # Credit the free tier (anonymous path only — keyed search uses REST and
    # carries no attribution).
    return {
        "success": True,
        "data": {"web": web_results},
        "provider": "parallel",
        "attribution": _FREE_MCP_ATTRIBUTION,
    }
def _mcp_web_fetch(urls: List[str], api_key: str | None) -> List[Dict[str, Any]]:
"""Run a ``web_fetch`` tool call against the hosted Search MCP.
Returns the per-URL extract shape that
:func:`tools.web_tools.web_extract_tool` expects — exactly one row per input
URL, in request order (including duplicates). We pass ``full_content=True``
so the page body comes back as markdown (matching the keyed SDK path and
what extract callers/summarizers expect), falling back to excerpts only when
full content is absent. Any input the MCP didn't return is emitted as a
per-URL error row.
"""
payload = _mcp_call(
"web_fetch",
{"urls": list(urls), "full_content": True, "session_id": _new_session_id()},
api_key,
)
# Index the response by URL, then emit one row per *input* URL in order so
# duplicates and positional alignment with the request list are preserved.
by_url: Dict[str, Dict[str, Any]] = {}
for item in payload.get("results") or []:
if isinstance(item, dict) and item.get("url"):
by_url.setdefault(item["url"], item)
results: List[Dict[str, Any]] = []
for url in urls:
item = by_url.get(url)
if item is None:
results.append(
{
"url": url,
"title": "",
"content": "",
"error": "extraction failed (no content returned)",
"metadata": {"sourceURL": url},
}
)
continue
title = item.get("title") or ""
# Prefer the full page body; fall back to joined excerpts (mirrors the
# keyed SDK extract path).
content = item.get("full_content") or "\n\n".join(item.get("excerpts") or [])
results.append(
{
"url": url,
"title": title,
"content": content,
"raw_content": content,
"metadata": {"sourceURL": url, "title": title},
}
)
return results
class ParallelWebSearchProvider(WebSearchProvider):
    """Search + async extract provider backed by Parallel.ai.

    Both capabilities choose their transport per call from the
    ``PARALLEL_API_KEY`` env var: the ``parallel`` SDK when a key is set, the
    free hosted Search MCP otherwise.
    """

    @property
    def name(self) -> str:
        """Backend identifier."""
        return "parallel"

    @property
    def display_name(self) -> str:
        """Human-readable provider name."""
        return "Parallel"

    def is_available(self) -> bool:
        """Report whether a non-blank ``PARALLEL_API_KEY`` is configured.

        This is key-based on purpose. It gates the registry's active-provider
        walk and the ``hermes tools`` picker (which would auto-select Parallel
        for a user who never named it), so it must not claim availability on
        the keyless path. The keyless free-MCP path is reached independently
        through the ``parallel`` terminal default in
        :func:`tools.web_tools._get_backend`.
        """
        configured_key = os.getenv("PARALLEL_API_KEY", "")
        return bool(configured_key.strip())

    def supports_search(self) -> bool:
        """This provider implements :meth:`search`."""
        return True

    def supports_extract(self) -> bool:
        """This provider implements :meth:`extract`."""
        return True

    def search(self, query: str, limit: int = 5) -> Dict[str, Any]:
        """Run a Parallel search synchronously.

        When ``PARALLEL_API_KEY`` is set, the v1 ``search`` REST endpoint is
        used with the configured mode (``PARALLEL_SEARCH_MODE`` env var,
        default "advanced"); the limit is requested through
        ``advanced_settings.max_results``, capped at 20. Without a key the
        free hosted Search MCP is used so search works with zero setup.

        Never raises: every failure is reported as
        ``{"success": False, "error": ...}``.
        """
        try:
            from tools.interrupt import is_interrupted

            if is_interrupted():
                return {"success": False, "error": "Interrupted"}

            has_key = bool(os.getenv("PARALLEL_API_KEY", "").strip())
            if not has_key:
                logger.info(
                    "Parallel search (free MCP): '%s' (limit=%d)", query, limit
                )
                return _mcp_web_search(query, limit, api_key=None)

            mode = _resolve_search_mode()
            logger.info(
                "Parallel search (v1 REST): '%s' (mode=%s, limit=%d)",
                query, mode, limit,
            )

            # v1 Search API. The caller's limit is requested explicitly via
            # max_results (capped at 20) instead of relying on the API
            # default — the client-side slice below can only trim, never ask
            # for more.
            response = _get_sync_client().search(
                search_queries=[query],
                objective=query,
                mode=mode,
                session_id=_new_session_id(),
                advanced_settings={"max_results": min(max(limit, 1), 20)},
            )

            hits = (response.results or [])[: max(limit, 1)]
            web_results = [
                {
                    "url": hit.url or "",
                    "title": hit.title or "",
                    "description": " ".join(hit.excerpts or []),
                    "position": rank,
                }
                for rank, hit in enumerate(hits, start=1)
            ]

            # Paid/REST path: no attribution and no "[Parallel]" label — the
            # branding is specifically for the free Search MCP tier.
            return {"success": True, "data": {"web": web_results}}
        except ValueError as exc:
            return {"success": False, "error": str(exc)}
        except ImportError as exc:
            return {
                "success": False,
                "error": f"Parallel SDK not installed: {exc}",
            }
        except Exception as exc:  # noqa: BLE001
            logger.warning("Parallel search error: %s", exc)
            return {"success": False, "error": f"Parallel search failed: {exc}"}

    async def extract(
        self, urls: List[str], **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """Extract content from one or more URLs.

        When ``PARALLEL_API_KEY`` is set, the async SDK's v1 ``extract`` is
        used for full page content. Without a key the free hosted Search MCP's
        ``web_fetch`` tool is used so extraction works with zero setup,
        mirroring the keyless search path.

        The return value is the legacy list-of-results shape that
        :func:`tools.web_tools.web_extract_tool` expects: one entry per
        successful URL plus one entry per failed URL carrying an ``error``
        field. Errors are never raised — they come back as per-URL items.
        """

        def _fail_every_url(message: str) -> List[Dict[str, Any]]:
            # Whole-call failure: one error row for each requested URL.
            return [
                {"url": u, "title": "", "content": "", "error": message}
                for u in urls
            ]

        try:
            from tools.interrupt import is_interrupted

            if is_interrupted():
                return [
                    {"url": u, "error": "Interrupted", "title": ""} for u in urls
                ]

            has_key = bool(os.getenv("PARALLEL_API_KEY", "").strip())
            if not has_key:
                logger.info(
                    "Parallel extract (free MCP web_fetch): %d URL(s)", len(urls)
                )
                # _mcp_web_fetch is sync httpx; run it off the event loop.
                return await asyncio.to_thread(_mcp_web_fetch, list(urls), None)

            logger.info("Parallel extract (v1 REST): %d URL(s)", len(urls))

            # v1 Extract API (client.extract, /v1/extract); full_content is
            # switched on through advanced_settings.
            response = await _get_async_client().extract(
                urls=urls,
                advanced_settings={"full_content": True},
                session_id=_new_session_id(),
            )

            rows: List[Dict[str, Any]] = []

            # Successful pages: full body if present, else joined excerpts.
            for page in response.results or []:
                body = page.full_content or "\n\n".join(page.excerpts or [])
                page_url = page.url or ""
                page_title = page.title or ""
                rows.append(
                    {
                        "url": page_url,
                        "title": page_title,
                        "content": body,
                        "raw_content": body,
                        "metadata": {"sourceURL": page_url, "title": page_title},
                    }
                )

            # Per-URL failures reported by the API, appended after successes.
            for failure in response.errors or []:
                failed_url = getattr(failure, "url", "") or ""
                reason = (
                    getattr(failure, "message", None)
                    or getattr(failure, "content", None)
                    or getattr(failure, "error_type", None)
                    or "extraction failed"
                )
                rows.append(
                    {
                        "url": failed_url,
                        "title": "",
                        "content": "",
                        "error": reason,
                        "metadata": {"sourceURL": failed_url},
                    }
                )

            return rows
        except ValueError as exc:
            return _fail_every_url(str(exc))
        except ImportError as exc:
            return _fail_every_url(f"Parallel SDK not installed: {exc}")
        except Exception as exc:  # noqa: BLE001
            logger.warning("Parallel extract error: %s", exc)
            return _fail_every_url(f"Parallel extract failed: {exc}")

    def get_setup_schema(self) -> Dict[str, Any]:
        """Describe this provider for the setup UI: name, badge, tagline, env vars."""
        api_key_var = {
            "key": "PARALLEL_API_KEY",
            "prompt": "Parallel API key (optional — unlocks the v1 REST Search API)",
            "url": "https://parallel.ai",
        }
        tagline = (
            "Free web search + extraction via Parallel's hosted Search MCP "
            "— no key needed. Add PARALLEL_API_KEY for the v1 REST Search "
            "API (richer modes, higher limits)."
        )
        return {
            "name": "Parallel",
            "badge": "free",
            "tag": tagline,
            "env_vars": [api_key_var],
        }