From 48166461093982755afd60166b1b96e2c93c48ed Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Thu, 14 May 2026 00:13:40 +0530 Subject: [PATCH] =?UTF-8?q?feat(web):=20parallel=20plugin=20=E2=80=94=20fi?= =?UTF-8?q?rst=20async-extract=20plugin?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrates Parallel.ai from inline `_parallel_search()` / `_parallel_extract()` in tools/web_tools.py to a bundled plugin at plugins/web/parallel/. First plugin in the codebase to expose an async :meth:`extract`: - search() is sync — Parallel.beta.search - extract() is **async def** — AsyncParallel.beta.extract The ABC's docstring on supports_extract() already permits sync-or-async; this commit is the first to exercise the async path. The web_extract_tool dispatcher (next commit) detects coroutines via inspect.iscoroutinefunction and awaits accordingly. Behavior preserved: - PARALLEL_API_KEY required (a missing key raises ValueError inside the client factory → search() surfaces it as {"success": False, "error": "..."} and extract() as per-URL error entries instead of propagating) - PARALLEL_SEARCH_MODE env var honored (agentic|fast|one-shot, default agentic), validated via _resolve_search_mode() - Limit clamped to 20 client-side via min(limit, 20) before the request is sent - Per-URL failure mode preserved: each response.errors[] entry becomes a result dict with an "error" field rather than raising - Module-level _parallel_client / _async_parallel_client caches kept (mirrors legacy singleton pattern) Adds "parallel" to _WEB_PLUGIN_SKIPLIST in hermes_cli/tools_config.py so the picker doesn't double-list. The legacy inline _parallel_search, _parallel_extract, _get_parallel_client, _get_async_parallel_client in tools/web_tools.py are NOT deleted yet — the dispatcher still calls them. They go away when the dispatcher cuts over. 
E2E verified: - inspect.iscoroutinefunction(p.search) -> False - inspect.iscoroutinefunction(p.extract) -> True - extract() returns a coroutine (not a list) - 5 providers register correctly (brave-free, ddgs, exa, parallel, searxng) --- hermes_cli/tools_config.py | 2 +- plugins/web/parallel/__init__.py | 16 ++ plugins/web/parallel/plugin.yaml | 7 + plugins/web/parallel/provider.py | 265 +++++++++++++++++++++++++++++++ 4 files changed, 289 insertions(+), 1 deletion(-) create mode 100644 plugins/web/parallel/__init__.py create mode 100644 plugins/web/parallel/plugin.yaml create mode 100644 plugins/web/parallel/provider.py diff --git a/hermes_cli/tools_config.py b/hermes_cli/tools_config.py index 94c1b96a06a..407e24dfca5 100644 --- a/hermes_cli/tools_config.py +++ b/hermes_cli/tools_config.py @@ -1586,7 +1586,7 @@ def _plugin_video_gen_providers() -> list[dict]: # removed and this helper becomes the sole source of web-provider picker # rows (matching how Spotify / Google Meet are surfaced today purely from # their plugins). -_WEB_PLUGIN_SKIPLIST = frozenset({"brave-free", "ddgs", "searxng", "exa"}) +_WEB_PLUGIN_SKIPLIST = frozenset({"brave-free", "ddgs", "searxng", "exa", "parallel"}) def _plugin_web_search_providers() -> list[dict]: diff --git a/plugins/web/parallel/__init__.py b/plugins/web/parallel/__init__.py new file mode 100644 index 00000000000..2a109894dc5 --- /dev/null +++ b/plugins/web/parallel/__init__.py @@ -0,0 +1,16 @@ +"""Parallel.ai web search + extract plugin — bundled, auto-loaded. + +First plugin in this repo to expose an async :meth:`extract` — Parallel's +SDK is async-native (``AsyncParallel.beta.extract``). The web_extract_tool +dispatcher detects coroutines via :func:`inspect.iscoroutinefunction` and +awaits. 
+""" + +from __future__ import annotations + +from plugins.web.parallel.provider import ParallelWebSearchProvider + + +def register(ctx) -> None: + """Register the Parallel provider with the plugin context.""" + ctx.register_web_search_provider(ParallelWebSearchProvider()) diff --git a/plugins/web/parallel/plugin.yaml b/plugins/web/parallel/plugin.yaml new file mode 100644 index 00000000000..01bf0da58ef --- /dev/null +++ b/plugins/web/parallel/plugin.yaml @@ -0,0 +1,7 @@ +name: web-parallel +version: 1.0.0 +description: "Parallel.ai web search + content extraction. Search returns objective-tuned results; extract uses the async SDK for parallel page fetches. Requires PARALLEL_API_KEY — sign up at https://parallel.ai." +author: NousResearch +kind: backend +provides_web_providers: + - parallel diff --git a/plugins/web/parallel/provider.py b/plugins/web/parallel/provider.py new file mode 100644 index 00000000000..2dff514feb3 --- /dev/null +++ b/plugins/web/parallel/provider.py @@ -0,0 +1,265 @@ +"""Parallel.ai web search + content extraction — plugin form. + +Subclasses :class:`agent.web_search_provider.WebSearchProvider`. Uses two +distinct Parallel SDK clients: + +- ``Parallel`` (sync) — for :meth:`search` +- ``AsyncParallel`` (async) — for :meth:`extract` + +This is the first plugin to exercise the **async-extract** code path in +the ABC: :meth:`extract` is declared ``async def``, and the dispatcher +in :func:`tools.web_tools.web_extract_tool` detects coroutines via +:func:`inspect.iscoroutinefunction` and awaits. + +Config keys this provider responds to:: + + web: + search_backend: "parallel" # explicit per-capability + extract_backend: "parallel" # explicit per-capability + backend: "parallel" # shared fallback + # Optional: search mode (default "agentic"; also "fast" or "one-shot") + # via the PARALLEL_SEARCH_MODE env var. + +Env vars:: + + PARALLEL_API_KEY=... 
# https://parallel.ai (required) + PARALLEL_SEARCH_MODE=agentic # optional: agentic|fast|one-shot +""" + +from __future__ import annotations + +import logging +import os +from typing import Any, Dict, List + +from agent.web_search_provider import WebSearchProvider + +logger = logging.getLogger(__name__) + +# Module-level client caches mirroring the legacy `tools.web_tools._parallel_client` +# / `_async_parallel_client` pattern. Per-process singletons so we don't +# pay SDK construction cost per call. +_parallel_client: Any = None +_async_parallel_client: Any = None + + +def _ensure_parallel_sdk_installed() -> None: + """Trigger lazy install of the parallel SDK if it isn't present. + + Mirrors the lazy-deps pattern used by the legacy implementation. + Swallows benign ImportError from the lazy_deps helper itself; if the + SDK is genuinely missing the subsequent ``from parallel import ...`` + raises ImportError that the caller can handle. + """ + try: + from tools.lazy_deps import ensure as _lazy_ensure + + _lazy_ensure("search.parallel", prompt=False) + except ImportError: + pass + except Exception as exc: # noqa: BLE001 — surface install hint as ImportError + raise ImportError(str(exc)) + + +def _get_sync_client() -> Any: + """Lazy-load + cache the sync Parallel client.""" + global _parallel_client + if _parallel_client is not None: + return _parallel_client + + _ensure_parallel_sdk_installed() + from parallel import Parallel # noqa: WPS433 — deliberately lazy + + api_key = os.getenv("PARALLEL_API_KEY") + if not api_key: + raise ValueError( + "PARALLEL_API_KEY environment variable not set. 
" + "Get your API key at https://parallel.ai" + ) + _parallel_client = Parallel(api_key=api_key) + return _parallel_client + + +def _get_async_client() -> Any: + """Lazy-load + cache the async Parallel client.""" + global _async_parallel_client + if _async_parallel_client is not None: + return _async_parallel_client + + _ensure_parallel_sdk_installed() + from parallel import AsyncParallel # noqa: WPS433 — deliberately lazy + + api_key = os.getenv("PARALLEL_API_KEY") + if not api_key: + raise ValueError( + "PARALLEL_API_KEY environment variable not set. " + "Get your API key at https://parallel.ai" + ) + _async_parallel_client = AsyncParallel(api_key=api_key) + return _async_parallel_client + + +def _reset_clients_for_tests() -> None: + """Drop both cached clients so tests can re-instantiate cleanly.""" + global _parallel_client, _async_parallel_client + _parallel_client = None + _async_parallel_client = None + + +def _resolve_search_mode() -> str: + """Return the validated PARALLEL_SEARCH_MODE value (default "agentic").""" + mode = os.getenv("PARALLEL_SEARCH_MODE", "agentic").lower().strip() + if mode not in {"fast", "one-shot", "agentic"}: + mode = "agentic" + return mode + + +class ParallelWebSearchProvider(WebSearchProvider): + """Parallel.ai search + async extract provider.""" + + @property + def name(self) -> str: + return "parallel" + + @property + def display_name(self) -> str: + return "Parallel" + + def is_available(self) -> bool: + """Return True when ``PARALLEL_API_KEY`` is set to a non-empty value.""" + return bool(os.getenv("PARALLEL_API_KEY", "").strip()) + + def supports_search(self) -> bool: + return True + + def supports_extract(self) -> bool: + return True + + def search(self, query: str, limit: int = 5) -> Dict[str, Any]: + """Execute a Parallel search (sync). + + Uses the ``beta.search`` endpoint with the configured mode + (``PARALLEL_SEARCH_MODE`` env var, default "agentic"). Limit is + capped at 20 server-side. 
+ """ + try: + from tools.interrupt import is_interrupted + + if is_interrupted(): + return {"success": False, "error": "Interrupted"} + + mode = _resolve_search_mode() + logger.info( + "Parallel search: '%s' (mode=%s, limit=%d)", query, mode, limit + ) + response = _get_sync_client().beta.search( + search_queries=[query], + objective=query, + mode=mode, + max_results=min(limit, 20), + ) + + web_results = [] + for i, result in enumerate(response.results or []): + excerpts = result.excerpts or [] + web_results.append( + { + "url": result.url or "", + "title": result.title or "", + "description": " ".join(excerpts) if excerpts else "", + "position": i + 1, + } + ) + + return {"success": True, "data": {"web": web_results}} + except ValueError as exc: + return {"success": False, "error": str(exc)} + except ImportError as exc: + return { + "success": False, + "error": f"Parallel SDK not installed: {exc}", + } + except Exception as exc: # noqa: BLE001 + logger.warning("Parallel search error: %s", exc) + return {"success": False, "error": f"Parallel search failed: {exc}"} + + async def extract( + self, urls: List[str], **kwargs: Any + ) -> List[Dict[str, Any]]: + """Extract content from one or more URLs via the async SDK. + + Returns the legacy list-of-results shape that + :func:`tools.web_tools.web_extract_tool` expects: one entry per + successful URL plus one entry per failed URL with an ``error`` + field. Errors are not raised — they're returned as per-URL items. 
+ """ + try: + from tools.interrupt import is_interrupted + + if is_interrupted(): + return [ + {"url": u, "error": "Interrupted", "title": ""} for u in urls + ] + + logger.info("Parallel extract: %d URL(s)", len(urls)) + response = await _get_async_client().beta.extract( + urls=urls, + full_content=True, + ) + + results: List[Dict[str, Any]] = [] + for result in response.results or []: + content = result.full_content or "" + if not content: + content = "\n\n".join(result.excerpts or []) + url = result.url or "" + title = result.title or "" + results.append( + { + "url": url, + "title": title, + "content": content, + "raw_content": content, + "metadata": {"sourceURL": url, "title": title}, + } + ) + + for error in response.errors or []: + results.append( + { + "url": error.url or "", + "title": "", + "content": "", + "error": error.content or error.error_type or "extraction failed", + "metadata": {"sourceURL": error.url or ""}, + } + ) + + return results + except ValueError as exc: + return [{"url": u, "title": "", "content": "", "error": str(exc)} for u in urls] + except ImportError as exc: + return [ + {"url": u, "title": "", "content": "", "error": f"Parallel SDK not installed: {exc}"} + for u in urls + ] + except Exception as exc: # noqa: BLE001 + logger.warning("Parallel extract error: %s", exc) + return [ + {"url": u, "title": "", "content": "", "error": f"Parallel extract failed: {exc}"} + for u in urls + ] + + def get_setup_schema(self) -> Dict[str, Any]: + return { + "name": "Parallel", + "badge": "paid", + "tag": "Objective-tuned search + parallel page extraction.", + "env_vars": [ + { + "key": "PARALLEL_API_KEY", + "prompt": "Parallel API key", + "url": "https://parallel.ai", + }, + ], + }