mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
* ci(tests): install ripgrep from prebuilt tarball instead of apt
apt-get update + install of ripgrep takes ~4 min on the GHA Ubuntu
runners (the apt-get update against archive.ubuntu.com is the slow
part; ripgrep itself is small). Switching to the upstream musl
binary tarball cuts the step to a few seconds.
- Pinned to ripgrep 15.1.0 with sha256 verification (same hash as
published in the releases sha256 sidecar file).
- Drops the `rg` binary into /usr/local/bin so it is on PATH for
every subsequent step without GITHUB_PATH manipulation.
- Applied to both the test and e2e jobs in tests.yml.
* fix(cli): compile syntax check to tempdir, not source __pycache__
`_validate_critical_files_syntax` runs `py_compile.compile()` on each
critical bootstrap file after a successful `git pull`. The default
`py_compile` writes the resulting `.pyc` next to the source under
`__pycache__/`, which causes two real problems:
1. Parallel test workers walking the same source tree (e.g. running
the suite under per-file process isolation) can race against each
other on the `__pycache__` write — manifests as flaky 'directory
not empty' errors during teardown.
2. In production, the post-pull syntax check leaves a `.pyc` behind
that the next interpreter run might pick up — fine when the
interpreter version matches, sketchy if it doesn't.
Fix: write the compiled output to a `tempfile.TemporaryDirectory()`
that's discarded on function exit. We only care about the compile-or-not
signal, not the artifact.
* test(runner): per-file process isolation, drop manual state reset + xdist
Replace fragile manual _reset_module_state test fixtures with robust
per-file subprocess isolation. Each test file runs in a fresh
`python -m pytest <file>` subprocess via ThreadPoolExecutor. No xdist,
no custom pytest plugin, no shared worker state.
Key changes:
* scripts/run_tests_parallel.py — new runner: discovers test files,
runs N in parallel via ThreadPoolExecutor, captures stdout per file,
treats exit code 5 (no tests collected) as pass, kills all children
on exit. Change from cpu_count to cpu_count*2. The runner is
I/O-bound (waiting on subprocess.communicate() from pytest children)
The parent process does almost no CPU work, so 2x oversubscription
keeps more pipes full. When a file fails, immediately show the last
30 lines of pytest output (stack traces + FAILED summary) plus a
ready-to-copy repro command:
python -m pytest tests/agent/test_auxiliary_client.py
* scripts/run_tests.sh — delegates to run_tests_parallel.py
* .github/workflows/tests.yml — test step: python
scripts/run_tests_parallel.py
* pyproject.toml — drop pytest-xdist, pytest-split; simplify addopts
* tests/conftest.py — remove ~200 lines of manual state-reset fixtures
* AGENTS.md — update Testing section for per-file design
* test(runner): speed gateway test antipattern scan up
* fix(test): web search provider plugin test missing xai
* fix(tests): make 14 test files pass under per-file subprocess isolation
Tests that relied on cross-file state pollution from xdist workers
fail when run in isolation (per-file subprocess model). Root causes
and fixes:
Tool registry not populated:
- test_video_generation_tool_surface_matrix: add discover_builtin_tools()
- test_web_providers_brave_free/ddgs/searxng/general: autouse fixtures
registering all 8 bundled web providers, reset after each test
- test_website_policy: same provider registration pattern
- test_web_tools_tavily: same pattern across 3 dispatch test classes
- Also add is_safe_url/check_website_access mocks where SSRF check
blocks example.com (DNS resolution fails in isolated envs)
Stale check_fn cache:
- test_kanban_tools: invalidate_check_fn_cache() + _clear_tool_defs_cache()
in both kanban guidance tests (prior test cached False for kanban_show)
- test_discord_tool: cache invalidation in setup/teardown
- test_homeassistant_tool: invalidate_check_fn_cache() before registry queries
Module-level state pollution:
- test_auxiliary_client: autouse fixture clearing _aux_unhealthy_until cache
- test_skill_commands: set_session_vars() instead of patch.dict(os.environ)
(ContextVar takes precedence over os.environ)
- test_dm_topics: overwrite sys.modules + separate telegram.constants mock
+ force-reimport of gateway.platforms.telegram
- test_terminal_tool_requirements: removed duplicate class declaration,
autouse _clear_caches fixture
* change(tests): run_tests.sh explicitly includes env vars
instead of manually dropping some vars, now we just only include some
* fix(tests): 5 more isolation/NixOS fixes
- test_approval_plugin_hooks: isolate HERMES_HOME so real user's
command_allowlist doesn't short-circuit the approval path
- test_google_chat: skipif when Platform.GOOGLE_CHAT not in enum
(feature not merged on this branch)
- test_write_deny: test systemd prefix against tmp_path instead of
/etc/systemd which resolves to /nix/store on NixOS
- test_pty_bridge: use shutil.which('cat') instead of /bin/cat
(doesn't exist on NixOS)
- profiles.py: rmtree onexc handler chmod's parent dirs too, fixing
profile deletion when copytree preserved read-only modes from
nix store
* fix(tests): clear unhealthy cache in autouse fixture for auxiliary_client
* fix(tests): skip send_message when telegram not installed; handle missing worker_id in browser_supervisor
* fix: py3.11 rmtree onexc compat + belt-and-suspenders unhealthy cache clear for expired codex test
* fix: address PR #29016 review feedback
- Remove tracked .pytest-cache/ artifact and add to .gitignore
- Fix stale 'xdist worker' comment in conftest.py
- Deduplicate web provider registration into tests/tools/conftest.py
shared helper (register_all_web_providers), replacing 8 copy-pasted
blocks across 6 test files
- Update PR description: remove stale recovered-test-files claim,
fix worker count to match code (cpu_count*2)
* fix: eliminate race in stale-cache achievements test
The background scan thread could complete and overwrite _SNAPSHOT_CACHE
before evaluate_all() returned the stale data — only 10 fake sessions
made the scan finish instantly. Added scan_delay param to _FakeSessionDB
and set it to 2s in the stale-cache test so the background thread can't
win the race.
555 lines
20 KiB
Python
555 lines
20 KiB
Python
import json
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
import yaml
|
|
|
|
from tests.tools.conftest import register_all_web_providers
|
|
|
|
from tools.website_policy import WebsitePolicyError, check_website_access, load_website_blocklist
|
|
|
|
|
|
def test_load_website_blocklist_merges_config_and_shared_file(tmp_path):
|
|
shared = tmp_path / "community-blocklist.txt"
|
|
shared.write_text("# comment\nexample.org\nsub.bad.net\n", encoding="utf-8")
|
|
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": {
|
|
"enabled": True,
|
|
"domains": ["example.com", "https://www.evil.test/path"],
|
|
"shared_files": [str(shared)],
|
|
}
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
policy = load_website_blocklist(config_path)
|
|
|
|
assert policy["enabled"] is True
|
|
assert {rule["pattern"] for rule in policy["rules"]} == {
|
|
"example.com",
|
|
"evil.test",
|
|
"example.org",
|
|
"sub.bad.net",
|
|
}
|
|
|
|
|
|
def test_check_website_access_matches_parent_domain_subdomains(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": {
|
|
"enabled": True,
|
|
"domains": ["example.com"],
|
|
}
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
blocked = check_website_access("https://docs.example.com/page", config_path=config_path)
|
|
|
|
assert blocked is not None
|
|
assert blocked["host"] == "docs.example.com"
|
|
assert blocked["rule"] == "example.com"
|
|
|
|
|
|
def test_check_website_access_supports_wildcard_subdomains_only(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": {
|
|
"enabled": True,
|
|
"domains": ["*.tracking.example"],
|
|
}
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
assert check_website_access("https://a.tracking.example", config_path=config_path) is not None
|
|
assert check_website_access("https://www.tracking.example", config_path=config_path) is not None
|
|
assert check_website_access("https://tracking.example", config_path=config_path) is None
|
|
|
|
|
|
def test_default_config_exposes_website_blocklist_shape():
|
|
from hermes_cli.config import DEFAULT_CONFIG
|
|
|
|
website_blocklist = DEFAULT_CONFIG["security"]["website_blocklist"]
|
|
assert website_blocklist["enabled"] is False
|
|
assert website_blocklist["domains"] == []
|
|
assert website_blocklist["shared_files"] == []
|
|
|
|
|
|
def test_load_website_blocklist_uses_enabled_default_when_section_missing(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(yaml.safe_dump({"display": {"tool_progress": "all"}}, sort_keys=False), encoding="utf-8")
|
|
|
|
policy = load_website_blocklist(config_path)
|
|
|
|
assert policy == {"enabled": False, "rules": []}
|
|
|
|
|
|
def test_load_website_blocklist_raises_clean_error_for_invalid_domains_type(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": {
|
|
"enabled": True,
|
|
"domains": "example.com",
|
|
}
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
with pytest.raises(WebsitePolicyError, match="security.website_blocklist.domains must be a list"):
|
|
load_website_blocklist(config_path)
|
|
|
|
|
|
def test_load_website_blocklist_raises_clean_error_for_invalid_shared_files_type(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": {
|
|
"enabled": True,
|
|
"shared_files": "community-blocklist.txt",
|
|
}
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
with pytest.raises(WebsitePolicyError, match="security.website_blocklist.shared_files must be a list"):
|
|
load_website_blocklist(config_path)
|
|
|
|
|
|
def test_load_website_blocklist_raises_clean_error_for_invalid_top_level_config_type(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(yaml.safe_dump(["not", "a", "mapping"], sort_keys=False), encoding="utf-8")
|
|
|
|
with pytest.raises(WebsitePolicyError, match="config root must be a mapping"):
|
|
load_website_blocklist(config_path)
|
|
|
|
|
|
def test_load_website_blocklist_raises_clean_error_for_invalid_security_type(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(yaml.safe_dump({"security": []}, sort_keys=False), encoding="utf-8")
|
|
|
|
with pytest.raises(WebsitePolicyError, match="security must be a mapping"):
|
|
load_website_blocklist(config_path)
|
|
|
|
|
|
def test_load_website_blocklist_raises_clean_error_for_invalid_website_blocklist_type(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": "block everything",
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
with pytest.raises(WebsitePolicyError, match="security.website_blocklist must be a mapping"):
|
|
load_website_blocklist(config_path)
|
|
|
|
|
|
def test_load_website_blocklist_raises_clean_error_for_invalid_enabled_type(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": {
|
|
"enabled": "false",
|
|
}
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
with pytest.raises(WebsitePolicyError, match="security.website_blocklist.enabled must be a boolean"):
|
|
load_website_blocklist(config_path)
|
|
|
|
|
|
def test_load_website_blocklist_raises_clean_error_for_malformed_yaml(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text("security: [oops\n", encoding="utf-8")
|
|
|
|
with pytest.raises(WebsitePolicyError, match="Invalid config YAML"):
|
|
load_website_blocklist(config_path)
|
|
|
|
|
|
def test_load_website_blocklist_wraps_shared_file_read_errors(tmp_path, monkeypatch):
|
|
shared = tmp_path / "community-blocklist.txt"
|
|
shared.write_text("example.org\n", encoding="utf-8")
|
|
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": {
|
|
"enabled": True,
|
|
"shared_files": [str(shared)],
|
|
}
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
def failing_read_text(self, *args, **kwargs):
|
|
raise PermissionError("no permission")
|
|
|
|
monkeypatch.setattr(Path, "read_text", failing_read_text)
|
|
|
|
# Unreadable shared files are now warned and skipped (not raised),
|
|
# so the blocklist loads successfully but without those rules.
|
|
result = load_website_blocklist(config_path)
|
|
assert result["enabled"] is True
|
|
assert result["rules"] == [] # shared file rules skipped
|
|
|
|
|
|
def test_check_website_access_uses_dynamic_hermes_home(monkeypatch, tmp_path):
|
|
hermes_home = tmp_path / "hermes-home"
|
|
hermes_home.mkdir()
|
|
(hermes_home / "config.yaml").write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": {
|
|
"enabled": True,
|
|
"domains": ["dynamic.example"],
|
|
}
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
# Invalidate the module-level cache so the new HERMES_HOME is picked up.
|
|
# A prior test may have cached a default policy (enabled=False) under the
|
|
# old HERMES_HOME set by the autouse _isolate_hermes_home fixture.
|
|
from tools.website_policy import invalidate_cache
|
|
invalidate_cache()
|
|
|
|
blocked = check_website_access("https://dynamic.example/path")
|
|
|
|
assert blocked is not None
|
|
assert blocked["rule"] == "dynamic.example"
|
|
|
|
|
|
def test_check_website_access_blocks_scheme_less_urls(tmp_path):
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": {
|
|
"enabled": True,
|
|
"domains": ["blocked.test"],
|
|
}
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
blocked = check_website_access("www.blocked.test/path", config_path=config_path)
|
|
|
|
assert blocked is not None
|
|
assert blocked["host"] == "www.blocked.test"
|
|
assert blocked["rule"] == "blocked.test"
|
|
|
|
|
|
def test_browser_navigate_returns_policy_block(monkeypatch):
|
|
from tools import browser_tool
|
|
|
|
# Allow SSRF check to pass so the policy check is reached
|
|
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
|
monkeypatch.setattr(
|
|
browser_tool,
|
|
"check_website_access",
|
|
lambda url: {
|
|
"host": "blocked.test",
|
|
"rule": "blocked.test",
|
|
"source": "config",
|
|
"message": "Blocked by website policy",
|
|
},
|
|
)
|
|
monkeypatch.setattr(
|
|
browser_tool,
|
|
"_run_browser_command",
|
|
lambda *args, **kwargs: pytest.fail("browser command should not run for blocked URL"),
|
|
)
|
|
|
|
result = json.loads(browser_tool.browser_navigate("https://blocked.test"))
|
|
|
|
assert result["success"] is False
|
|
assert result["blocked_by_policy"]["rule"] == "blocked.test"
|
|
|
|
|
|
def test_browser_navigate_allows_when_shared_file_missing(monkeypatch, tmp_path):
|
|
"""Missing shared blocklist files are warned and skipped, not fatal."""
|
|
from tools import browser_tool
|
|
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text(
|
|
yaml.safe_dump(
|
|
{
|
|
"security": {
|
|
"website_blocklist": {
|
|
"enabled": True,
|
|
"shared_files": ["missing-blocklist.txt"],
|
|
}
|
|
}
|
|
},
|
|
sort_keys=False,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
# check_website_access should return None (allow) — missing file is skipped
|
|
result = check_website_access("https://allowed.test", config_path=config_path)
|
|
assert result is None
|
|
|
|
|
|
class TestWebToolPolicy:
|
|
"""Tests that exercise web_extract_tool / web_crawl_tool with website-policy gates.
|
|
|
|
These tests need the bundled web providers to be registered in the
|
|
agent.web_search_registry so the tool dispatchers can find an active
|
|
provider. Without registration, the tools return an error dict that
|
|
lacks a ``results`` key, causing ``KeyError``.
|
|
"""
|
|
|
|
_register_providers = staticmethod(register_all_web_providers)
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _populate_web_registry(self):
|
|
self._register_providers()
|
|
yield
|
|
from agent.web_search_registry import _reset_for_tests
|
|
_reset_for_tests()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_web_extract_short_circuits_blocked_url(self, monkeypatch):
|
|
from tools import web_tools
|
|
from plugins.web.firecrawl import provider as firecrawl_provider
|
|
|
|
# Allow test URLs past SSRF check so website policy is what gets tested
|
|
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
|
|
# The per-URL website-policy gate moved into the firecrawl plugin's
|
|
# extract() during the web-provider migration. Patch it at the new
|
|
# location; the dispatcher-level gate (used by web_crawl_tool's
|
|
# pre-flight) still lives on tools.web_tools.
|
|
monkeypatch.setattr(
|
|
firecrawl_provider,
|
|
"check_website_access",
|
|
lambda url: {
|
|
"host": "blocked.test",
|
|
"rule": "blocked.test",
|
|
"source": "config",
|
|
"message": "Blocked by website policy",
|
|
},
|
|
)
|
|
monkeypatch.setattr(
|
|
firecrawl_provider,
|
|
"_get_firecrawl_client",
|
|
lambda: pytest.fail("firecrawl should not run for blocked URL"),
|
|
)
|
|
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
|
|
# Force the firecrawl plugin to be the active extract provider.
|
|
monkeypatch.setenv("FIRECRAWL_API_KEY", "fake-key")
|
|
|
|
result = json.loads(await web_tools.web_extract_tool(["https://blocked.test"], use_llm_processing=False))
|
|
|
|
assert result["results"][0]["url"] == "https://blocked.test"
|
|
assert "Blocked by website policy" in result["results"][0]["error"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_web_extract_blocks_redirected_final_url(self, monkeypatch):
|
|
from tools import web_tools
|
|
from plugins.web.firecrawl import provider as firecrawl_provider
|
|
|
|
# Allow test URLs past SSRF check so website policy is what gets tested
|
|
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
|
|
|
|
def fake_check(url):
|
|
if url == "https://allowed.test":
|
|
return None
|
|
if url == "https://blocked.test/final":
|
|
return {
|
|
"host": "blocked.test",
|
|
"rule": "blocked.test",
|
|
"source": "config",
|
|
"message": "Blocked by website policy",
|
|
}
|
|
pytest.fail(f"unexpected URL checked: {url}")
|
|
|
|
class FakeFirecrawlClient:
|
|
def scrape(self, url, formats):
|
|
return {
|
|
"markdown": "secret content",
|
|
"metadata": {
|
|
"title": "Redirected",
|
|
"sourceURL": "https://blocked.test/final",
|
|
},
|
|
}
|
|
|
|
# After the web-provider migration, the per-URL gate + firecrawl client
|
|
# live in the plugin. Patch both at the plugin location.
|
|
monkeypatch.setattr(firecrawl_provider, "check_website_access", fake_check)
|
|
monkeypatch.setattr(firecrawl_provider, "_get_firecrawl_client", lambda: FakeFirecrawlClient())
|
|
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
|
|
monkeypatch.setenv("FIRECRAWL_API_KEY", "fake-key")
|
|
|
|
result = json.loads(await web_tools.web_extract_tool(["https://allowed.test"], use_llm_processing=False))
|
|
|
|
assert result["results"][0]["url"] == "https://blocked.test/final"
|
|
assert result["results"][0]["content"] == ""
|
|
assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_web_crawl_short_circuits_blocked_url(self, monkeypatch):
|
|
from tools import web_tools
|
|
|
|
# web_crawl_tool checks for Firecrawl env before website policy
|
|
monkeypatch.setenv("FIRECRAWL_API_KEY", "fake-key")
|
|
# Allow test URLs past SSRF check so website policy is what gets tested
|
|
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
|
|
# The dispatcher-level (seed-URL) policy gate still lives on web_tools.
|
|
# No per-page gate runs in this test because the dispatcher returns
|
|
# immediately when the seed is blocked, before delegating to the plugin.
|
|
monkeypatch.setattr(
|
|
web_tools,
|
|
"check_website_access",
|
|
lambda url: {
|
|
"host": "blocked.test",
|
|
"rule": "blocked.test",
|
|
"source": "config",
|
|
"message": "Blocked by website policy",
|
|
},
|
|
)
|
|
# If the dispatcher ever reaches the firecrawl plugin's crawl(), the test
|
|
# fails — pin the plugin module's client lookup so we'd notice.
|
|
from plugins.web.firecrawl import provider as firecrawl_provider
|
|
monkeypatch.setattr(
|
|
firecrawl_provider,
|
|
"_get_firecrawl_client",
|
|
lambda: pytest.fail("firecrawl plugin should not run for blocked crawl URL"),
|
|
)
|
|
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
|
|
|
|
result = json.loads(await web_tools.web_crawl_tool("https://blocked.test", use_llm_processing=False))
|
|
|
|
assert result["results"][0]["url"] == "https://blocked.test"
|
|
assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_web_crawl_blocks_redirected_final_url(self, monkeypatch):
|
|
from tools import web_tools
|
|
from plugins.web.firecrawl import provider as firecrawl_provider
|
|
|
|
# Force the firecrawl plugin to be the active crawl provider.
|
|
monkeypatch.setenv("FIRECRAWL_API_KEY", "fake-key")
|
|
# Allow test URLs past SSRF check so website policy is what gets tested
|
|
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
|
|
|
|
def fake_check(url):
|
|
# Dispatcher seed-URL gate (web_tools.check_website_access call)
|
|
# and plugin per-page gate (firecrawl_provider.check_website_access
|
|
# call) both flow through this single fake_check.
|
|
if url == "https://allowed.test":
|
|
return None
|
|
if url == "https://blocked.test/final":
|
|
return {
|
|
"host": "blocked.test",
|
|
"rule": "blocked.test",
|
|
"source": "config",
|
|
"message": "Blocked by website policy",
|
|
}
|
|
pytest.fail(f"unexpected URL checked: {url}")
|
|
|
|
class FakeCrawlClient:
|
|
def crawl(self, url, **kwargs):
|
|
return {
|
|
"data": [
|
|
{
|
|
"markdown": "secret crawl content",
|
|
"metadata": {
|
|
"title": "Redirected crawl page",
|
|
"sourceURL": "https://blocked.test/final",
|
|
},
|
|
}
|
|
]
|
|
}
|
|
|
|
# After PR #25182 follow-up: per-page policy gate lives in
|
|
# plugins.web.firecrawl.provider.crawl(). Patch the gate + client at
|
|
# the plugin location. The dispatcher-level (seed) gate also reads
|
|
# web_tools.check_website_access — patch both.
|
|
monkeypatch.setattr(web_tools, "check_website_access", fake_check)
|
|
monkeypatch.setattr(firecrawl_provider, "check_website_access", fake_check)
|
|
monkeypatch.setattr(firecrawl_provider, "_get_firecrawl_client", lambda: FakeCrawlClient())
|
|
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
|
|
|
|
result = json.loads(await web_tools.web_crawl_tool("https://allowed.test", use_llm_processing=False))
|
|
|
|
assert result["results"][0]["content"] == ""
|
|
assert result["results"][0]["error"] == "Blocked by website policy"
|
|
assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test"
|
|
|
|
|
|
def test_check_website_access_fails_open_on_malformed_config(tmp_path, monkeypatch):
|
|
"""Malformed config with default path should fail open (return None), not crash."""
|
|
config_path = tmp_path / "config.yaml"
|
|
config_path.write_text("security: [oops\n", encoding="utf-8")
|
|
|
|
# With explicit config_path (test mode), errors propagate
|
|
with pytest.raises(WebsitePolicyError):
|
|
check_website_access("https://example.com", config_path=config_path)
|
|
|
|
# Simulate default path by pointing HERMES_HOME to tmp_path
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
from tools import website_policy
|
|
website_policy.invalidate_cache()
|
|
|
|
# With default path, errors are caught and fail open
|
|
result = check_website_access("https://example.com")
|
|
assert result is None # allowed, not crashed
|