hermes-agent/plugins/web/parallel/provider.py
kshitijk4poor 657e6d87cc fix(web): align _LEGACY_PREFERENCE with legacy 7-provider order + doc cleanup
Self-review of the plugin migration surfaced one warning and a handful of
doc/dead-code cleanups. None affect production behaviour through the main
dispatcher (which always calls `tools.web_tools._get_backend()` first and
preserves the full 7-provider walk), but direct callers of
`agent.web_search_registry.get_active_*_provider()` previously diverged
from the legacy order and could return `None` for users with credentials
but no explicit `web.backend` config key.

Changes
-------
1. `_LEGACY_PREFERENCE` was shipped as a 4-tuple
   `("brave-free", "firecrawl", "searxng", "ddgs")` while the PR
   description and the legacy `_get_backend()` candidate order both
   call for the 7-tuple
   `(firecrawl, parallel, tavily, exa, searxng, brave-free, ddgs)`.
   Replaced with the 7-tuple. Verified empirically: with TAVILY+EXA keys
   and no config, `get_active_search_provider()` now returns tavily
   (was None); with EXA+PARALLEL it returns parallel (was None); with
   BRAVE+FIRECRAWL it returns firecrawl (was brave-free).

2. `agent/web_search_registry.py` — module docstring, `_resolve` step-3
   docstring, and inline comment all listed the old 4-tuple and claimed
   "brave-free first because it was the shipped default". The legacy
   default is `"firecrawl"`. Rewritten to match the new ordering and
   reference `tools.web_tools._get_backend()` as the source of truth.

3. `agent/web_search_registry.py` — `get_active_crawl_provider`
   docstring said "only Tavily implements it among built-in providers".
   Firecrawl also advertises `supports_crawl=True` after the previous
   commit. Updated to "Tavily and Firecrawl".

4. `plugins/web/tavily/provider.py` — module docstring said "Tavily is
   the only built-in backend that natively crawls". Updated.

5. `agent/web_search_provider.py` — ABC docstring mentioned only
   `search` / `extract` capabilities. Added `crawl` for accuracy.

6. `plugins/web/{firecrawl,parallel,exa}/provider.py` — dead plugin-level
   cache globals (`_firecrawl_client`, `_parallel_client`,
   `_async_parallel_client`, `_exa_client`) were declared but never read
   (all reads/writes go through `_wt.*` per the `extracting-inline-
   helpers-to-plugins` recipe). Removed the dead declarations; the
   reset-for-tests helpers in firecrawl + parallel now clear the
   canonical `_wt._<name>` slots, matching the pattern exa already used.

Tests
-----
218/218 web-targeted tests still pass (no test changes needed). 4910/4910
in `tests/tools/` still green.
2026-05-13 22:31:28 -07:00

291 lines
9.9 KiB
Python

