Merge pull request #52118 from NousResearch/salvage/36776-ddgs-timeout

fix(ddgs): bound DuckDuckGo search with a wall-clock timeout (#36776)
This commit is contained in:
kshitij 2026-06-25 01:56:26 +05:30 committed by GitHub
commit 77d2b50751
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 130 additions and 17 deletions

View file

@ -12,6 +12,7 @@ whether the package is importable; the plugin still registers either way so
from __future__ import annotations
import concurrent.futures as _cf
import logging
from typing import Any, Dict
@ -19,6 +20,40 @@ from agent.web_search_provider import WebSearchProvider
logger = logging.getLogger(__name__)
# Overall wall-clock cap for a single ddgs search. The DDGS constructor's
# ``timeout`` only bounds individual HTTP requests; ddgs's multi-engine retry
# loop has no overall cap, so a slow/rate-limited DuckDuckGo response can hang
# the (single, shared) agent loop indefinitely and block every platform
# (#36776). Enforce a hard cap here via a worker thread.
_SEARCH_TIMEOUT_SECS = 30
def _run_ddgs_search(query: str, safe_limit: int) -> list[dict[str, Any]]:
"""Run the blocking ddgs query and return normalized hits.
Module-level (not a closure) so tests can patch it directly without
spawning a real multi-second worker thread. ``DDGS(timeout=...)`` bounds
each individual HTTP request; the overall wall-clock cap is enforced by
the caller via a future timeout.
"""
from ddgs import DDGS # type: ignore
results: list[dict[str, Any]] = []
with DDGS(timeout=10) as client:
for i, hit in enumerate(client.text(query, max_results=safe_limit)):
if i >= safe_limit:
break
url = str(hit.get("href") or hit.get("url") or "")
results.append(
{
"title": str(hit.get("title", "")),
"url": url,
"description": str(hit.get("body", "")),
"position": i + 1,
}
)
return results
class DDGSWebSearchProvider(WebSearchProvider):
"""DuckDuckGo HTML-scrape search provider.
@ -57,9 +92,14 @@ class DDGSWebSearchProvider(WebSearchProvider):
return False
def search(self, query: str, limit: int = 5) -> Dict[str, Any]:
"""Execute a DuckDuckGo search and return normalized results."""
"""Execute a DuckDuckGo search and return normalized results.
The synchronous ``ddgs`` call is run in a worker thread with a hard
wall-clock timeout (``_SEARCH_TIMEOUT_SECS``) so a hung search cannot
block the shared agent loop indefinitely (#36776).
"""
try:
from ddgs import DDGS # type: ignore
import ddgs # type: ignore # noqa: F401 — availability probe
except ImportError:
return {
"success": False,
@ -70,24 +110,38 @@ class DDGSWebSearchProvider(WebSearchProvider):
# in case the package ignores the hint.
safe_limit = max(1, int(limit))
# A fresh single-worker pool per call (rather than a module-level one)
# is intentional: on timeout the blocking ddgs call cannot be cancelled
# and keeps running, so a shared pool would serialise every later search
# behind that hung worker. A per-call pool isolates each search from a
# previously-hung one.
pool = _cf.ThreadPoolExecutor(max_workers=1)
try:
web_results = []
with DDGS() as client:
for i, hit in enumerate(client.text(query, max_results=safe_limit)):
if i >= safe_limit:
break
url = str(hit.get("href") or hit.get("url") or "")
web_results.append(
{
"title": str(hit.get("title", "")),
"url": url,
"description": str(hit.get("body", "")),
"position": i + 1,
}
)
future = pool.submit(_run_ddgs_search, query, safe_limit)
try:
web_results = future.result(timeout=_SEARCH_TIMEOUT_SECS)
except _cf.TimeoutError:
logger.warning(
"DDGS search timed out after %ds for query: %r",
_SEARCH_TIMEOUT_SECS, query,
)
return {
"success": False,
"error": (
f"DuckDuckGo search timed out after {_SEARCH_TIMEOUT_SECS}s — "
"DuckDuckGo may be rate-limiting or slow. Try again later "
"or switch to a different search provider."
),
}
except Exception as exc: # noqa: BLE001 — ddgs raises its own exceptions
logger.warning("DDGS search error: %s", exc)
return {"success": False, "error": f"DuckDuckGo search failed: {exc}"}
finally:
# Return immediately without joining the worker. On timeout the
# already-running ddgs call can't be cancelled (cancel_futures only
# affects not-yet-started work), so the worker runs to completion
# on its own; it writes nothing shared, so leaking it is safe.
pool.shutdown(wait=False, cancel_futures=True)
logger.info("DDGS search '%s': %d results (limit %d)", query, len(web_results), limit)
return {"success": True, "data": {"web": web_results}}

View file

@ -18,20 +18,30 @@ import pytest
from tests.tools.conftest import register_all_web_providers
def _install_fake_ddgs(monkeypatch, *, text_results=None, text_raises=None):
def _install_fake_ddgs(monkeypatch, *, text_results=None, text_raises=None, text_sleep=None):
"""Install a stub ``ddgs`` module in sys.modules for the duration of a test.
``text_results``: iterable of dicts to yield from DDGS().text(...).
``text_raises``: if set, DDGS().text raises this exception instead.
``text_sleep``: if set, DDGS().text blocks for this many seconds before
yielding simulates a hung/slow search for the timeout test.
"""
import time as _time
fake = types.ModuleType("ddgs")
class _FakeDDGS:
def __init__(self, **kwargs):
# Accept timeout= (and any other constructor kwargs) — the provider
# now passes DDGS(timeout=10).
pass
def __enter__(self):
return self
def __exit__(self, *_a):
return False
def text(self, query, max_results=5):
if text_sleep is not None:
_time.sleep(text_sleep)
if text_raises is not None:
raise text_raises
for hit in (text_results or []):
@ -155,6 +165,55 @@ class TestDDGSProviderSearch:
assert result["success"] is True
assert result["data"]["web"] == []
def test_hung_search_times_out_and_returns_failure(self, monkeypatch):
"""#36776: a ddgs call that never returns must be bounded by the
wall-clock timeout and surface a failure instead of hanging the
shared agent loop. We patch the blocking helper to wait on an Event
(released in finally so no worker thread leaks past the test) and
shrink the timeout; search() must return success=False promptly."""
import threading
import time
# ddgs must import-probe True for search() to proceed.
_install_fake_ddgs(monkeypatch)
monkeypatch.delitem(sys.modules, "plugins.web.ddgs.provider", raising=False)
import plugins.web.ddgs.provider as _prov
release = threading.Event()
def _blocking_search(query, safe_limit):
release.wait(timeout=10) # bounded so the worker can never truly leak
return []
monkeypatch.setattr(_prov, "_run_ddgs_search", _blocking_search, raising=True)
monkeypatch.setattr(_prov, "_SEARCH_TIMEOUT_SECS", 0.3, raising=True)
try:
start = time.monotonic()
result = _prov.DDGSWebSearchProvider().search("hangs forever", limit=5)
elapsed = time.monotonic() - start
assert result["success"] is False
assert "timed out" in result["error"].lower()
# Returned well before the worker's 10s wait — proves the cap fired.
assert elapsed < 3.0, f"search did not return promptly ({elapsed:.1f}s)"
finally:
release.set() # let the orphaned worker finish immediately
def test_fast_search_not_affected_by_timeout_wrapper(self, monkeypatch):
"""Happy-path guard: the timeout wrapper must not break a normal,
fast search results flow through unchanged."""
_install_fake_ddgs(
monkeypatch,
text_results=[{"title": "T", "href": "https://e.com", "body": "B"}],
)
from plugins.web.ddgs.provider import DDGSWebSearchProvider
result = DDGSWebSearchProvider().search("q", limit=5)
assert result["success"] is True
assert result["data"]["web"][0]["url"] == "https://e.com"
assert result["data"]["web"][0]["title"] == "T"
# ---------------------------------------------------------------------------
# Integration: _is_backend_available / _get_backend / check_web_api_key