hermes-agent/plugins/web/parallel/provider.py
kshitijk4poor 748f3e016b refactor(web): delete inline vendor helpers, re-export from plugins
Removes ~580 lines of dead code from tools/web_tools.py that were
superseded by the plugin migration but kept around in the cutover commit
to keep the diff focused. Replaces them with thin re-export shims so
existing tests and external callers that reach for the legacy
``tools.web_tools.<name>`` paths continue to work transparently.

Deleted from tools/web_tools.py
--------------------------------
- Lazy Firecrawl SDK proxy (_load_firecrawl_cls, _FirecrawlProxy,
  _FIRECRAWL_CLS_CACHE, the Firecrawl singleton)
- Firecrawl client section (_get_direct_firecrawl_config,
  _get_firecrawl_gateway_url, _is_tool_gateway_ready,
  _has_direct_firecrawl_config, _raise_web_backend_configuration_error,
  _firecrawl_backend_help_suffix, _get_firecrawl_client)
- Parallel client section (_get_parallel_client,
  _get_async_parallel_client, _parallel_client, _async_parallel_client)
- Tavily client section (_TAVILY_BASE_URL, _tavily_request,
  _normalize_tavily_search_results, _normalize_tavily_documents)
- Generic SDK normalizers (_to_plain_object, _normalize_result_list,
  _extract_web_search_results, _extract_scrape_payload)
- Exa client section (_get_exa_client, _exa_client, _exa_search,
  _exa_extract)
- Parallel helpers (_parallel_search, _parallel_extract)
- Duplicate inline check_firecrawl_api_key

Net: tools/web_tools.py drops from 2227 → 1613 lines (-614 lines).

Re-exports added at top of tools/web_tools.py
---------------------------------------------
- From plugins.web.firecrawl.provider:
  Firecrawl, _FirecrawlProxy, _FIRECRAWL_CLS_CACHE, _load_firecrawl_cls,
  _get_direct_firecrawl_config, _get_firecrawl_gateway_url,
  _is_tool_gateway_ready, _has_direct_firecrawl_config,
  _firecrawl_backend_help_suffix, _raise_web_backend_configuration_error,
  _get_firecrawl_client, _to_plain_object, _normalize_result_list,
  _extract_web_search_results, _extract_scrape_payload,
  check_firecrawl_api_key
- From plugins.web.tavily.provider:
  _tavily_request, _normalize_tavily_search_results,
  _normalize_tavily_documents
- From plugins.web.parallel.provider:
  _get_parallel_client, _get_async_parallel_client
- From plugins.web.exa.provider:
  _get_exa_client

Plus retained module-level imports for backward-compat with tests:
- httpx (tests patch tools.web_tools.httpx for tavily request mocking)
- build_vendor_gateway_url, _read_nous_access_token,
  resolve_managed_tool_gateway, managed_nous_tools_enabled,
  prefers_gateway (tests patch tools.web_tools.<name>)
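
A minimal sketch of the re-export shim idea, using throwaway in-memory modules. The names `demo_provider`, `demo_legacy`, and `_get_demo_client` are hypothetical stand-ins for `plugins.web.<vendor>.provider`, `tools.web_tools`, and a migrated helper; this is not the actual import block from the commit.

```python
import sys
import types

# Hypothetical new home of the implementation.
provider = types.ModuleType("demo_provider")


def _get_demo_client() -> str:
    return "client"


provider._get_demo_client = _get_demo_client
sys.modules["demo_provider"] = provider

# Hypothetical legacy module: its whole "implementation" is one import line.
legacy = types.ModuleType("demo_legacy")
exec("from demo_provider import _get_demo_client  # noqa: F401", legacy.__dict__)
sys.modules["demo_legacy"] = legacy

# Callers that still import from the legacy path get the very same object.
from demo_legacy import _get_demo_client as via_legacy

same_object = via_legacy is provider._get_demo_client
```

Because the legacy name is bound to the same function object, callers using the old import path see no behavioural difference.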

Plugin indirection pattern (key technique)
------------------------------------------
For functions inside the firecrawl/parallel/exa plugins to honor
unit-test patches that target ``tools.web_tools.<name>``, the plugin
implementations now do ``import tools.web_tools as _wt`` at call time
and read helper names through that module (``_wt._read_nous_access_token``,
``_wt.Firecrawl``, ``_wt.prefers_gateway``, etc.). This makes the
existing test patches transparently reach the plugin code without any
test changes.
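
The call-time indirection described above can be sketched roughly as follows. `demo_facade`, `read_token`, and `plugin_fetch` are hypothetical names chosen for illustration (the facade stands in for `tools.web_tools`); the real plugins may structure this differently.

```python
import sys
import types
from unittest import mock

# Hypothetical stand-in for the module that tests patch.
facade = types.ModuleType("demo_facade")
facade.read_token = lambda: "real-token"
sys.modules["demo_facade"] = facade


def plugin_fetch() -> str:
    """Plugin-side code: look the helper up on the facade at call time."""
    import demo_facade as _wt  # resolved on every call, not at import time

    return f"fetched with {_wt.read_token()}"


# Patching the facade attribute is enough to change what the plugin sees.
with mock.patch("demo_facade.read_token", return_value="fake-token"):
    patched = plugin_fetch()

unpatched = plugin_fetch()
```

Had the plugin instead done `from demo_facade import read_token` at module import time, it would hold its own reference and the patch on the facade would not reach it.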

The cached client globals (_firecrawl_client, _firecrawl_client_config,
_parallel_client, _async_parallel_client, _exa_client) also now live on
tools.web_tools so existing test setup_method handlers that reset
``tools.web_tools._<vendor>_client = None`` between cases keep working.
The plugins read/write the cache via getattr/setattr on the web_tools
module.
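
As an illustration of the cache-on-another-module idea, here is a small sketch under the assumption of a single cache slot. `demo_facade`, `_demo_client`, `DemoClient`, and `get_client` are hypothetical; only the getattr/setattr technique comes from the description above.

```python
import types

# Hypothetical stand-in for tools.web_tools, which owns the cache slot.
facade = types.ModuleType("demo_facade")
facade._demo_client = None


class DemoClient:
    """Placeholder for a vendor SDK client."""


def get_client(owner: types.ModuleType = facade) -> DemoClient:
    """Return the cached client stored on `owner`, creating it on first use."""
    cached = getattr(owner, "_demo_client", None)
    if cached is not None:
        return cached
    client = DemoClient()
    setattr(owner, "_demo_client", client)
    return client


first = get_client()
second = get_client()          # served from the cache on the facade
facade._demo_client = None     # what a test's setup_method would do
third = get_client()           # rebuilt after the reset
```

Keeping the slot on the facade means a test only needs to know the legacy attribute name to force a fresh client.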

Verified
--------
- 173/173 targeted web tests pass:
  test_web_providers.py, test_web_providers_brave_free.py,
  test_web_providers_ddgs.py, test_web_providers_searxng.py,
  test_web_tools_config.py, test_web_tools_tavily.py,
  test_website_policy.py, test_config_null_guard.py
- Compile-clean (py_compile.compile passes)
- All inline implementations now exist in exactly one place
  (plugins.web.<vendor>.provider)

Follow-up clean-up
------------------
- Drop _WEB_PLUGIN_SKIPLIST + hardcoded TOOL_CATEGORIES["web"] rows
  (next commit)
- Delete tools/web_providers/ directory entirely
- Add tests/plugins/web/ coverage
- Full tests/tools/ + tests/gateway/ regression sweep before promoting PR
2026-05-13 22:31:28 -07:00

288 lines
9.8 KiB
Python

"""Parallel.ai web search + content extraction — plugin form.

Subclasses :class:`agent.web_search_provider.WebSearchProvider`. Uses two
distinct Parallel SDK clients:

- ``Parallel`` (sync): for :meth:`search`
- ``AsyncParallel`` (async): for :meth:`extract`

This is the first plugin to exercise the **async-extract** code path in
the ABC: :meth:`extract` is declared ``async def``, and the dispatcher
in :func:`tools.web_tools.web_extract_tool` detects coroutines via
:func:`inspect.iscoroutinefunction` and awaits.

Config keys this provider responds to::

    web:
      search_backend: "parallel"   # explicit per-capability
      extract_backend: "parallel"  # explicit per-capability
      backend: "parallel"          # shared fallback

    # Optional: search mode (default "agentic"; also "fast" or "one-shot")
    # via the PARALLEL_SEARCH_MODE env var.

Env vars::

    PARALLEL_API_KEY=...          # https://parallel.ai (required)
    PARALLEL_SEARCH_MODE=agentic  # optional: agentic|fast|one-shot
"""

from __future__ import annotations

import logging
import os
from typing import Any, Dict, List

from agent.web_search_provider import WebSearchProvider

logger = logging.getLogger(__name__)

# Module-level client caches mirroring the legacy `tools.web_tools._parallel_client`
# / `_async_parallel_client` pattern. For tests, the canonical cache lives on
# tools.web_tools so existing setup_method() handlers that reset
# ``tools.web_tools._parallel_client = None`` keep working: we read/write
# the cache via that module rather than these module-level globals.
_parallel_client: Any = None
_async_parallel_client: Any = None


def _ensure_parallel_sdk_installed() -> None:
    """Trigger lazy install of the parallel SDK if it isn't present.

    Mirrors the lazy-deps pattern used by the legacy implementation.
    Swallows benign ImportError from the lazy_deps helper itself; if the
    SDK is genuinely missing the subsequent ``from parallel import ...``
    raises ImportError that the caller can handle.
    """
    try:
        from tools.lazy_deps import ensure as _lazy_ensure

        _lazy_ensure("search.parallel", prompt=False)
    except ImportError:
        pass
    except Exception as exc:  # noqa: BLE001 - surface install hint as ImportError
        # Chain the original exception so the install failure stays traceable.
        raise ImportError(str(exc)) from exc


def _get_sync_client() -> Any:
    """Lazy-load + cache the sync Parallel client.

    Cache lives on :mod:`tools.web_tools` (as ``_parallel_client``) so unit
    tests that reset that name between cases keep working.
    """
    import tools.web_tools as _wt

    cached = getattr(_wt, "_parallel_client", None)
    if cached is not None:
        return cached

    api_key = os.getenv("PARALLEL_API_KEY")
    if not api_key:
        raise ValueError(
            "PARALLEL_API_KEY environment variable not set. "
            "Get your API key at https://parallel.ai"
        )

    _ensure_parallel_sdk_installed()
    from parallel import Parallel  # noqa: WPS433 - deliberately lazy

    client = Parallel(api_key=api_key)
    _wt._parallel_client = client
    return client


def _get_async_client() -> Any:
    """Lazy-load + cache the async Parallel client.

    Cache lives on :mod:`tools.web_tools` (as ``_async_parallel_client``).
    """
    import tools.web_tools as _wt

    cached = getattr(_wt, "_async_parallel_client", None)
    if cached is not None:
        return cached

    api_key = os.getenv("PARALLEL_API_KEY")
    if not api_key:
        raise ValueError(
            "PARALLEL_API_KEY environment variable not set. "
            "Get your API key at https://parallel.ai"
        )

    _ensure_parallel_sdk_installed()
    from parallel import AsyncParallel  # noqa: WPS433 - deliberately lazy

    client = AsyncParallel(api_key=api_key)
    _wt._async_parallel_client = client
    return client
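

def _reset_clients_for_tests() -> None:
    """Drop both cached clients so tests can re-instantiate cleanly."""
    global _parallel_client, _async_parallel_client
    _parallel_client = None
    _async_parallel_client = None
    # The getters cache on tools.web_tools, so clear that copy as well.
    import tools.web_tools as _wt

    _wt._parallel_client = None
    _wt._async_parallel_client = None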


# Backward-compatible aliases for the names that lived in tools.web_tools
# before the migration (matches existing tests + external callers).
_get_parallel_client = _get_sync_client
_get_async_parallel_client = _get_async_client


def _resolve_search_mode() -> str:
    """Return the validated PARALLEL_SEARCH_MODE value (default "agentic")."""
    mode = os.getenv("PARALLEL_SEARCH_MODE", "agentic").lower().strip()
    if mode not in {"fast", "one-shot", "agentic"}:
        mode = "agentic"
    return mode


class ParallelWebSearchProvider(WebSearchProvider):
    """Parallel.ai search + async extract provider."""

    @property
    def name(self) -> str:
        return "parallel"

    @property
    def display_name(self) -> str:
        return "Parallel"

    def is_available(self) -> bool:
        """Return True when ``PARALLEL_API_KEY`` is set to a non-empty value."""
        return bool(os.getenv("PARALLEL_API_KEY", "").strip())

    def supports_search(self) -> bool:
        return True

    def supports_extract(self) -> bool:
        return True

    def search(self, query: str, limit: int = 5) -> Dict[str, Any]:
        """Execute a Parallel search (sync).

        Uses the ``beta.search`` endpoint with the configured mode
        (``PARALLEL_SEARCH_MODE`` env var, default "agentic"). ``limit`` is
        clamped to at most 20 before the request is sent.
        """
        try:
            from tools.interrupt import is_interrupted

            if is_interrupted():
                return {"success": False, "error": "Interrupted"}

            mode = _resolve_search_mode()
            logger.info(
                "Parallel search: '%s' (mode=%s, limit=%d)", query, mode, limit
            )
            response = _get_sync_client().beta.search(
                search_queries=[query],
                objective=query,
                mode=mode,
                max_results=min(limit, 20),
            )

            # Normalize SDK results into the shared {"web": [...]} shape.
            web_results = []
            for i, result in enumerate(response.results or []):
                excerpts = result.excerpts or []
                web_results.append(
                    {
                        "url": result.url or "",
                        "title": result.title or "",
                        "description": " ".join(excerpts) if excerpts else "",
                        "position": i + 1,
                    }
                )
            return {"success": True, "data": {"web": web_results}}
        except ValueError as exc:
            return {"success": False, "error": str(exc)}
        except ImportError as exc:
            return {
                "success": False,
                "error": f"Parallel SDK not installed: {exc}",
            }
        except Exception as exc:  # noqa: BLE001
            logger.warning("Parallel search error: %s", exc)
            return {"success": False, "error": f"Parallel search failed: {exc}"}

    async def extract(
        self, urls: List[str], **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """Extract content from one or more URLs via the async SDK.

        Returns the legacy list-of-results shape that
        :func:`tools.web_tools.web_extract_tool` expects: one entry per
        successful URL plus one entry per failed URL with an ``error``
        field. Errors are not raised; they're returned as per-URL items.
        """
        try:
            from tools.interrupt import is_interrupted

            if is_interrupted():
                return [
                    {"url": u, "error": "Interrupted", "title": ""} for u in urls
                ]

            logger.info("Parallel extract: %d URL(s)", len(urls))
            response = await _get_async_client().beta.extract(
                urls=urls,
                full_content=True,
            )

            results: List[Dict[str, Any]] = []
            for result in response.results or []:
                # Fall back to joined excerpts when no full content came back.
                content = result.full_content or ""
                if not content:
                    content = "\n\n".join(result.excerpts or [])
                url = result.url or ""
                title = result.title or ""
                results.append(
                    {
                        "url": url,
                        "title": title,
                        "content": content,
                        "raw_content": content,
                        "metadata": {"sourceURL": url, "title": title},
                    }
                )

            for error in response.errors or []:
                results.append(
                    {
                        "url": error.url or "",
                        "title": "",
                        "content": "",
                        "error": error.content or error.error_type or "extraction failed",
                        "metadata": {"sourceURL": error.url or ""},
                    }
                )

            return results
        except ValueError as exc:
            return [{"url": u, "title": "", "content": "", "error": str(exc)} for u in urls]
        except ImportError as exc:
            return [
                {"url": u, "title": "", "content": "", "error": f"Parallel SDK not installed: {exc}"}
                for u in urls
            ]
        except Exception as exc:  # noqa: BLE001
            logger.warning("Parallel extract error: %s", exc)
            return [
                {"url": u, "title": "", "content": "", "error": f"Parallel extract failed: {exc}"}
                for u in urls
            ]

    def get_setup_schema(self) -> Dict[str, Any]:
        return {
            "name": "Parallel",
            "badge": "paid",
            "tag": "Objective-tuned search + parallel page extraction.",
            "env_vars": [
                {
                    "key": "PARALLEL_API_KEY",
                    "prompt": "Parallel API key",
                    "url": "https://parallel.ai",
                },
            ],
        }
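
The module docstring mentions that the dispatcher detects an ``async def`` extract via `inspect.iscoroutinefunction` and awaits it. A minimal sketch of that kind of dispatch is shown below; `SyncProvider`, `AsyncProvider`, and `dispatch_extract` are hypothetical names, and the real `web_extract_tool` may well differ in detail.

```python
import asyncio
import inspect
from typing import Any, Dict, List


class SyncProvider:
    """Hypothetical provider with a plain (blocking) extract."""

    def extract(self, urls: List[str]) -> List[Dict[str, Any]]:
        return [{"url": u, "content": "sync"} for u in urls]


class AsyncProvider:
    """Hypothetical provider whose extract is declared ``async def``."""

    async def extract(self, urls: List[str]) -> List[Dict[str, Any]]:
        await asyncio.sleep(0)  # stands in for a network round trip
        return [{"url": u, "content": "async"} for u in urls]


async def dispatch_extract(provider: Any, urls: List[str]) -> List[Dict[str, Any]]:
    """Await coroutine-function extractors; call plain ones directly."""
    if inspect.iscoroutinefunction(provider.extract):
        return await provider.extract(urls)
    return provider.extract(urls)


sync_out = asyncio.run(dispatch_extract(SyncProvider(), ["https://example.com"]))
async_out = asyncio.run(dispatch_extract(AsyncProvider(), ["https://example.com"]))
```

Checking the method rather than its return value lets the dispatcher pick a path before calling anything, so a sync provider is never wrapped in an unnecessary await.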