feat(web): parallel plugin — first async-extract plugin

Migrates Parallel.ai from inline `_parallel_search()` / `_parallel_extract()`
in tools/web_tools.py to a bundled plugin at plugins/web/parallel/.

First plugin in the codebase to expose an async :meth:`extract`:

  - search() is sync — Parallel.beta.search
  - extract() is **async def** — AsyncParallel.beta.extract

The ABC's docstring on supports_extract() already permits sync-or-async;
this commit is the first to exercise the async path. The web_extract_tool
dispatcher (next commit) detects coroutines via
inspect.iscoroutinefunction and awaits accordingly.

Behavior preserved:
  - PARALLEL_API_KEY required (raises ValueError if missing → surfaced
    as {"success": False, "error": "..."} instead)
  - PARALLEL_SEARCH_MODE env var honored (agentic|fast|one-shot, default
    agentic), validated via _resolve_search_mode()
  - Limit capped at 20 server-side via min(limit, 20)
  - Per-URL failure mode preserved: response.errors[] each become a
    result dict with an "error" field rather than raising
  - Module-level _parallel_client / _async_parallel_client caches kept
    (mirrors legacy singleton pattern)

Adds "parallel" to _WEB_PLUGIN_SKIPLIST in hermes_cli/tools_config.py so
the picker doesn't double-list.

The legacy inline _parallel_search, _parallel_extract, _get_parallel_client,
_get_async_parallel_client in tools/web_tools.py are NOT deleted yet — the
dispatcher still calls them. They go away when the dispatcher cuts over.

E2E verified:
  - inspect.iscoroutinefunction(p.search) -> False
  - inspect.iscoroutinefunction(p.extract) -> True
  - extract() returns a coroutine (not a list)
  - 5 providers register correctly (brave-free, ddgs, exa, parallel, searxng)
This commit is contained in:
kshitijk4poor 2026-05-14 00:13:40 +05:30 committed by Teknium
parent ec8449e9c6
commit 4816646109
4 changed files with 289 additions and 1 deletions

View file