"""Parallel.ai web search + content extraction — plugin form.
Subclasses :class:`agent.web_search_provider.WebSearchProvider`. Uses two
distinct Parallel SDK clients:
- ``Parallel`` (sync) — for :meth:`search`
- ``AsyncParallel`` (async) — for :meth:`extract`
This is the first plugin to exercise the **async-extract** code path in
the ABC: :meth:`extract` is declared ``async def``, and the dispatcher
in :func:`tools.web_tools.web_extract_tool` detects coroutines via
:func:`inspect.iscoroutinefunction` and awaits.
Config keys this provider responds to::
web:
search_backend: "parallel" # explicit per-capability
extract_backend: "parallel" # explicit per-capability
backend: "parallel" # shared fallback
# Optional: search mode (default "agentic"; also "fast" or "one-shot")
# via the PARALLEL_SEARCH_MODE env var.
Env vars::
PARALLEL_API_KEY=... # https://parallel.ai (required)
PARALLEL_SEARCH_MODE=agentic # optional: agentic|fast|one-shot
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict, List
from agent.web_search_provider import WebSearchProvider
logger = logging.getLogger(__name__)
# Module-level note: the canonical cache slots ``_parallel_client`` and
# ``_async_parallel_client`` live on :mod:`tools.web_tools` so tests that do
# ``tools.web_tools._parallel_client = None`` between cases see fresh state.
# The plugin reads/writes through that public module (see
# :func:`_get_sync_client` / :func:`_get_async_client`).
def _ensure_parallel_sdk_installed() -> None:
"""Trigger lazy install of the parallel SDK if it isn't present.
Mirrors the lazy-deps pattern used by the legacy implementation.
Swallows benign ImportError from the lazy_deps helper itself; if the
SDK is genuinely missing the subsequent ``from parallel import ...``
raises ImportError that the caller can handle.
"""
try:
from tools.lazy_deps import ensure as _lazy_ensure
_lazy_ensure("search.parallel", prompt=False)
except ImportError:
pass
except Exception as exc: # noqa: BLE001 — surface install hint as ImportError
raise ImportError(str(exc))
def _get_sync_client() -> Any:
"""Lazy-load + cache the sync Parallel client.
Cache lives on :mod:`tools.web_tools` (as ``_parallel_client``) so unit
tests that reset that name between cases keep working.
"""
import tools.web_tools as _wt
cached = getattr(_wt, "_parallel_client", None)
if cached is not None:
return cached
api_key = os.getenv("PARALLEL_API_KEY")
if not api_key:
raise ValueError(
"PARALLEL_API_KEY environment variable not set. "
"Get your API key at https://parallel.ai"
)
_ensure_parallel_sdk_installed()
from parallel import Parallel # noqa: WPS433 — deliberately lazy
client = Parallel(api_key=api_key)
_wt._parallel_client = client
return client
def _get_async_client() -> Any:
"""Lazy-load + cache the async Parallel client.
Cache lives on :mod:`tools.web_tools` (as ``_async_parallel_client``).
"""
import tools.web_tools as _wt
cached = getattr(_wt, "_async_parallel_client", None)
if cached is not None:
return cached
api_key = os.getenv("PARALLEL_API_KEY")
if not api_key:
raise ValueError(
"PARALLEL_API_KEY environment variable not set. "
"Get your API key at https://parallel.ai"
)
_ensure_parallel_sdk_installed()
from parallel import AsyncParallel # noqa: WPS433 — deliberately lazy
client = AsyncParallel(api_key=api_key)
_wt._async_parallel_client = client
return client
def _reset_clients_for_tests() -> None:
"""Drop both cached clients so tests can re-instantiate cleanly.
Clears the canonical slots on :mod:`tools.web_tools` (where
:func:`_get_sync_client` / :func:`_get_async_client` read/write them).
"""
import tools.web_tools as _wt
_wt._parallel_client = None
_wt._async_parallel_client = None
# Backward-compatible aliases for the names that lived in tools.web_tools
# before the migration (matches existing tests + external callers).
_get_parallel_client = _get_sync_client
_get_async_parallel_client = _get_async_client
def _resolve_search_mode() -> str:
"""Return the validated PARALLEL_SEARCH_MODE value (default "agentic")."""
mode = os.getenv("PARALLEL_SEARCH_MODE", "agentic").lower().strip()
if mode not in {"fast", "one-shot", "agentic"}:
mode = "agentic"
return mode
class ParallelWebSearchProvider(WebSearchProvider):
"""Parallel.ai search + async extract provider."""
@property
def name(self) -> str:
return "parallel"
@property
def display_name(self) -> str:
return "Parallel"
def is_available(self) -> bool:
"""Return True when ``PARALLEL_API_KEY`` is set to a non-empty value."""
return bool(os.getenv("PARALLEL_API_KEY", "").strip())
def supports_search(self) -> bool:
return True
def supports_extract(self) -> bool:
return True
def search(self, query: str, limit: int = 5) -> Dict[str, Any]:
"""Execute a Parallel search (sync).
Uses the ``beta.search`` endpoint with the configured mode
(``PARALLEL_SEARCH_MODE`` env var, default "agentic"). Limit is
capped at 20 server-side.
"""
try:
from tools.interrupt import is_interrupted
if is_interrupted():
return {"success": False, "error": "Interrupted"}
mode = _resolve_search_mode()
logger.info(
"Parallel search: '%s' (mode=%s, limit=%d)", query, mode, limit
)
response = _get_sync_client().beta.search(
search_queries=[query],
objective=query,
mode=mode,
max_results=min(limit, 20),
)
web_results = []
for i, result in enumerate(response.results or []):
excerpts = result.excerpts or []
web_results.append(
{
"url": result.url or "",
"title": result.title or "",
"description": " ".join(excerpts) if excerpts else "",
"position": i + 1,
}
)
return {"success": True, "data": {"web": web_results}}
except ValueError as exc:
return {"success": False, "error": str(exc)}
except ImportError as exc:
return {
"success": False,
"error": f"Parallel SDK not installed: {exc}",
}
except Exception as exc: # noqa: BLE001
logger.warning("Parallel search error: %s", exc)
return {"success": False, "error": f"Parallel search failed: {exc}"}
async def extract(
self, urls: List[str], **kwargs: Any
) -> List[Dict[str, Any]]:
"""Extract content from one or more URLs via the async SDK.
Returns the legacy list-of-results shape that
:func:`tools.web_tools.web_extract_tool` expects: one entry per
successful URL plus one entry per failed URL with an ``error``
field. Errors are not raised — they're returned as per-URL items.
"""
try:
from tools.interrupt import is_interrupted
if is_interrupted():
return [
{"url": u, "error": "Interrupted", "title": ""} for u in urls
]
logger.info("Parallel extract: %d URL(s)", len(urls))
response = await _get_async_client().beta.extract(
urls=urls,
full_content=True,
)
results: List[Dict[str, Any]] = []
for result in response.results or []:
content = result.full_content or ""
if not content:
content = "\n\n".join(result.excerpts or [])
url = result.url or ""
title = result.title or ""
results.append(
{
"url": url,
"title": title,
"content": content,
"raw_content": content,
"metadata": {"sourceURL": url, "title": title},
}
)
for error in response.errors or []:
results.append(
{
"url": error.url or "",
"title": "",
"content": "",
"error": error.content or error.error_type or "extraction failed",
"metadata": {"sourceURL": error.url or ""},
}
)
return results
except ValueError as exc:
return [{"url": u, "title": "", "content": "", "error": str(exc)} for u in urls]
except ImportError as exc:
return [
{"url": u, "title": "", "content": "", "error": f"Parallel SDK not installed: {exc}"}
for u in urls
]
except Exception as exc: # noqa: BLE001
logger.warning("Parallel extract error: %s", exc)
return [
{"url": u, "title": "", "content": "", "error": f"Parallel extract failed: {exc}"}
for u in urls
]
def get_setup_schema(self) -> Dict[str, Any]:
return {
"name": "Parallel",
"badge": "paid",
"tag": "Objective-tuned search + parallel page extraction.",
"env_vars": [
{
"key": "PARALLEL_API_KEY",
"prompt": "Parallel API key",
"url": "https://parallel.ai",
},
],
}