mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
Self-review of the plugin migration surfaced one warning and a handful of
doc/dead-code cleanups. None affect production behaviour through the main
dispatcher (which always calls `tools.web_tools._get_backend()` first and
preserves the full 7-provider walk), but direct callers of
`agent.web_search_registry.get_active_*_provider()` previously diverged
from the legacy order and could return `None` for users with credentials
but no explicit `web.backend` config key.
Changes
-------
1. `_LEGACY_PREFERENCE` was shipped as a 4-tuple
`("brave-free", "firecrawl", "searxng", "ddgs")` while the PR
description and the legacy `_get_backend()` candidate order both
call for the 7-tuple
`(firecrawl, parallel, tavily, exa, searxng, brave-free, ddgs)`.
Replaced with the 7-tuple. Verified empirically: with TAVILY+EXA keys
and no config, `get_active_search_provider()` now returns tavily
(was None); with EXA+PARALLEL it returns parallel (was None); with
BRAVE+FIRECRAWL it returns firecrawl (was brave-free).
2. `agent/web_search_registry.py` — module docstring, `_resolve` step-3
docstring, and inline comment all listed the old 4-tuple and claimed
"brave-free first because it was the shipped default". The legacy
default is `"firecrawl"`. Rewritten to match the new ordering and
reference `tools.web_tools._get_backend()` as the source of truth.
3. `agent/web_search_registry.py` — `get_active_crawl_provider`
docstring said "only Tavily implements it among built-in providers".
Firecrawl also advertises `supports_crawl=True` after the previous
commit. Updated to "Tavily and Firecrawl".
4. `plugins/web/tavily/provider.py` — module docstring said "Tavily is
the only built-in backend that natively crawls". Updated.
5. `agent/web_search_provider.py` — ABC docstring mentioned only
`search` / `extract` capabilities. Added `crawl` for accuracy.
6. `plugins/web/{firecrawl,parallel,exa}/provider.py` — dead plugin-level
cache globals (`_firecrawl_client`, `_parallel_client`,
`_async_parallel_client`, `_exa_client`) were declared but never read
(all reads/writes go through `_wt.*` per the `extracting-inline-
helpers-to-plugins` recipe). Removed the dead declarations; the
reset-for-tests helpers in firecrawl + parallel now clear the
canonical `_wt._<name>` slots, matching the pattern exa already used.
Tests
-----
218/218 web-targeted tests still pass (no test changes needed). 4910/4910
in `tests/tools/` still green.
291 lines
9.9 KiB
Python
291 lines
9.9 KiB
Python
"""Parallel.ai web search + content extraction — plugin form.
|
|
|
|
Subclasses :class:`agent.web_search_provider.WebSearchProvider`. Uses two
|
|
distinct Parallel SDK clients:
|
|
|
|
- ``Parallel`` (sync) — for :meth:`search`
|
|
- ``AsyncParallel`` (async) — for :meth:`extract`
|
|
|
|
This is the first plugin to exercise the **async-extract** code path in
|
|
the ABC: :meth:`extract` is declared ``async def``, and the dispatcher
|
|
in :func:`tools.web_tools.web_extract_tool` detects coroutines via
|
|
:func:`inspect.iscoroutinefunction` and awaits.
|
|
|
|
Config keys this provider responds to::
|
|
|
|
web:
|
|
search_backend: "parallel" # explicit per-capability
|
|
extract_backend: "parallel" # explicit per-capability
|
|
backend: "parallel" # shared fallback
|
|
# Optional: search mode (default "agentic"; also "fast" or "one-shot")
|
|
# via the PARALLEL_SEARCH_MODE env var.
|
|
|
|
Env vars::
|
|
|
|
PARALLEL_API_KEY=... # https://parallel.ai (required)
|
|
PARALLEL_SEARCH_MODE=agentic # optional: agentic|fast|one-shot
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from typing import Any, Dict, List
|
|
|
|
from agent.web_search_provider import WebSearchProvider
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Module-level note: the canonical cache slots ``_parallel_client`` and
|
|
# ``_async_parallel_client`` live on :mod:`tools.web_tools` so tests that do
|
|
# ``tools.web_tools._parallel_client = None`` between cases see fresh state.
|
|
# The plugin reads/writes through that public module (see
|
|
# :func:`_get_sync_client` / :func:`_get_async_client`).
|
|
|
|
|
|
def _ensure_parallel_sdk_installed() -> None:
|
|
"""Trigger lazy install of the parallel SDK if it isn't present.
|
|
|
|
Mirrors the lazy-deps pattern used by the legacy implementation.
|
|
Swallows benign ImportError from the lazy_deps helper itself; if the
|
|
SDK is genuinely missing the subsequent ``from parallel import ...``
|
|
raises ImportError that the caller can handle.
|
|
"""
|
|
try:
|
|
from tools.lazy_deps import ensure as _lazy_ensure
|
|
|
|
_lazy_ensure("search.parallel", prompt=False)
|
|
except ImportError:
|
|
pass
|
|
except Exception as exc: # noqa: BLE001 — surface install hint as ImportError
|
|
raise ImportError(str(exc))
|
|
|
|
|
|
def _get_sync_client() -> Any:
    """Return the cached sync ``Parallel`` client, creating it on first use.

    The cache slot is the ``_parallel_client`` attribute of
    :mod:`tools.web_tools`, so code that resets that attribute to ``None``
    between cases gets a fresh client on the next call.

    Returns:
        The ``parallel.Parallel`` instance stored in the cache slot.

    Raises:
        ValueError: If ``PARALLEL_API_KEY`` is unset, empty, or
            whitespace-only.
        ImportError: If the ``parallel`` SDK cannot be imported.
    """
    import tools.web_tools as _wt

    cached = getattr(_wt, "_parallel_client", None)
    if cached is not None:
        return cached

    # Strip before validating so a whitespace-only value is rejected here,
    # matching ``ParallelWebSearchProvider.is_available``, and so stray
    # whitespace around a real key is not sent to the SDK.
    api_key = os.getenv("PARALLEL_API_KEY", "").strip()
    if not api_key:
        raise ValueError(
            "PARALLEL_API_KEY environment variable not set. "
            "Get your API key at https://parallel.ai"
        )

    _ensure_parallel_sdk_installed()
    from parallel import Parallel  # noqa: WPS433 — deliberately lazy

    client = Parallel(api_key=api_key)
    _wt._parallel_client = client
    return client
|
|
|
|
|
|
def _get_async_client() -> Any:
    """Return the cached ``AsyncParallel`` client, creating it on first use.

    The cache slot is the ``_async_parallel_client`` attribute of
    :mod:`tools.web_tools`.

    Returns:
        The ``parallel.AsyncParallel`` instance stored in the cache slot.

    Raises:
        ValueError: If ``PARALLEL_API_KEY`` is unset, empty, or
            whitespace-only.
        ImportError: If the ``parallel`` SDK cannot be imported.
    """
    import tools.web_tools as _wt

    cached = getattr(_wt, "_async_parallel_client", None)
    if cached is not None:
        return cached

    # Strip before validating so a whitespace-only value is rejected here,
    # matching ``ParallelWebSearchProvider.is_available``, and so stray
    # whitespace around a real key is not sent to the SDK.
    api_key = os.getenv("PARALLEL_API_KEY", "").strip()
    if not api_key:
        raise ValueError(
            "PARALLEL_API_KEY environment variable not set. "
            "Get your API key at https://parallel.ai"
        )

    _ensure_parallel_sdk_installed()
    from parallel import AsyncParallel  # noqa: WPS433 — deliberately lazy

    client = AsyncParallel(api_key=api_key)
    _wt._async_parallel_client = client
    return client
|
|
|
|
|
|
def _reset_clients_for_tests() -> None:
    """Forget both cached Parallel clients so the next call rebuilds them.

    The sync and async clients are cached as attributes of
    :mod:`tools.web_tools` (written by :func:`_get_sync_client` and
    :func:`_get_async_client`); this sets both attributes back to ``None``.
    """
    import tools.web_tools as web_tools_module

    for slot_name in ("_parallel_client", "_async_parallel_client"):
        setattr(web_tools_module, slot_name, None)
|
|
|
|
|
|
# Pre-migration names. These helpers were previously exposed by
# tools.web_tools under the names below; the aliases keep existing tests
# and external callers working without changes.
_get_async_parallel_client = _get_async_client
_get_parallel_client = _get_sync_client
|
|
|
|
|
|
def _resolve_search_mode() -> str:
|
|
"""Return the validated PARALLEL_SEARCH_MODE value (default "agentic")."""
|
|
mode = os.getenv("PARALLEL_SEARCH_MODE", "agentic").lower().strip()
|
|
if mode not in {"fast", "one-shot", "agentic"}:
|
|
mode = "agentic"
|
|
return mode
|
|
|
|
|
|
class ParallelWebSearchProvider(WebSearchProvider):
    """Parallel.ai search + async extract provider.

    :meth:`search` is synchronous and uses the client returned by
    :func:`_get_sync_client`; :meth:`extract` is a coroutine and uses the
    client returned by :func:`_get_async_client`. Failures caught by either
    method are reported inside the returned payload instead of being raised.
    """

    @property
    def name(self) -> str:
        """Provider identifier: ``"parallel"``."""
        return "parallel"

    @property
    def display_name(self) -> str:
        """Human-readable provider name: ``"Parallel"``."""
        return "Parallel"

    def is_available(self) -> bool:
        """Return True when ``PARALLEL_API_KEY`` is set to a non-empty value."""
        return bool(os.getenv("PARALLEL_API_KEY", "").strip())

    def supports_search(self) -> bool:
        """Return True: this provider implements :meth:`search`."""
        return True

    def supports_extract(self) -> bool:
        """Return True: this provider implements :meth:`extract`."""
        return True

    @staticmethod
    def _failed_extract_items(urls: List[str], message: str) -> List[Dict[str, Any]]:
        """Build one error entry per URL, all carrying the same ``message``."""
        return [
            {"url": u, "title": "", "content": "", "error": message} for u in urls
        ]

    def search(self, query: str, limit: int = 5) -> Dict[str, Any]:
        """Execute a Parallel search (sync).

        Uses the ``beta.search`` endpoint with the configured mode
        (``PARALLEL_SEARCH_MODE`` env var, default "agentic"). ``limit`` is
        clamped to the range 1..20 on the client side before the request is
        sent.

        Args:
            query: Search text; sent both as the single search query and as
                the objective.
            limit: Requested number of results.

        Returns:
            ``{"success": True, "data": {"web": [...]}}`` on success, where
            each item has ``url``, ``title``, ``description`` (the result's
            excerpts joined with spaces) and a 1-based ``position``. On
            failure or interruption, ``{"success": False, "error": "..."}``.
        """
        try:
            from tools.interrupt import is_interrupted

            if is_interrupted():
                return {"success": False, "error": "Interrupted"}

            mode = _resolve_search_mode()
            logger.info(
                "Parallel search: '%s' (mode=%s, limit=%d)", query, mode, limit
            )
            response = _get_sync_client().beta.search(
                search_queries=[query],
                objective=query,
                mode=mode,
                # Never forward zero or a negative count; cap at 20.
                max_results=max(1, min(limit, 20)),
            )

            web_results = [
                {
                    "url": result.url or "",
                    "title": result.title or "",
                    "description": " ".join(result.excerpts or []),
                    "position": position,
                }
                for position, result in enumerate(response.results or [], start=1)
            ]

            return {"success": True, "data": {"web": web_results}}
        except ValueError as exc:
            # Raised by _get_sync_client when the API key is missing.
            return {"success": False, "error": str(exc)}
        except ImportError as exc:
            return {
                "success": False,
                "error": f"Parallel SDK not installed: {exc}",
            }
        except Exception as exc:  # noqa: BLE001
            logger.warning("Parallel search error: %s", exc)
            return {"success": False, "error": f"Parallel search failed: {exc}"}

    async def extract(
        self, urls: List[str], **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """Extract content from one or more URLs via the async SDK.

        Returns the legacy list-of-results shape that
        :func:`tools.web_tools.web_extract_tool` expects: one entry per
        successful URL plus one entry per failed URL with an ``error``
        field. Errors are not raised — they're returned as per-URL items.

        Args:
            urls: URLs to extract.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            A list of dicts. Successful entries carry ``url``, ``title``,
            ``content``, ``raw_content`` and ``metadata``; ``content`` is
            the full page content, falling back to the excerpts joined by
            blank lines when no full content is returned. Failed entries
            carry ``url``, ``title``, ``content`` (empty) and ``error``.
        """
        try:
            from tools.interrupt import is_interrupted

            if is_interrupted():
                # NOTE(review): unlike the other error entries this shape has
                # no "content" key; left unchanged for compatibility.
                return [
                    {"url": u, "error": "Interrupted", "title": ""} for u in urls
                ]

            logger.info("Parallel extract: %d URL(s)", len(urls))
            response = await _get_async_client().beta.extract(
                urls=urls,
                full_content=True,
            )

            results: List[Dict[str, Any]] = []
            for result in response.results or []:
                content = result.full_content or "\n\n".join(result.excerpts or [])
                url = result.url or ""
                title = result.title or ""
                results.append(
                    {
                        "url": url,
                        "title": title,
                        "content": content,
                        "raw_content": content,
                        "metadata": {"sourceURL": url, "title": title},
                    }
                )

            for error in response.errors or []:
                failed_url = error.url or ""
                results.append(
                    {
                        "url": failed_url,
                        "title": "",
                        "content": "",
                        "error": error.content or error.error_type or "extraction failed",
                        "metadata": {"sourceURL": failed_url},
                    }
                )

            return results
        except ValueError as exc:
            # Raised by _get_async_client when the API key is missing.
            return self._failed_extract_items(urls, str(exc))
        except ImportError as exc:
            return self._failed_extract_items(
                urls, f"Parallel SDK not installed: {exc}"
            )
        except Exception as exc:  # noqa: BLE001
            logger.warning("Parallel extract error: %s", exc)
            return self._failed_extract_items(
                urls, f"Parallel extract failed: {exc}"
            )

    def get_setup_schema(self) -> Dict[str, Any]:
        """Return the setup metadata for this provider.

        The dict holds the display ``name``, a ``badge`` and ``tag`` line,
        and the list of environment variables to prompt for (only
        ``PARALLEL_API_KEY``).
        """
        return {
            "name": "Parallel",
            "badge": "paid",
            "tag": "Objective-tuned search + parallel page extraction.",
            "env_vars": [
                {
                    "key": "PARALLEL_API_KEY",
                    "prompt": "Parallel API key",
                    "url": "https://parallel.ai",
                },
            ],
        }
|