@ -0,0 +1,265 @@
"""Parallel.ai web search + content extraction — plugin form.
Subclasses :class:`agent.web_search_provider.WebSearchProvider`. Uses two
distinct Parallel SDK clients:
- ``Parallel`` (sync) for :meth:`search`
- ``AsyncParallel`` (async) for :meth:`extract`
This is the first plugin to exercise the **async-extract** code path in
the ABC: :meth:`extract` is declared ``async def``, and the dispatcher
in :func:`tools.web_tools.web_extract_tool` detects coroutines via
:func:`inspect.iscoroutinefunction` and awaits.
Config keys this provider responds to::
web:
search_backend: "parallel" # explicit per-capability
extract_backend: "parallel" # explicit per-capability
backend: "parallel" # shared fallback
# Optional: search mode (default "agentic"; also "fast" or "one-shot")
# via the PARALLEL_SEARCH_MODE env var.
Env vars::
PARALLEL_API_KEY=... # https://parallel.ai (required)
PARALLEL_SEARCH_MODE=agentic # optional: agentic|fast|one-shot
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict, List
from agent.web_search_provider import WebSearchProvider
logger = logging.getLogger(__name__)
# Module-level client caches mirroring the legacy `tools.web_tools._parallel_client`
# / `_async_parallel_client` pattern. Per-process singletons so we don't
# pay SDK construction cost per call.
_parallel_client: Any = None
_async_parallel_client: Any = None
def _ensure_parallel_sdk_installed() -> None:
"""Trigger lazy install of the parallel SDK if it isn't present.
Mirrors the lazy-deps pattern used by the legacy implementation.
Swallows benign ImportError from the lazy_deps helper itself; if the
SDK is genuinely missing the subsequent ``from parallel import ...``
raises ImportError that the caller can handle.
"""
try:
from tools.lazy_deps import ensure as _lazy_ensure
_lazy_ensure("search.parallel", prompt=False)
except ImportError:
pass
except Exception as exc: # noqa: BLE001 — surface install hint as ImportError
raise ImportError(str(exc))
def _get_sync_client() -> Any:
"""Lazy-load + cache the sync Parallel client."""
global _parallel_client
if _parallel_client is not None:
return _parallel_client
_ensure_parallel_sdk_installed()
from parallel import Parallel # noqa: WPS433 — deliberately lazy
api_key = os.getenv("PARALLEL_API_KEY")
if not api_key:
raise ValueError(
"PARALLEL_API_KEY environment variable not set. "
"Get your API key at https://parallel.ai"
)
_parallel_client = Parallel(api_key=api_key)
return _parallel_client
def _get_async_client() -> Any:
"""Lazy-load + cache the async Parallel client."""
global _async_parallel_client
if _async_parallel_client is not None:
return _async_parallel_client
_ensure_parallel_sdk_installed()
from parallel import AsyncParallel # noqa: WPS433 — deliberately lazy
api_key = os.getenv("PARALLEL_API_KEY")
if not api_key:
raise ValueError(
"PARALLEL_API_KEY environment variable not set. "
"Get your API key at https://parallel.ai"
)
_async_parallel_client = AsyncParallel(api_key=api_key)
return _async_parallel_client
def _reset_clients_for_tests() -> None:
"""Drop both cached clients so tests can re-instantiate cleanly."""
global _parallel_client, _async_parallel_client
_parallel_client = None
_async_parallel_client = None
def _resolve_search_mode() -> str:
"""Return the validated PARALLEL_SEARCH_MODE value (default "agentic")."""
mode = os.getenv("PARALLEL_SEARCH_MODE", "agentic").lower().strip()
if mode not in {"fast", "one-shot", "agentic"}:
mode = "agentic"
return mode
class ParallelWebSearchProvider(WebSearchProvider):
"""Parallel.ai search + async extract provider."""
@property
def name(self) -> str:
return "parallel"
@property
def display_name(self) -> str:
return "Parallel"
def is_available(self) -> bool:
"""Return True when ``PARALLEL_API_KEY`` is set to a non-empty value."""
return bool(os.getenv("PARALLEL_API_KEY", "").strip())
def supports_search(self) -> bool:
return True
def supports_extract(self) -> bool:
return True
def search(self, query: str, limit: int = 5) -> Dict[str, Any]:
"""Execute a Parallel search (sync).
Uses the ``beta.search`` endpoint with the configured mode
(``PARALLEL_SEARCH_MODE`` env var, default "agentic"). Limit is
capped at 20 server-side.
"""
try:
from tools.interrupt import is_interrupted
if is_interrupted():
return {"success": False, "error": "Interrupted"}
mode = _resolve_search_mode()
logger.info(
"Parallel search: '%s' (mode=%s, limit=%d)", query, mode, limit
)
response = _get_sync_client().beta.search(
search_queries=[query],
objective=query,
mode=mode,
max_results=min(limit, 20),
)
web_results = []
for i, result in enumerate(response.results or []):
excerpts = result.excerpts or []
web_results.append(
{
"url": result.url or "",
"title": result.title or "",
"description": " ".join(excerpts) if excerpts else "",
"position": i + 1,
}
)
return {"success": True, "data": {"web": web_results}}
except ValueError as exc:
return {"success": False, "error": str(exc)}
except ImportError as exc:
return {
"success": False,
"error": f"Parallel SDK not installed: {exc}",
}
except Exception as exc: # noqa: BLE001
logger.warning("Parallel search error: %s", exc)
return {"success": False, "error": f"Parallel search failed: {exc}"}
async def extract(
self, urls: List[str], **kwargs: Any
) -> List[Dict[str, Any]]:
"""Extract content from one or more URLs via the async SDK.
Returns the legacy list-of-results shape that
:func:`tools.web_tools.web_extract_tool` expects: one entry per
successful URL plus one entry per failed URL with an ``error``
field. Errors are not raised they're returned as per-URL items.
"""
try:
from tools.interrupt import is_interrupted
if is_interrupted():
return [
{"url": u, "error": "Interrupted", "title": ""} for u in urls
]
logger.info("Parallel extract: %d URL(s)", len(urls))
response = await _get_async_client().beta.extract(
urls=urls,
full_content=True,
)
results: List[Dict[str, Any]] = []
for result in response.results or []:
content = result.full_content or ""
if not content:
content = "\n\n".join(result.excerpts or [])
url = result.url or ""
title = result.title or ""
results.append(
{
"url": url,
"title": title,
"content": content,
"raw_content": content,
"metadata": {"sourceURL": url, "title": title},
}
)
for error in response.errors or []:
results.append(
{
"url": error.url or "",
"title": "",
"content": "",
"error": error.content or error.error_type or "extraction failed",
"metadata": {"sourceURL": error.url or ""},
}
)
return results
except ValueError as exc:
return [{"url": u, "title": "", "content": "", "error": str(exc)} for u in urls]
except ImportError as exc:
return [
{"url": u, "title": "", "content": "", "error": f"Parallel SDK not installed: {exc}"}
for u in urls
]
except Exception as exc: # noqa: BLE001
logger.warning("Parallel extract error: %s", exc)
return [
{"url": u, "title": "", "content": "", "error": f"Parallel extract failed: {exc}"}
for u in urls
]
def get_setup_schema(self) -> Dict[str, Any]:
return {
"name": "Parallel",
"badge": "paid",
"tag": "Objective-tuned search + parallel page extraction.",
"env_vars": [
{
"key": "PARALLEL_API_KEY",
"prompt": "Parallel API key",
"url": "https://parallel.ai",
},
],
}