mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-15 09:21:36 +00:00
Remove the free Parallel Search MCP path and restore the keyed Parallel backend behavior from before it was introduced. Also drops the keyless fallback registration/display labeling tests and returns the Parallel SDK pin to the prior version.
291 lines
9.9 KiB
Python
291 lines
9.9 KiB
Python
"""Parallel.ai web search + content extraction — plugin form.

Subclasses :class:`agent.web_search_provider.WebSearchProvider`. Uses two
distinct Parallel SDK clients:

- ``Parallel`` (sync) — for :meth:`search`
- ``AsyncParallel`` (async) — for :meth:`extract`

This is the first plugin to exercise the **async-extract** code path in
the ABC: :meth:`extract` is declared ``async def``, and the dispatcher
in :func:`tools.web_tools.web_extract_tool` detects coroutine functions via
:func:`inspect.iscoroutinefunction` and awaits them.

Config keys this provider responds to::

    web:
      search_backend: "parallel"    # explicit per-capability
      extract_backend: "parallel"   # explicit per-capability
      backend: "parallel"           # shared fallback
      # Optional: search mode (default "agentic"; also "fast" or "one-shot")
      # via the PARALLEL_SEARCH_MODE env var.

Env vars::

    PARALLEL_API_KEY=...          # https://parallel.ai (required)
    PARALLEL_SEARCH_MODE=agentic  # optional: agentic|fast|one-shot
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from typing import Any, Dict, List
|
|
|
|
from agent.web_search_provider import WebSearchProvider
|
|
|
|
# Module logger: search()/extract() log each request at INFO and failures at WARNING.
logger = logging.getLogger(__name__)
|
|
|
|
# Module-level note: the canonical cache slots ``_parallel_client`` and
# ``_async_parallel_client`` live on :mod:`tools.web_tools` so tests that do
# ``tools.web_tools._parallel_client = None`` between cases see fresh state.
# The plugin reads/writes through that public module (see
# :func:`_get_sync_client` / :func:`_get_async_client`).
|
|
|
|
|
|
def _ensure_parallel_sdk_installed() -> None:
|
|
"""Trigger lazy install of the parallel SDK if it isn't present.
|
|
|
|
Mirrors the lazy-deps pattern used by the legacy implementation.
|
|
Swallows benign ImportError from the lazy_deps helper itself; if the
|
|
SDK is genuinely missing the subsequent ``from parallel import ...``
|
|
raises ImportError that the caller can handle.
|
|
"""
|
|
try:
|
|
from tools.lazy_deps import ensure as _lazy_ensure
|
|
|
|
_lazy_ensure("search.parallel", prompt=False)
|
|
except ImportError:
|
|
pass
|
|
except Exception as exc: # noqa: BLE001 — surface install hint as ImportError
|
|
raise ImportError(str(exc))
|
|
|
|
|
|
def _get_sync_client() -> Any:
|
|
"""Lazy-load + cache the sync Parallel client.
|
|
|
|
Cache lives on :mod:`tools.web_tools` (as ``_parallel_client``) so unit
|
|
tests that reset that name between cases keep working.
|
|
"""
|
|
import tools.web_tools as _wt
|
|
|
|
cached = getattr(_wt, "_parallel_client", None)
|
|
if cached is not None:
|
|
return cached
|
|
|
|
api_key = os.getenv("PARALLEL_API_KEY")
|
|
if not api_key:
|
|
raise ValueError(
|
|
"PARALLEL_API_KEY environment variable not set. "
|
|
"Get your API key at https://parallel.ai"
|
|
)
|
|
|
|
_ensure_parallel_sdk_installed()
|
|
from parallel import Parallel # noqa: WPS433 — deliberately lazy
|
|
|
|
client = Parallel(api_key=api_key)
|
|
_wt._parallel_client = client
|
|
return client
|
|
|
|
|
|
def _get_async_client() -> Any:
|
|
"""Lazy-load + cache the async Parallel client.
|
|
|
|
Cache lives on :mod:`tools.web_tools` (as ``_async_parallel_client``).
|
|
"""
|
|
import tools.web_tools as _wt
|
|
|
|
cached = getattr(_wt, "_async_parallel_client", None)
|
|
if cached is not None:
|
|
return cached
|
|
|
|
api_key = os.getenv("PARALLEL_API_KEY")
|
|
if not api_key:
|
|
raise ValueError(
|
|
"PARALLEL_API_KEY environment variable not set. "
|
|
"Get your API key at https://parallel.ai"
|
|
)
|
|
|
|
_ensure_parallel_sdk_installed()
|
|
from parallel import AsyncParallel # noqa: WPS433 — deliberately lazy
|
|
|
|
client = AsyncParallel(api_key=api_key)
|
|
_wt._async_parallel_client = client
|
|
return client
|
|
|
|
|
|
def _reset_clients_for_tests() -> None:
|
|
"""Drop both cached clients so tests can re-instantiate cleanly.
|
|
|
|
Clears the canonical slots on :mod:`tools.web_tools` (where
|
|
:func:`_get_sync_client` / :func:`_get_async_client` read/write them).
|
|
"""
|
|
import tools.web_tools as _wt
|
|
|
|
_wt._parallel_client = None
|
|
_wt._async_parallel_client = None
|
|
|
|
|
|
# Legacy names: before the migration these helpers lived in tools.web_tools
# under the names below. Existing tests and external callers still use them,
# so they are kept as plain aliases of the new functions.
_get_async_parallel_client = _get_async_client
_get_parallel_client = _get_sync_client
|
|
|
|
|
|
def _resolve_search_mode() -> str:
|
|
"""Return the validated PARALLEL_SEARCH_MODE value (default "agentic")."""
|
|
mode = os.getenv("PARALLEL_SEARCH_MODE", "agentic").lower().strip()
|
|
if mode not in {"fast", "one-shot", "agentic"}:
|
|
mode = "agentic"
|
|
return mode
|
|
|
|
|
|
class ParallelWebSearchProvider(WebSearchProvider):
    """Parallel.ai search + async extract provider.

    :meth:`search` is synchronous and uses the sync SDK client;
    :meth:`extract` is a coroutine and uses the async SDK client. Neither
    method raises on failure: errors are reported inside the returned
    payload.
    """

    @property
    def name(self) -> str:
        """Backend identifier: ``"parallel"``."""
        return "parallel"

    @property
    def display_name(self) -> str:
        """Human-readable provider name: ``"Parallel"``."""
        return "Parallel"

    def is_available(self) -> bool:
        """Return True when ``PARALLEL_API_KEY`` is set to a non-empty value."""
        return bool(os.getenv("PARALLEL_API_KEY", "").strip())

    def supports_search(self) -> bool:
        """This provider implements :meth:`search`."""
        return True

    def supports_extract(self) -> bool:
        """This provider implements :meth:`extract`."""
        return True

    @staticmethod
    def _failure_entries(urls: List[str], message: str) -> List[Dict[str, Any]]:
        """Build one error entry per URL, all carrying the same *message*."""
        return [
            {"url": u, "title": "", "content": "", "error": message} for u in urls
        ]

    def search(self, query: str, limit: int = 5) -> Dict[str, Any]:
        """Execute a Parallel search (sync).

        Uses the ``beta.search`` endpoint with the configured mode
        (``PARALLEL_SEARCH_MODE`` env var, default "agentic"). ``limit`` is
        capped at 20 before the request is sent.

        Args:
            query: Search text; sent both as the single search query and as
                the objective.
            limit: Maximum number of results to request.

        Returns:
            ``{"success": True, "data": {"web": [...]}}`` on success, where
            each item has ``url``, ``title``, ``description`` (the result's
            excerpts joined with spaces) and a 1-based ``position``. On
            interruption or any failure, ``{"success": False, "error": ...}``.
        """
        try:
            from tools.interrupt import is_interrupted

            if is_interrupted():
                return {"success": False, "error": "Interrupted"}

            mode = _resolve_search_mode()
            logger.info(
                "Parallel search: '%s' (mode=%s, limit=%d)", query, mode, limit
            )
            response = _get_sync_client().beta.search(
                search_queries=[query],
                objective=query,
                mode=mode,
                max_results=min(limit, 20),
            )

            web_results = [
                {
                    "url": result.url or "",
                    "title": result.title or "",
                    # Joining an empty list yields "", so no special case.
                    "description": " ".join(result.excerpts or []),
                    "position": position,
                }
                for position, result in enumerate(response.results or [], start=1)
            ]
            return {"success": True, "data": {"web": web_results}}
        except ValueError as exc:
            # Raised by the client factory when the API key is missing.
            return {"success": False, "error": str(exc)}
        except ImportError as exc:
            return {
                "success": False,
                "error": f"Parallel SDK not installed: {exc}",
            }
        except Exception as exc:  # noqa: BLE001
            logger.warning("Parallel search error: %s", exc)
            return {"success": False, "error": f"Parallel search failed: {exc}"}

    async def extract(
        self, urls: List[str], **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """Extract content from one or more URLs via the async SDK.

        Returns the legacy list-of-results shape that
        :func:`tools.web_tools.web_extract_tool` expects: one entry per
        successful URL plus one entry per failed URL with an ``error``
        field. Errors are not raised — they're returned as per-URL items.

        Args:
            urls: URLs to extract. An empty list returns ``[]`` immediately
                without building a client or contacting the API.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            A list of dicts. Successful items carry ``url``, ``title``,
            ``content``, ``raw_content`` and ``metadata``; ``content`` is the
            page's full content, falling back to its excerpts joined by blank
            lines. Failed items carry ``url``, ``title``, ``content`` (empty)
            and ``error``.
        """
        if not urls:
            # Nothing to fetch: every other path also yields an empty list
            # for empty input, so skip the client setup and network call.
            return []

        try:
            from tools.interrupt import is_interrupted

            if is_interrupted():
                return [
                    {"url": u, "error": "Interrupted", "title": ""} for u in urls
                ]

            logger.info("Parallel extract: %d URL(s)", len(urls))
            response = await _get_async_client().beta.extract(
                urls=urls,
                full_content=True,
            )

            results: List[Dict[str, Any]] = []
            for result in response.results or []:
                content = result.full_content or ""
                if not content:
                    content = "\n\n".join(result.excerpts or [])
                url = result.url or ""
                title = result.title or ""
                results.append(
                    {
                        "url": url,
                        "title": title,
                        "content": content,
                        "raw_content": content,
                        "metadata": {"sourceURL": url, "title": title},
                    }
                )

            # Per-URL failures reported by the API are appended after the
            # successful results.
            for error in response.errors or []:
                results.append(
                    {
                        "url": error.url or "",
                        "title": "",
                        "content": "",
                        "error": error.content or error.error_type or "extraction failed",
                        "metadata": {"sourceURL": error.url or ""},
                    }
                )

            return results
        except ValueError as exc:
            # Raised by the client factory when the API key is missing.
            return self._failure_entries(urls, str(exc))
        except ImportError as exc:
            return self._failure_entries(urls, f"Parallel SDK not installed: {exc}")
        except Exception as exc:  # noqa: BLE001
            logger.warning("Parallel extract error: %s", exc)
            return self._failure_entries(urls, f"Parallel extract failed: {exc}")

    def get_setup_schema(self) -> Dict[str, Any]:
        """Describe this provider for the setup flow.

        Returns:
            A dict with the display ``name``, a ``badge``, a one-line ``tag``
            and the list of environment variables to collect.
        """
        return {
            "name": "Parallel",
            "badge": "paid",
            "tag": "Objective-tuned search + parallel page extraction.",
            "env_vars": [
                {
                    "key": "PARALLEL_API_KEY",
                    "prompt": "Parallel API key",
                    "url": "https://parallel.ai",
                },
            ],
        }
|