mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
fix(tools): percent-encode non-ascii URL components
This commit is contained in:
parent
cb3e41e2fd
commit
333f01bc7f
5 changed files with 161 additions and 6 deletions
|
|
@ -42,6 +42,25 @@ class TestBrowserSecretExfil:
|
|||
# Should NOT be blocked by secret detection
|
||||
assert "API key or token" not in parsed.get("error", "")
|
||||
|
||||
def test_normalizes_non_ascii_url_before_navigation(self):
|
||||
from tools.browser_tool import browser_navigate
|
||||
|
||||
captured = {}
|
||||
|
||||
def mock_run(_session_key, command, args, **_kwargs):
|
||||
if command == "open":
|
||||
captured["url"] = args[0]
|
||||
return {"success": True, "data": {"title": "ok", "url": args[0]}}
|
||||
|
||||
with patch("tools.browser_tool._run_browser_command", side_effect=mock_run), \
|
||||
patch("tools.browser_tool._get_session_info", return_value={"_first_nav": False}), \
|
||||
patch("tools.browser_tool._is_local_backend", return_value=True):
|
||||
result = browser_navigate("https://wttr.in/Köln")
|
||||
|
||||
parsed = json.loads(result)
|
||||
assert parsed["success"] is True
|
||||
assert captured["url"] == "https://wttr.in/K%C3%B6ln"
|
||||
|
||||
|
||||
class TestWebExtractSecretExfil:
|
||||
"""Verify web_extract_tool blocks URLs containing secrets."""
|
||||
|
|
@ -65,6 +84,56 @@ class TestWebExtractSecretExfil:
|
|||
# Should fail for API/config reason, not secret blocking
|
||||
assert "API key" not in parsed.get("error", "") or "Blocked" not in parsed.get("error", "")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_normalizes_non_ascii_url_before_extract_provider(self, monkeypatch):
|
||||
from agent.web_search_provider import WebSearchProvider
|
||||
from agent import web_search_registry
|
||||
from tools import web_tools
|
||||
|
||||
class FakeExtractProvider(WebSearchProvider):
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "fake-extract"
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def supports_search(self) -> bool:
|
||||
return False
|
||||
|
||||
def supports_extract(self) -> bool:
|
||||
return True
|
||||
|
||||
def extract(self, urls, **_kwargs):
|
||||
return [
|
||||
{
|
||||
"url": urls[0],
|
||||
"title": "ok",
|
||||
"content": "ok",
|
||||
"raw_content": "ok",
|
||||
}
|
||||
]
|
||||
|
||||
async def allow_url(_url: str) -> bool:
|
||||
return True
|
||||
|
||||
web_search_registry._reset_for_tests()
|
||||
web_search_registry.register_provider(FakeExtractProvider())
|
||||
monkeypatch.setattr(web_tools, "_ensure_web_plugins_loaded", lambda: None)
|
||||
monkeypatch.setattr(web_tools, "_get_extract_backend", lambda: "fake-extract")
|
||||
monkeypatch.setattr(web_tools, "async_is_safe_url", allow_url)
|
||||
|
||||
try:
|
||||
result = await web_tools.web_extract_tool(
|
||||
urls=["https://wttr.in/Köln"],
|
||||
use_llm_processing=False,
|
||||
)
|
||||
finally:
|
||||
web_search_registry._reset_for_tests()
|
||||
|
||||
parsed = json.loads(result)
|
||||
assert parsed["results"][0]["url"] == "https://wttr.in/K%C3%B6ln"
|
||||
|
||||
|
||||
class TestBrowserSnapshotRedaction:
|
||||
"""Verify secrets in page snapshots are redacted before auxiliary LLM calls."""
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ from tools.url_safety import (
|
|||
is_safe_url,
|
||||
async_is_safe_url,
|
||||
is_always_blocked_url,
|
||||
normalize_url_for_request,
|
||||
_is_blocked_ip,
|
||||
_global_allow_private_urls,
|
||||
_reset_allow_private_cache,
|
||||
|
|
@ -16,6 +17,32 @@ import ipaddress
|
|||
import pytest
|
||||
|
||||
|
||||
class TestNormalizeUrlForRequest:
|
||||
def test_percent_encodes_non_ascii_path(self):
|
||||
assert (
|
||||
normalize_url_for_request("https://wttr.in/Köln")
|
||||
== "https://wttr.in/K%C3%B6ln"
|
||||
)
|
||||
|
||||
def test_preserves_existing_percent_escapes(self):
|
||||
assert (
|
||||
normalize_url_for_request("https://wttr.in/K%C3%B6ln")
|
||||
== "https://wttr.in/K%C3%B6ln"
|
||||
)
|
||||
|
||||
def test_preserves_reserved_query_syntax(self):
|
||||
assert (
|
||||
normalize_url_for_request("https://example.com/search?q=Köln&lang=de")
|
||||
== "https://example.com/search?q=K%C3%B6ln&lang=de"
|
||||
)
|
||||
|
||||
def test_idna_encodes_hostname(self):
|
||||
assert (
|
||||
normalize_url_for_request("https://münich.example/Köln")
|
||||
== "https://xn--mnich-kva.example/K%C3%B6ln"
|
||||
)
|
||||
|
||||
|
||||
class TestIsSafeUrl:
|
||||
def test_public_url_allowed(self):
|
||||
with patch("socket.getaddrinfo", return_value=[
|
||||
|
|
|
|||
|
|
@ -78,10 +78,12 @@ try:
|
|||
from tools.url_safety import (
|
||||
is_safe_url as _is_safe_url,
|
||||
is_always_blocked_url as _is_always_blocked_url,
|
||||
normalize_url_for_request as _normalize_url_for_request,
|
||||
)
|
||||
except Exception:
|
||||
_is_safe_url = lambda url: False # noqa: E731 — fail-closed: block all if safety module unavailable
|
||||
_is_always_blocked_url = lambda url: True # noqa: E731 — fail-closed on the floor too
|
||||
_normalize_url_for_request = lambda url: url # noqa: E731 — best-effort fallback
|
||||
# Browser-provider ABC + registry — PR #25214 moved the per-vendor providers
|
||||
# (Browserbase / Browser Use / Firecrawl) out of ``tools/browser_providers/``
|
||||
# and into ``plugins/browser/<vendor>/``. The dispatcher consults the
|
||||
|
|
@ -2310,6 +2312,14 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str:
|
|||
"error": "Blocked: URL contains what appears to be an API key or token. "
|
||||
"Secrets must not be sent in URLs.",
|
||||
})
|
||||
url = _normalize_url_for_request(url)
|
||||
normalized_decoded = urllib.parse.unquote(url)
|
||||
if _PREFIX_RE.search(url) or _PREFIX_RE.search(normalized_decoded):
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": "Blocked: URL contains what appears to be an API key or token. "
|
||||
"Secrets must not be sent in URLs.",
|
||||
})
|
||||
|
||||
# SSRF protection — block private/internal addresses before navigating.
|
||||
# Skipped for local backends (Camofox, headless Chromium without a cloud
|
||||
|
|
|
|||
|
|
@ -28,12 +28,53 @@ import logging
|
|||
import os
|
||||
import socket
|
||||
import asyncio
|
||||
from urllib.parse import urlparse
|
||||
from urllib.parse import quote, urlparse, urlsplit, urlunsplit
|
||||
|
||||
from utils import is_truthy_value
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def normalize_url_for_request(url: str) -> str:
|
||||
"""Return an ASCII-safe HTTP URL for Hermes-owned URL tools.
|
||||
|
||||
Browsers and HTTP clients expect URIs, but users and models often provide
|
||||
IRIs such as ``https://wttr.in/Köln``. Preserve URL syntax and existing
|
||||
percent escapes while encoding non-ASCII host/path/query/fragment text.
|
||||
This is intentionally for URL tool inputs only; arbitrary shell commands
|
||||
must not be rewritten.
|
||||
"""
|
||||
if not isinstance(url, str):
|
||||
return url
|
||||
|
||||
raw = url.strip()
|
||||
if not raw:
|
||||
return raw
|
||||
|
||||
try:
|
||||
parsed = urlsplit(raw)
|
||||
except ValueError:
|
||||
return raw
|
||||
|
||||
if parsed.scheme.lower() not in {"http", "https"}:
|
||||
return raw
|
||||
|
||||
netloc = parsed.netloc
|
||||
hostname = parsed.hostname
|
||||
if hostname:
|
||||
try:
|
||||
ascii_host = hostname.encode("idna").decode("ascii")
|
||||
except UnicodeError:
|
||||
ascii_host = hostname
|
||||
if ascii_host != hostname:
|
||||
netloc = netloc.replace(hostname, ascii_host, 1)
|
||||
|
||||
path = quote(parsed.path, safe="/%:@!$&'()*+,;=")
|
||||
query = quote(parsed.query, safe="/%:@!$&'()*+,;=?")
|
||||
fragment = quote(parsed.fragment, safe="/%:@!$&'()*+,;=?")
|
||||
|
||||
return urlunsplit((parsed.scheme, netloc, path, query, fragment))
|
||||
|
||||
# Hostnames that should always be blocked regardless of IP resolution
|
||||
# or any config toggle. These are cloud metadata endpoints that an
|
||||
# attacker could use to steal instance credentials.
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ from tools.tool_backend_helpers import ( # noqa: F401
|
|||
nous_tool_gateway_unavailable_message,
|
||||
prefers_gateway,
|
||||
)
|
||||
from tools.url_safety import async_is_safe_url
|
||||
from tools.url_safety import async_is_safe_url, normalize_url_for_request
|
||||
import sys
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -902,17 +902,25 @@ async def web_extract_tool(
|
|||
# URL-decode first so percent-encoded secrets (%73k- = sk-) are caught.
|
||||
from agent.redact import _PREFIX_RE
|
||||
from urllib.parse import unquote
|
||||
normalized_urls: List[str] = []
|
||||
for _url in urls:
|
||||
if _PREFIX_RE.search(_url) or _PREFIX_RE.search(unquote(_url)):
|
||||
normalized_url = normalize_url_for_request(_url)
|
||||
if (
|
||||
_PREFIX_RE.search(_url)
|
||||
or _PREFIX_RE.search(unquote(_url))
|
||||
or _PREFIX_RE.search(normalized_url)
|
||||
or _PREFIX_RE.search(unquote(normalized_url))
|
||||
):
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": "Blocked: URL contains what appears to be an API key or token. "
|
||||
"Secrets must not be sent in URLs.",
|
||||
})
|
||||
normalized_urls.append(normalized_url)
|
||||
|
||||
debug_call_data = {
|
||||
"parameters": {
|
||||
"urls": urls,
|
||||
"urls": normalized_urls,
|
||||
"format": format,
|
||||
"use_llm_processing": use_llm_processing,
|
||||
"model": model,
|
||||
|
|
@ -928,12 +936,12 @@ async def web_extract_tool(
|
|||
}
|
||||
|
||||
try:
|
||||
logger.info("Extracting content from %d URL(s)", len(urls))
|
||||
logger.info("Extracting content from %d URL(s)", len(normalized_urls))
|
||||
|
||||
# ── SSRF protection — filter out private/internal URLs before any backend ──
|
||||
safe_urls = []
|
||||
ssrf_blocked: List[Dict[str, Any]] = []
|
||||
for url in urls:
|
||||
for url in normalized_urls:
|
||||
if not await async_is_safe_url(url):
|
||||
ssrf_blocked.append({
|
||||
"url": url, "title": "", "content": "",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue