Merge branch 'main' into docker_s6

This commit is contained in:
Ben Barclay 2026-05-25 09:39:27 +10:00 committed by GitHub
commit 59da190512
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
417 changed files with 26434 additions and 3321 deletions

View file

@ -57,6 +57,59 @@ def _build_parser():
return parser
class TestChatVerboseArg:
    """Check how ``chat --verbose`` is parsed and what ``cmd_chat`` forwards."""

    def test_chat_without_verbose_leaves_attribute_unset(self):
        """Parsing plain ``chat`` yields a namespace without a ``verbose`` attribute."""
        from hermes_cli._parser import build_top_level_parser

        top_parser, _, _ = build_top_level_parser()
        namespace = top_parser.parse_args(["chat"])
        assert not hasattr(namespace, "verbose")

    def test_chat_verbose_sets_attribute_true(self):
        """Parsing ``chat --verbose`` sets ``verbose`` to exactly ``True``."""
        from hermes_cli._parser import build_top_level_parser

        top_parser, _, _ = build_top_level_parser()
        namespace = top_parser.parse_args(["chat", "--verbose"])
        assert namespace.verbose is True

    def test_cmd_chat_forwards_none_when_verbose_is_absent(self, monkeypatch):
        """``cmd_chat`` must not pass a ``verbose`` kwarg to ``cli.main`` when the flag is absent."""
        import types
        import sys
        import hermes_cli.main as main_mod
        from hermes_cli._parser import build_top_level_parser

        top_parser, _, chat_parser = build_top_level_parser()
        chat_parser.set_defaults(func=main_mod.cmd_chat)
        namespace = top_parser.parse_args(["chat"])

        # Keyword arguments received by the stubbed ``cli.main``.
        forwarded = {}

        def record_kwargs(**kwargs):
            forwarded.update(kwargs)

        # Stand-in modules registered in sys.modules for the duration of
        # the test: module name -> (attribute name, stub callable).
        stubs = {
            "cli": ("main", record_kwargs),
            "hermes_cli.banner": ("prefetch_update_check", lambda: None),
            "tools.skills_sync": ("sync_skills", lambda quiet=True: None),
        }
        stub_modules = {}
        for module_name, (attr_name, stub) in stubs.items():
            module = types.ModuleType(module_name)
            setattr(module, attr_name, stub)
            stub_modules[module_name] = module
        for module_name, module in stub_modules.items():
            monkeypatch.setitem(sys.modules, module_name, module)

        monkeypatch.setattr(main_mod, "_has_any_provider_configured", lambda: True)
        monkeypatch.setattr(main_mod, "_pin_kanban_board_env", lambda: None)

        main_mod.cmd_chat(namespace)

        assert forwarded["quiet"] is False
        assert "verbose" not in forwarded
class TestYoloEnvVar:
"""Verify --yolo sets HERMES_YOLO_MODE regardless of flag position.

View file

@ -392,8 +392,84 @@ def test_get_qwen_auth_status_logged_in(qwen_env):
assert status["api_key"] == "status-at"
def test_get_qwen_auth_status_refreshes_expired_token(qwen_env):
    """An expired stored token triggers exactly one refresh, and the new token is reported."""
    # Stored credentials whose expiry lies one hour in the past.
    stale_expiry_ms = int((time.time() - 3600) * 1000)
    stale_tokens = _make_qwen_tokens(access_token="old-at", expiry_date=stale_expiry_ms)
    _write_qwen_creds(qwen_env, stale_tokens)
    new_tokens = _make_qwen_tokens(access_token="refreshed-at")

    refresh_patch = patch(
        "hermes_cli.auth._refresh_qwen_cli_tokens", return_value=new_tokens
    )
    with refresh_patch as mock_refresh:
        status = get_qwen_auth_status()

    mock_refresh.assert_called_once()
    assert status["logged_in"] is True
    assert status["api_key"] == "refreshed-at"
def test_get_qwen_auth_status_expired_unrefreshable_token_is_not_logged_in(qwen_env):
    """When the refresh raises ``AuthError``, status is logged-out and carries the re-auth hint."""
    stale_expiry_ms = int((time.time() - 3600) * 1000)
    stale_tokens = _make_qwen_tokens(access_token="dead-at", expiry_date=stale_expiry_ms)
    _write_qwen_creds(qwen_env, stale_tokens)

    refresh_failure = AuthError(
        "Qwen refresh rejected. Re-run 'qwen auth qwen-oauth'.",
        provider="qwen-oauth",
        code="qwen_refresh_failed",
    )
    refresh_patch = patch(
        "hermes_cli.auth._refresh_qwen_cli_tokens", side_effect=refresh_failure
    )
    with refresh_patch as mock_refresh:
        status = get_qwen_auth_status()

    mock_refresh.assert_called_once()
    assert status["logged_in"] is False
    assert "qwen auth qwen-oauth" in status["error"]
def test_get_qwen_auth_status_not_logged_in(qwen_env):
    """With no credentials file written, status is logged-out and includes an ``error`` key."""
    result = get_qwen_auth_status()
    assert result["logged_in"] is False
    assert "error" in result
def test_model_flow_qwen_oauth_stale_token_shows_reauth_guidance(qwen_env, monkeypatch, capsys):
    """A rejected refresh prints re-auth guidance and neither prompts nor updates config."""
    from hermes_cli.main import _model_flow_qwen_oauth

    stale_expiry_ms = int((time.time() - 3600) * 1000)
    stale_tokens = _make_qwen_tokens(access_token="dead-at", expiry_date=stale_expiry_ms)
    _write_qwen_creds(qwen_env, stale_tokens)

    def reject_refresh(*args, **kwargs):
        # Stand-in for the token refresh: always fails with the re-auth error.
        raise AuthError(
            "Qwen refresh rejected. Re-run 'qwen auth qwen-oauth'.",
            provider="qwen-oauth",
            code="qwen_refresh_failed",
        )

    # Names of the downstream steps that were (unexpectedly) invoked.
    invoked = set()

    monkeypatch.setattr("hermes_cli.auth._refresh_qwen_cli_tokens", reject_refresh)
    monkeypatch.setattr(
        "hermes_cli.auth._prompt_model_selection",
        lambda *args, **kwargs: invoked.add("prompt"),
    )
    monkeypatch.setattr(
        "hermes_cli.auth._update_config_for_provider",
        lambda *args, **kwargs: invoked.add("update"),
    )

    _model_flow_qwen_oauth({}, current_model="qwen3-coder-plus")

    printed = capsys.readouterr().out
    assert "Run: qwen auth qwen-oauth" in printed
    assert "Qwen refresh rejected" in printed
    assert "prompt" not in invoked
    assert "update" not in invoked

View file

@ -0,0 +1,13 @@
"""Tests for placeholder API key detection in hermes_cli.auth."""
from hermes_cli.auth import has_usable_secret
def test_has_usable_secret_rejects_documented_placeholder_key() -> None:
    """The documentation placeholder ``your_api_key_here`` must not count as a usable secret."""
    placeholder = "your_api_key_here"
    accepted = has_usable_secret(placeholder, min_length=8)
    assert not accepted
def test_has_usable_secret_accepts_generated_key() -> None:
    """A random-looking 32-character hex key must still be accepted."""
    generated_key = "b4d59f7fe8b857d0b367ef0f5710b6a4"
    accepted = has_usable_secret(generated_key, min_length=8)
    assert accepted

View file

@ -0,0 +1,131 @@
"""Tests for curses color compatibility on low-color terminals (Docker).
Regression test for #13688: ``hermes plugins`` crashes with
``curses.error: init_pair() : color number is greater than COLORS-1``
in Docker containers where curses.COLORS == 8 (only colors 0-7 exist).
The bug was ``curses.init_pair(4, 8, -1)`` using raw color 8 ("bright
black" / dim gray) which does not exist on 8-color terminals. The fix
clamps with ``min(8, curses.COLORS - 1)``.
"""
import curses
import re
from pathlib import Path
from unittest.mock import patch, MagicMock, call
import pytest
# Path to the source files under test
_SRC_ROOT = Path(__file__).parent.parent.parent / "hermes_cli"
class TestInitPairClampingBehavior:
    """Simulate curses color initialization on low-color terminals.

    Patches ``curses.COLORS`` (8 is the Docker default) and verifies that
    ``init_pair`` is never called with a color >= COLORS.
    """

    @staticmethod
    def _simulated_color_init(stdscr):
        """Replay the color-pair setup pattern used by ``plugins_cmd.py``.

        Pair 4 uses color 8 only when the terminal reports more than 8
        colors, otherwise it falls back to ``curses.COLOR_WHITE``.
        ``stdscr`` is accepted to match the draw-function signature but is
        not used.
        """
        if curses.has_colors():
            curses.start_color()
            curses.use_default_colors()
            curses.init_pair(1, curses.COLOR_GREEN, -1)
            curses.init_pair(2, curses.COLOR_YELLOW, -1)
            curses.init_pair(3, curses.COLOR_CYAN, -1)
            curses.init_pair(4, 8 if curses.COLORS > 8 else curses.COLOR_WHITE, -1)

    def _collect_init_pair_calls(self, draw_fn, colors_value):
        """Run a curses draw function with a mock stdscr and patched COLORS.

        Args:
            draw_fn: callable taking the (mock) ``stdscr`` window.
            colors_value: value to expose as ``curses.COLORS``.

        Returns:
            List of ``(pair_number, fg, bg)`` tuples, one per ``init_pair``
            call, in call order. Empty if ``draw_fn`` raised before
            initializing any pair.
        """
        calls = []

        def tracking_init_pair(pair, fg, bg):
            calls.append((pair, fg, bg))

        mock_stdscr = MagicMock()
        mock_stdscr.getmaxyx.return_value = (24, 80)
        mock_stdscr.getch.return_value = 27  # ESC to exit

        with patch("curses.COLORS", colors_value, create=True), \
             patch("curses.init_pair", side_effect=tracking_init_pair), \
             patch("curses.has_colors", return_value=True), \
             patch("curses.start_color"), \
             patch("curses.use_default_colors"), \
             patch("curses.curs_set"):
            try:
                draw_fn(mock_stdscr)
            except (SystemExit, Exception):
                # Deliberate best-effort: draw functions loop until a
                # keypress and may bail out in arbitrary ways under a mock
                # screen. Callers must assert on the returned calls so a
                # swallowed failure cannot turn into a vacuous pass.
                pass
        return calls

    def test_8_color_terminal_no_color_exceeds_limit(self):
        """On an 8-color terminal (Docker), no init_pair fg color >= 8."""
        calls = self._collect_init_pair_calls(self._simulated_color_init, 8)
        # Guard against a vacuous pass: the loop below proves nothing if
        # no init_pair call was recorded.
        assert calls, "expected init_pair to be called at least once"
        for pair, fg, bg in calls:
            assert fg < 8, (
                f"init_pair({pair}, {fg}, {bg}) uses color {fg} which "
                f"does not exist on an 8-color terminal (valid: 0-7)"
            )
        # The dim-gray pair must fall back to white on 8 colors.
        assert (4, curses.COLOR_WHITE, -1) in calls

    def test_256_color_terminal_uses_color_8(self):
        """On a 256-color terminal, color 8 (dim gray) should be used."""
        calls = self._collect_init_pair_calls(self._simulated_color_init, 256)
        assert any(fg == 8 for _, fg, _ in calls), (
            "On 256-color terminals, color 8 (dim gray) should be used"
        )

    def test_16_color_terminal_uses_color_8(self):
        """On a 16-color terminal, color 8 should be available."""
        calls = self._collect_init_pair_calls(self._simulated_color_init, 16)
        assert any(fg == 8 for _, fg, _ in calls)
class TestSourceCodeGuardrails:
    """Regression guardrails: raw color 8 must not reappear in source.

    These complement the behavioral tests above - they catch regressions
    introduced by copy-paste of the old pattern.
    """

    # Matches ``init_pair(<digits>, 8,`` i.e. a literal foreground color 8.
    _RAW_COLOR_8_PATTERN = re.compile(r'init_pair\(\d+,\s*8\s*,')

    def _assert_no_raw_color_8(self, filename):
        """Fail if ``filename`` under ``_SRC_ROOT`` contains a literal-8 ``init_pair`` call."""
        # Explicit encoding so the scan does not depend on the platform's
        # default locale encoding.
        source = (_SRC_ROOT / filename).read_text(encoding="utf-8")
        matches = self._RAW_COLOR_8_PATTERN.findall(source)
        assert not matches, (
            f"{filename} contains unclamped color 8: {matches}"
        )

    def test_no_raw_color_8_in_plugins_cmd(self):
        self._assert_no_raw_color_8("plugins_cmd.py")

    def test_no_raw_color_8_in_main(self):
        self._assert_no_raw_color_8("main.py")

    def test_no_raw_color_8_in_curses_ui(self):
        self._assert_no_raw_color_8("curses_ui.py")

View file

@ -353,6 +353,40 @@ class TestCaptureLogSnapshotRedaction:
assert snap.full_text is not None
assert _REDACT_FIXTURE_TOKEN not in snap.full_text
def test_default_redacts_email_addresses_for_public_share(
    self, hermes_home_with_secret
):
    """By default the snapshot replaces email addresses with ``[REDACTED_EMAIL]``."""
    from hermes_cli.debug import _capture_log_snapshot

    agent_log = hermes_home_with_secret / "logs" / "agent.log"
    log_line = (
        "2026-04-12 17:00:00 INFO gateway.run: "
        "inbound message: platform=bluebubbles "
        "user=person@example.com chat=iMessage;-;person@example.com msg='hello'\n"
    )
    agent_log.write_text(log_line)

    snapshot = _capture_log_snapshot("agent", tail_lines=10)

    # Tail view: address gone, placeholder present.
    assert "person@example.com" not in snapshot.tail_text
    assert "[REDACTED_EMAIL]" in snapshot.tail_text
    # Full view: must exist and must also be scrubbed.
    assert snapshot.full_text is not None
    assert "person@example.com" not in snapshot.full_text
def test_no_redact_preserves_email_addresses(self, hermes_home_with_secret):
    """With ``redact=False`` the email address survives in both tail and full text."""
    from hermes_cli.debug import _capture_log_snapshot

    agent_log = hermes_home_with_secret / "logs" / "agent.log"
    log_line = (
        "2026-04-12 17:00:00 INFO gateway.run: "
        "inbound message: platform=bluebubbles "
        "user=person@example.com chat=iMessage;-;person@example.com msg='hello'\n"
    )
    agent_log.write_text(log_line)

    snapshot = _capture_log_snapshot("agent", tail_lines=10, redact=False)

    assert "person@example.com" in snapshot.tail_text
    # full_text may be None; treat that as empty so the assertion fails cleanly.
    assert "person@example.com" in (snapshot.full_text or "")
def test_capture_default_log_snapshots_threads_redact(
self, hermes_home_with_secret
):

View file

@ -70,6 +70,23 @@ def test_user_env_takes_precedence_over_project_env(tmp_path, monkeypatch):
assert os.getenv("OPENAI_API_KEY") == "project-key"
def test_null_bytes_in_user_env_are_stripped(tmp_path, monkeypatch):
    """NUL bytes trailing a value in the user ``.env`` do not end up in the environment."""
    hermes_dir = tmp_path / "hermes"
    hermes_dir.mkdir()
    dotenv_path = hermes_dir / ".env"
    # Null bytes can be introduced when copy-pasting API keys.
    dotenv_path.write_text(
        "GLM_API_KEY=abc\x00\x00\nOPENAI_API_KEY=sk-123\n", encoding="utf-8"
    )
    for variable in ("GLM_API_KEY", "OPENAI_API_KEY"):
        monkeypatch.delenv(variable, raising=False)

    loaded_files = load_hermes_dotenv(hermes_home=hermes_dir)

    assert loaded_files == [dotenv_path]
    assert os.getenv("GLM_API_KEY") == "abc"
    assert os.getenv("OPENAI_API_KEY") == "sk-123"
def test_main_import_applies_user_env_over_shell_values(tmp_path, monkeypatch):
home = tmp_path / "hermes"
home.mkdir()

View file

@ -55,6 +55,31 @@ class TestReadChain:
{"provider": "nous", "model": "Hermes-4-Llama-3.1-405B"},
]
def test_merges_new_and_legacy_formats(self):
    """``fallback_providers`` entries come first, followed by the legacy ``fallback_model``."""
    from hermes_cli.fallback_cmd import _read_chain

    new_style_entry = {"provider": "openrouter", "model": "anthropic/claude-sonnet-4.6"}
    legacy_entry = {"provider": "nous", "model": "Hermes-4"}
    config = {
        "fallback_providers": [dict(new_style_entry)],
        "fallback_model": dict(legacy_entry),
    }

    chain = _read_chain(config)

    assert chain == [new_style_entry, legacy_entry]
def test_legacy_duplicate_is_deduplicated_after_merge(self):
    """A legacy entry differing only in provider case (``OpenRouter``) is dropped as a duplicate."""
    from hermes_cli.fallback_cmd import _read_chain

    model_name = "anthropic/claude-sonnet-4.6"
    config = {
        "fallback_providers": [
            {"provider": "openrouter", "model": model_name},
        ],
        "fallback_model": {"provider": "OpenRouter", "model": model_name},
    }

    chain = _read_chain(config)

    assert chain == [
        {"provider": "openrouter", "model": model_name},
    ]
def test_migrates_legacy_single_dict(self):
from hermes_cli.fallback_cmd import _read_chain
cfg = {"fallback_model": {"provider": "openrouter", "model": "gpt-5.4"}}

View file

@ -69,18 +69,19 @@ class TestPluginPickerInjection:
assert "Myimg" in names
assert "myimg" in plugin_names
def test_fal_skipped_to_avoid_duplicate(self, monkeypatch):
def test_fal_surfaced_alongside_other_plugins(self, monkeypatch):
from hermes_cli import tools_config
# Simulate a FAL plugin being registered — the picker already has
# hardcoded FAL rows in TOOL_CATEGORIES, so plugin-FAL must be
# skipped to avoid showing FAL twice.
# After #26241, FAL is itself a plugin (`plugins/image_gen/fal/`)
# and the hardcoded `TOOL_CATEGORIES["image_gen"]` FAL row is
# gone. The plugin-row builder therefore surfaces it like any
# other backend — no deduplication step needed.
image_gen_registry.register_provider(_FakeProvider("fal"))
image_gen_registry.register_provider(_FakeProvider("openai"))
rows = tools_config._plugin_image_gen_providers()
names = [r.get("image_gen_plugin_name") for r in rows]
assert "fal" not in names
assert "fal" in names
assert "openai" in names
def test_visible_providers_includes_plugins_for_image_gen(self, monkeypatch):

View file

@ -1,4 +1,4 @@
"""Tests for ``install_cua_driver`` upgrade semantics.
"""Tests for ``install_cua_driver`` upgrade semantics and architecture pre-check.
The cua-driver upstream installer always pulls the latest release tag, so
re-running it is the canonical upgrade path. ``install_cua_driver(upgrade=True)``
@ -10,18 +10,18 @@ must:
fix for the "we only pulled cua-driver once on enable" complaint).
* Preserve original ``upgrade=False`` behaviour for the toolset-enable flow:
skip if installed, install otherwise, warn on non-macOS.
* Pre-check architecture compatibility before downloading to avoid raw 404
errors on Intel macOS when the upstream release lacks x86_64 assets.
"""
from __future__ import annotations
from unittest.mock import patch
import json
from unittest.mock import MagicMock, patch
class TestInstallCuaDriverUpgrade:
def test_upgrade_on_non_macos_is_silent_noop(self):
"""``hermes update`` calls install_cua_driver(upgrade=True) for every
user. On Linux/Windows it must return False without printing the
"macOS-only; skipping" warning that the toolset-enable path emits."""
from hermes_cli import tools_config
with patch.object(tools_config, "_print_warning") as warn, \
@ -30,8 +30,6 @@ class TestInstallCuaDriverUpgrade:
warn.assert_not_called()
def test_non_upgrade_on_non_macos_warns(self):
"""The toolset-enable path (upgrade=False) should still warn loudly
when the user tries to enable Computer Use on a non-macOS host."""
from hermes_cli import tools_config
with patch.object(tools_config, "_print_warning") as warn, \
@ -40,43 +38,36 @@ class TestInstallCuaDriverUpgrade:
warn.assert_called()
def test_upgrade_on_macos_with_binary_runs_installer(self):
"""When cua-driver is already on PATH and upgrade=True, we must
re-run the upstream installer (this is the fix for the bug report).
"""
from hermes_cli import tools_config
with patch("platform.system", return_value="Darwin"), \
patch.object(tools_config.shutil, "which",
side_effect=lambda n: "/usr/local/bin/" + n
if n in {"cua-driver", "curl"} else None), \
patch.object(tools_config, "_check_cua_driver_asset_for_arch",
return_value=True), \
patch.object(tools_config, "_run_cua_driver_installer",
return_value=True) as runner, \
patch("subprocess.run"):
assert tools_config.install_cua_driver(upgrade=True) is True
runner.assert_called_once()
# Refresh path uses non-verbose mode so we don't re-print the
# "grant macOS permissions" block on every `hermes update`.
kwargs = runner.call_args.kwargs
assert kwargs.get("verbose") is False
def test_upgrade_on_macos_without_binary_runs_installer(self):
"""upgrade=True with cua-driver missing must still trigger an
install equivalent to a fresh install. (Don't silently no-op.)"""
from hermes_cli import tools_config
with patch("platform.system", return_value="Darwin"), \
patch.object(tools_config.shutil, "which",
side_effect=lambda n: "/usr/bin/curl" if n == "curl" else None), \
patch.object(tools_config, "_check_cua_driver_asset_for_arch",
return_value=True), \
patch.object(tools_config, "_run_cua_driver_installer",
return_value=True) as runner:
assert tools_config.install_cua_driver(upgrade=True) is True
runner.assert_called_once()
def test_non_upgrade_on_macos_with_binary_skips_install(self):
"""Original toolset-enable behaviour: cua-driver already installed
+ upgrade=False confirm and return without re-running installer.
This is the behaviour that ``hermes tools`` (re)enable depends on,
so the new helper must not regress it."""
from hermes_cli import tools_config
with patch("platform.system", return_value="Darwin"), \
@ -89,27 +80,133 @@ class TestInstallCuaDriverUpgrade:
runner.assert_not_called()
def test_non_upgrade_on_macos_without_binary_runs_installer(self):
"""Original fresh-install path must still work."""
from hermes_cli import tools_config
with patch("platform.system", return_value="Darwin"), \
patch.object(tools_config.shutil, "which",
side_effect=lambda n: "/usr/bin/curl" if n == "curl" else None), \
patch.object(tools_config, "_check_cua_driver_asset_for_arch",
return_value=True), \
patch.object(tools_config, "_run_cua_driver_installer",
return_value=True) as runner:
assert tools_config.install_cua_driver(upgrade=False) is True
runner.assert_called_once()
def test_upgrade_without_curl_does_not_crash(self):
"""If curl isn't on PATH we can't refresh — must warn and return
the current install state, not raise."""
class TestCheckCuaDriverAssetForArch:
def test_arm64_always_returns_true(self):
from hermes_cli import tools_config
# cua-driver present, curl missing.
def _which(name):
return "/usr/local/bin/cua-driver" if name == "cua-driver" else None
with patch("platform.machine", return_value="arm64"):
assert tools_config._check_cua_driver_asset_for_arch() is True
def test_x86_64_with_asset_returns_true(self):
    """On x86_64 the pre-check passes when the release lists an x86_64 asset."""
    from hermes_cli import tools_config

    asset_names = (
        "cua-driver-0.1.6-darwin-arm64.tar.gz",
        "cua-driver-0.1.6-darwin-x86_64.tar.gz",
    )
    release_payload = json.dumps({
        "tag_name": "cua-driver-v0.1.6",
        "assets": [{"name": asset} for asset in asset_names],
    }).encode()

    # Fake HTTP response usable as a context manager.
    response = MagicMock()
    response.read.return_value = release_payload
    response.__enter__.return_value = response
    response.__exit__.return_value = False

    with patch("platform.machine", return_value="x86_64"), \
         patch("urllib.request.urlopen", return_value=response):
        assert tools_config._check_cua_driver_asset_for_arch() is True
def test_x86_64_without_asset_returns_false(self):
    """On x86_64 the pre-check fails and warns once when no x86_64 asset is listed."""
    from hermes_cli import tools_config

    release = {
        "tag_name": "cua-driver-v0.1.6",
        "assets": [
            {"name": "cua-driver-0.1.6-darwin-arm64.tar.gz"},
            {"name": "cua-driver.tar.gz"},
        ],
    }
    mock_resp = MagicMock()
    mock_resp.read.return_value = json.dumps(release).encode()
    mock_resp.__enter__ = lambda s: s
    mock_resp.__exit__ = MagicMock(return_value=False)

    with patch("platform.machine", return_value="x86_64"), \
         patch("urllib.request.urlopen", return_value=mock_resp), \
         patch.object(tools_config, "_print_warning") as warn, \
         patch.object(tools_config, "_print_info"):
        assert tools_config._check_cua_driver_asset_for_arch() is False

    warn.assert_called_once()
    message = warn.call_args[0][0]
    # The needle must be lowercase too: the previous "no Intel" could never
    # occur in a lowercased message, which made that alternative dead code.
    assert "no intel" in message.lower() or "x86_64" in message
def test_x86_64_api_failure_returns_true(self):
    """A failing release lookup must fail open so the installer can handle it."""
    from hermes_cli import tools_config

    machine_patch = patch("platform.machine", return_value="x86_64")
    urlopen_patch = patch("urllib.request.urlopen", side_effect=Exception("timeout"))
    with machine_patch, urlopen_patch:
        outcome = tools_config._check_cua_driver_asset_for_arch()
        assert outcome is True
def test_fresh_install_x86_64_no_asset_skips_installer(self):
"""When the latest release has no Intel asset, skip the installer."""
from hermes_cli import tools_config
release = {
"tag_name": "cua-driver-v0.1.6",
"assets": [{"name": "cua-driver-0.1.6-darwin-arm64.tar.gz"}],
}
mock_resp = MagicMock()
mock_resp.read.return_value = json.dumps(release).encode()
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("platform.system", return_value="Darwin"), \
patch.object(tools_config.shutil, "which", side_effect=_which), \
patch.object(tools_config, "_print_warning"):
patch.object(tools_config.shutil, "which",
side_effect=lambda n: "/usr/bin/curl" if n == "curl" else None), \
patch("platform.machine", return_value="x86_64"), \
patch("urllib.request.urlopen", return_value=mock_resp), \
patch.object(tools_config, "_print_warning"), \
patch.object(tools_config, "_print_info"), \
patch.object(tools_config, "_run_cua_driver_installer") as runner:
assert tools_config.install_cua_driver(upgrade=False) is False
runner.assert_not_called()
def test_upgrade_x86_64_no_asset_returns_existing_status(self):
    """With no Intel asset, an upgrade skips the installer and reports whether the binary exists."""
    from hermes_cli import tools_config

    release_payload = json.dumps({
        "tag_name": "cua-driver-v0.1.6",
        "assets": [{"name": "cua-driver-0.1.6-darwin-arm64.tar.gz"}],
    }).encode()
    response = MagicMock()
    response.read.return_value = release_payload
    response.__enter__.return_value = response
    response.__exit__.return_value = False

    def which_with_driver(n):
        # Both cua-driver and curl are on PATH.
        return "/usr/local/bin/" + n if n in ("cua-driver", "curl") else None

    def which_without_driver(n):
        # Only curl is on PATH.
        return "/usr/bin/curl" if n == "curl" else None

    # (fake shutil.which, expected install_cua_driver result)
    scenarios = (
        (which_with_driver, True),
        (which_without_driver, False),
    )
    for fake_which, expected in scenarios:
        with patch("platform.system", return_value="Darwin"), \
             patch.object(tools_config.shutil, "which", side_effect=fake_which), \
             patch("platform.machine", return_value="x86_64"), \
             patch("urllib.request.urlopen", return_value=response), \
             patch.object(tools_config, "_print_warning"), \
             patch.object(tools_config, "_print_info"), \
             patch.object(tools_config, "_run_cua_driver_installer") as runner:
            assert tools_config.install_cua_driver(upgrade=True) is expected
            runner.assert_not_called()

View file

@ -1470,6 +1470,138 @@ def test_worktree_workspace_returns_intended_path(kanban_home, tmp_path):
assert str(ws) == target
# ---------------------------------------------------------------------------
# Scratch cleanup containment (#28818)
# ---------------------------------------------------------------------------
def test_cleanup_workspace_removes_managed_scratch_dir(kanban_home):
    """Completing a task deletes its resolved scratch workspace directory."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="scratchy")
        workspace = kb.resolve_workspace(kb.get_task(conn, task_id))
        kb.set_workspace_path(conn, task_id, workspace)
        assert workspace.is_dir()

        kb.complete_task(conn, task_id, result="ok")
        assert not workspace.exists(), "Hermes-managed scratch dir should be cleaned up"
def test_cleanup_workspace_refuses_path_outside_scratch_root(kanban_home, tmp_path):
    """Scratch cleanup must leave a workspace path outside the workspaces root untouched (#28818).

    Models the data-loss case where a scratch task's ``workspace_path``
    points at a real source directory: completing the task must not remove
    that directory or its contents.
    """
    source_tree = tmp_path / "real-source"
    source_tree.mkdir()
    (source_tree / ".git").mkdir()
    readme = source_tree / "README.md"
    readme.write_text("important", encoding="utf-8")

    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="ship")
        # Force the bad state directly in the DB: kind 'scratch' combined
        # with a workspace_path that is the user's real source tree.
        conn.execute(
            "UPDATE tasks SET workspace_kind=?, workspace_path=? WHERE id=?",
            ("scratch", str(source_tree), task_id),
        )
        conn.commit()
        kb.complete_task(conn, task_id, result="ok")

    assert source_tree.exists(), "User source tree must not be deleted by scratch cleanup"
    assert (source_tree / ".git").exists()
    assert readme.read_text(encoding="utf-8") == "important"
def test_cleanup_workspace_honors_workspaces_root_env_override(tmp_path, monkeypatch):
    """A scratch dir under ``HERMES_KANBAN_WORKSPACES_ROOT`` is cleaned up on completion.

    The override directory sits next to, not inside, the Hermes home used
    by this test, so cleanup must treat the env-var root as managed too.
    """
    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)

    override_root = tmp_path / "ext-workspaces"
    override_root.mkdir()
    monkeypatch.setenv("HERMES_KANBAN_WORKSPACES_ROOT", str(override_root))

    kb.init_db()
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="ext")
        task_scratch = override_root / task_id
        task_scratch.mkdir()
        conn.execute(
            "UPDATE tasks SET workspace_kind=?, workspace_path=? WHERE id=?",
            ("scratch", str(task_scratch), task_id),
        )
        conn.commit()
        kb.complete_task(conn, task_id, result="ok")

    assert not task_scratch.exists(), "Override-root scratch dir should be cleaned up"
def test_is_managed_scratch_path_accepts_per_board_workspaces(kanban_home, tmp_path):
    """A task dir under ``<kanban_home>/kanban/boards/<slug>/workspaces`` counts as managed."""
    workspaces_dir = kanban_home / "kanban" / "boards" / "my-board" / "workspaces"
    task_scratch = workspaces_dir / "task-1"
    task_scratch.mkdir(parents=True)
    assert kb._is_managed_scratch_path(task_scratch)
def test_is_managed_scratch_path_rejects_real_source_tree(kanban_home, tmp_path):
    """A directory outside every managed root (e.g. a user's repo) is not managed."""
    user_repo = tmp_path / "code" / "my-project"
    user_repo.mkdir(parents=True)
    is_managed = kb._is_managed_scratch_path(user_repo)
    assert not is_managed
def test_is_managed_scratch_path_rejects_kanban_metadata_subtrees(kanban_home):
    """Only task dirs below a board's ``workspaces/`` are managed; everything above is not.

    Regression guard for the Copilot finding on #28819: a scratch task whose
    ``workspace_path`` was mis-set to the kanban home, a logs dir, a board
    root, or a ``workspaces/`` root itself must be refused, otherwise
    cleanup could remove Hermes' own DB/metadata/logs or every task's
    scratch dir at once.
    """
    kanban_root = kanban_home / "kanban"
    board_root = kanban_root / "boards" / "my-board"
    board_workspaces = board_root / "workspaces"

    # Each path is created and checked in turn, so earlier checks run
    # before the deeper directories exist.
    unmanaged_paths = (
        kanban_root,              # the kanban home itself
        kanban_root / "logs",     # top-level logs dir
        board_root,               # a board's root, not its workspaces/ child
        board_root / "logs",      # sibling of workspaces/ under a board
        board_workspaces,         # the workspaces root holding all task dirs
    )
    for candidate in unmanaged_paths:
        candidate.mkdir(parents=True, exist_ok=True)
        assert not kb._is_managed_scratch_path(candidate)

    # A task's own scratch dir is the only thing the guard should allow.
    task_scratch = board_workspaces / "task-42"
    task_scratch.mkdir(parents=True, exist_ok=True)
    assert kb._is_managed_scratch_path(task_scratch)
# ---------------------------------------------------------------------------
# Tenancy
# ---------------------------------------------------------------------------
@ -2464,13 +2596,32 @@ def test_task_dict_survives_corrupt_created_at(tmp_path, monkeypatch):
# ---------------------------------------------------------------------------
def test_create_task_without_workspace_inherits_board_default_workdir(kanban_home, monkeypatch):
"""Board with default_workdir → create_task without workspace_path → inherits default."""
def test_create_task_scratch_without_workspace_ignores_board_default_workdir(kanban_home, monkeypatch):
"""Scratch tasks must NOT inherit board.default_workdir — would point auto-cleanup
at the user's source tree on completion (#28818)."""
default_wd = "/home/user/project"
kb.create_board("work-proj", default_workdir=default_wd)
with kb.connect(board="work-proj") as conn:
tid = kb.create_task(conn, title="inherited", board="work-proj")
tid = kb.create_task(conn, title="scratch-task", board="work-proj")
t = kb.get_task(conn, tid)
assert t is not None
assert t.workspace_kind == "scratch"
assert t.workspace_path is None
def test_create_task_dir_without_workspace_inherits_board_default_workdir(kanban_home, monkeypatch):
    """A ``dir`` task created without a workspace path picks up the board's ``default_workdir``."""
    board_slug = "work-proj-dir"
    board_workdir = "/home/user/project"
    kb.create_board(board_slug, default_workdir=board_workdir)

    with kb.connect(board=board_slug) as conn:
        task_id = kb.create_task(
            conn,
            title="inherited",
            workspace_kind="dir",
            board=board_slug,
        )
        task = kb.get_task(conn, task_id)

    assert task is not None
    assert task.workspace_path == board_workdir
@ -2981,3 +3132,210 @@ def test_detect_stale_does_not_tick_failure_counter(kanban_home, monkeypatch):
assert "stale" in kinds, (
f"Expected 'stale' event in task_events; got {kinds!r}"
)
# ---------------------------------------------------------------------------
# Corruption guard (issue #30687)
# ---------------------------------------------------------------------------
def _write_corrupt_db(path: Path) -> bytes:
    """Write a file with a valid SQLite magic header followed by garbage pages.

    The first 100 bytes start with the ``SQLite format 3`` magic string and
    a few header-looking fields, zero-padded, so a header-only check
    passes. Everything after that is deliberate junk, which is the
    corruption shape the integrity guard targets (issue #29507 follow-ups).

    Returns the exact bytes written to ``path``.
    """
    header_fields = (
        b"SQLite format 3\x00",
        b"\x10\x00\x02\x02\x00\x40\x20\x20",
        b"\x00\x00\x00\x0c\x00\x00\x23\x46\x00\x00\x00\x00",
    )
    # Pad the header out to 100 bytes with zeros.
    padded_header = b"".join(header_fields).ljust(100, b"\x00")
    garbage_pages = b"definitely not a valid sqlite page \x00\x01\x02\x03" * 64
    contents = padded_header + garbage_pages
    path.write_bytes(contents)
    return contents
def test_init_db_refuses_corrupt_existing_file(tmp_path):
    """``init_db`` on a corrupt file raises, backs the file up, and leaves it unmodified."""
    db_path = tmp_path / "kanban.db"
    corrupt_bytes = _write_corrupt_db(db_path)
    # Drop any cached "already initialized" entry so the guard actually runs.
    kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))

    with pytest.raises(kb.KanbanDbCorruptError) as excinfo:
        kb.init_db(db_path=db_path)
    error = excinfo.value

    assert error.db_path == db_path
    # A byte-for-byte backup of the corrupt file must exist.
    assert error.backup_path is not None
    assert error.backup_path.exists()
    assert error.backup_path.read_bytes() == corrupt_bytes
    # Original bytes untouched: no schema was written on top.
    assert db_path.read_bytes() == corrupt_bytes
    # The message names both the DB and its backup.
    message = str(error)
    assert str(db_path) in message
    assert str(error.backup_path) in message
def test_connect_refuses_corrupt_existing_file(tmp_path):
    """connect() raises KanbanDbCorruptError when the on-disk file is corrupt."""
    corrupt_db = tmp_path / "kanban.db"
    _write_corrupt_db(corrupt_db)
    cache_key = str(corrupt_db.resolve())
    kb._INITIALIZED_PATHS.discard(cache_key)

    with pytest.raises(kb.KanbanDbCorruptError):
        kb.connect(db_path=corrupt_db)
def test_locked_healthy_db_does_not_classify_as_corrupt(tmp_path, monkeypatch):
    """A lock on a healthy DB is reported as a lock, never as corruption.

    While ``sqlite3.connect`` is failing with "database is locked", connecting
    must surface the raw :class:`sqlite3.OperationalError` (not
    :class:`KanbanDbCorruptError`) and must not leave a ``.corrupt`` backup
    behind.  Once the lock clears, the database is usable again.
    """
    db_path = tmp_path / "kanban.db"
    kb.init_db(db_path=db_path)
    kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))

    original_connect = sqlite3.connect

    def always_locked(*_args, **_kwargs):
        # Every connect attempt fails, so whichever call the code under test
        # makes first (intended: the integrity probe) sees a lock.
        raise sqlite3.OperationalError("database is locked")

    monkeypatch.setattr(kb.sqlite3, "connect", always_locked)
    with pytest.raises(sqlite3.OperationalError):
        kb.connect(db_path=db_path)

    # A healthy-but-locked DB must not produce a .corrupt backup.
    leftovers = list(tmp_path.glob("*.corrupt.*"))
    assert leftovers == [], f"unexpected corrupt backups: {leftovers}"

    # With the real connect restored, normal access works again.
    monkeypatch.setattr(kb.sqlite3, "connect", original_connect)
    with kb.connect(db_path=db_path) as conn:
        kb.create_task(conn, title="still here")
        titles = [task.title for task in kb.list_tasks(conn)]
    assert "still here" in titles
def test_init_db_allows_missing_then_healthy(tmp_path):
    """init_db creates a missing DB and preserves data when run again."""
    fresh_db = tmp_path / "fresh.db"
    assert not fresh_db.exists()

    kb.init_db(db_path=fresh_db)
    assert fresh_db.exists() and fresh_db.stat().st_size > 0

    # Second init on a healthy DB: the row written in between must survive.
    with kb.connect(db_path=fresh_db) as conn:
        kb.create_task(conn, title="keeps")
    kb.init_db(db_path=fresh_db)
    with kb.connect(db_path=fresh_db) as conn:
        surviving = kb.list_tasks(conn)
        surviving_titles = [task.title for task in surviving]
    assert surviving_titles == ["keeps"]
# ---------------------------------------------------------------------------
# First-use tip for scratch workspaces
# ---------------------------------------------------------------------------
def test_maybe_emit_scratch_tip_fires_once_per_install(kanban_home, caplog):
    """The scratch-workspace tip warns and records an event exactly once.

    The first scratch workspace materialization logs a warning, appends a
    ``tip_scratch_workspace`` event, and sets the sentinel.  Subsequent
    scratch workspaces on the SAME install stay silent — the sentinel file
    under kanban_home() flips after the first emit.
    """
    import logging

    def emit_tip(task_id):
        """Run the tip hook for *task_id* while capturing WARNING records."""
        with caplog.at_level(logging.WARNING, logger="hermes_cli.kanban_db"):
            with kb.connect() as conn:
                kb._maybe_emit_scratch_tip(conn, task_id, "scratch")

    def tip_warnings():
        """Return the captured log records that carry the tip text."""
        return [
            record for record in caplog.records
            if "scratch workspaces are ephemeral" in record.getMessage()
        ]

    def event_kinds(task_id):
        """Return the event kinds stored for *task_id*, oldest first."""
        with kb.connect() as conn:
            rows = conn.execute(
                "SELECT kind FROM task_events WHERE task_id = ? ORDER BY id",
                (task_id,),
            ).fetchall()
            return [row["kind"] for row in rows]

    with kb.connect() as conn:
        first_task = kb.create_task(conn, title="first scratch")
        second_task = kb.create_task(conn, title="second scratch")

    # Fresh install: no sentinel yet.
    assert not kb._scratch_tip_shown()

    emit_tip(first_task)

    # The sentinel is set, and it exists on disk.
    assert kb._scratch_tip_shown()
    assert kb._scratch_tip_sentinel_path().exists()

    # Exactly one warning was logged.
    first_warnings = tip_warnings()
    assert len(first_warnings) == 1, (
        f"Expected exactly one tip warning, got {len(first_warnings)}: "
        f"{[r.getMessage() for r in first_warnings]!r}"
    )

    # The first task received the tip event row.
    first_kinds = event_kinds(first_task)
    assert "tip_scratch_workspace" in first_kinds, (
        f"Expected tip_scratch_workspace event on first scratch task; "
        f"got {first_kinds!r}"
    )

    # A second scratch materialization on the same install stays silent.
    caplog.clear()
    emit_tip(second_task)
    repeat_warnings = tip_warnings()
    assert repeat_warnings == [], (
        f"Tip should not re-fire after sentinel is set; got "
        f"{[r.getMessage() for r in repeat_warnings]!r}"
    )
    assert "tip_scratch_workspace" not in event_kinds(second_task), (
        "Tip event should not be appended for subsequent scratch tasks."
    )
def test_maybe_emit_scratch_tip_skips_non_scratch_workspaces(kanban_home, caplog):
    """The tip hook is a no-op for ``worktree`` and ``dir`` workspace kinds.

    Those workspaces are preserved on completion, so the scratch-cleanup tip
    must neither log, nor append an event, nor consume the one-shot sentinel.
    """
    import logging

    with kb.connect() as conn:
        worktree_task = kb.create_task(conn, title="worktree task")
        dir_task = kb.create_task(conn, title="dir task")

    assert not kb._scratch_tip_shown()

    with caplog.at_level(logging.WARNING, logger="hermes_cli.kanban_db"):
        with kb.connect() as conn:
            for task_id, kind in ((worktree_task, "worktree"), (dir_task, "dir")):
                kb._maybe_emit_scratch_tip(conn, task_id, kind)

    # The sentinel stays unset, so the one-shot is still available for a
    # real scratch workspace later.
    assert not kb._scratch_tip_shown()

    tip_warnings = [
        record for record in caplog.records
        if "scratch workspaces are ephemeral" in record.getMessage()
    ]
    assert tip_warnings == []

    with kb.connect() as conn:
        for task_id in (worktree_task, dir_task):
            rows = conn.execute(
                "SELECT kind FROM task_events WHERE task_id = ?", (task_id,),
            ).fetchall()
            recorded_kinds = [row["kind"] for row in rows]
            assert "tip_scratch_workspace" not in recorded_kinds

View file

@ -17,6 +17,11 @@ def kanban_home(tmp_path, monkeypatch):
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
# Allow the kanban notifier path-validator to upload artifacts the
# tests write under ``tmp_path``. Without this, every artifact-delivery
# test silently drops files because ``tmp_path`` isn't inside the
# default ``MEDIA_DELIVERY_SAFE_ROOTS`` cache dirs.
monkeypatch.setenv("HERMES_MEDIA_ALLOW_DIRS", str(tmp_path))
kb.init_db()
return home
@ -482,7 +487,7 @@ async def test_gateway_create_autosubscribes_on_explicit_board(kanban_home):
@pytest.mark.asyncio
async def test_notifier_uploads_artifacts_on_completion(kanban_home, tmp_path):
async def test_notifier_uploads_artifacts_on_completion(kanban_home, tmp_path, monkeypatch):
"""When a completed event carries ``artifacts`` in its payload, the
notifier uploads each file to the subscribed chat as a native
attachment. Images batch through send_multiple_images; documents
@ -494,6 +499,13 @@ async def test_notifier_uploads_artifacts_on_completion(kanban_home, tmp_path):
from gateway.config import Platform
from tools import kanban_tools as kt
# ``_deliver_kanban_artifacts`` routes candidates through
# ``BasePlatformAdapter.filter_local_delivery_paths``, which only accepts
# paths under ``MEDIA_DELIVERY_SAFE_ROOTS`` or roots explicitly allowlisted
# via ``HERMES_MEDIA_ALLOW_DIRS``. Test fixtures live under ``tmp_path``,
# so allowlist it for the duration of the test.
monkeypatch.setenv("HERMES_MEDIA_ALLOW_DIRS", str(tmp_path))
# Materialize real files so os.path.isfile passes inside the helper.
chart_path = tmp_path / "q3-revenue.png"
chart_path.write_bytes(b"PNG-fake-bytes")
@ -572,7 +584,7 @@ async def test_notifier_uploads_artifacts_on_completion(kanban_home, tmp_path):
@pytest.mark.asyncio
async def test_notifier_artifact_delivery_skips_missing_files(kanban_home, tmp_path):
async def test_notifier_artifact_delivery_skips_missing_files(kanban_home, tmp_path, monkeypatch):
"""Missing artifact paths are silently skipped — they may have been
referenced by name only. The notifier must not crash and must still
deliver any artifacts that do exist."""
@ -581,6 +593,10 @@ async def test_notifier_artifact_delivery_skips_missing_files(kanban_home, tmp_p
from gateway.config import Platform
from tools import kanban_tools as kt
# Allow ``tmp_path`` through the media-delivery safety filter. See the
# companion test for the full explanation.
monkeypatch.setenv("HERMES_MEDIA_ALLOW_DIRS", str(tmp_path))
real_pdf = tmp_path / "real.pdf"
real_pdf.write_bytes(b"%PDF-fake")

View file

@ -0,0 +1,254 @@
"""Tests for the kanban `promote` verb (issue #28822).
The realistic bug scenario from #28822 is: a child task ends up in
``todo`` with all its parents already ``done`` (because the
auto-promote daemon hasn't run, or a manual close raced it).
Direct-SQL setup is used to construct that state deterministically.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import pytest
from hermes_cli import kanban as kb_cli
from hermes_cli import kanban_db as kb
@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
    """Provide an isolated Hermes home with a freshly initialized default board.

    ``HERMES_HOME`` and ``Path.home`` are redirected into ``tmp_path``, the
    default board's entry is dropped from the init cache, and ``init_db`` is
    run.  Returns the ``.hermes`` directory.
    """
    hermes_dir = tmp_path / ".hermes"
    hermes_dir.mkdir()
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    monkeypatch.setenv("HERMES_HOME", str(hermes_dir))

    # Drop the cached "initialized" marker so init_db really runs for this path.
    board_db = kb.kanban_db_path(board="default")
    kb._INITIALIZED_PATHS.discard(str(board_db.resolve()))
    kb.init_db()
    return hermes_dir
@pytest.fixture
def conn(kanban_home):
    """Yield a kanban connection opened against the isolated test home."""
    with kb.connect() as connection:
        yield connection
def _stuck_todo(conn, *, parents_done=True, n_parents=1):
    """Create the #28822 scenario and return ``(child_id, parent_ids)``.

    A child task is created under ``n_parents`` parents and is asserted to
    start in ``todo``.  When ``parents_done`` is true the parents are flipped
    to ``done`` with a direct SQL UPDATE, bypassing the task API, so the
    child stays in ``todo`` with no promotion logic having run.
    """
    parent_ids = []
    for index in range(n_parents):
        parent_ids.append(
            kb.create_task(conn, title=f"parent{index}", assignee="setup")
        )

    child_id = kb.create_task(
        conn, title="child", parents=parent_ids, assignee="setup"
    )
    assert kb.get_task(conn, child_id).status == "todo"

    if not parents_done:
        return child_id, parent_ids

    for parent_id in parent_ids:
        conn.execute(
            "UPDATE tasks SET status='done' WHERE id=?", (parent_id,)
        )
    return child_id, parent_ids
def test_promote_stuck_todo_succeeds(conn):
    """A todo child whose parents are all done is promoted to ready."""
    child_id, _parents = _stuck_todo(conn, parents_done=True)

    promoted, error = kb.promote_task(conn, child_id, actor="tester")

    assert promoted and error is None
    assert kb.get_task(conn, child_id).status == "ready"
def test_promote_refuses_when_parent_not_done(conn):
    """Promotion is refused, naming the open parent, and the child stays todo."""
    child_id, parent_ids = _stuck_todo(conn, parents_done=False)

    promoted, error = kb.promote_task(conn, child_id, actor="tester")

    assert promoted is False
    assert error is not None and "unsatisfied parent dependencies" in error
    assert parent_ids[0] in error
    assert kb.get_task(conn, child_id).status == "todo"
def test_promote_with_force_bypasses_dependency_check(conn):
    """With ``force=True`` a child is promoted even though its parent is open."""
    child_id, _parents = _stuck_todo(conn, parents_done=False)

    promoted, error = kb.promote_task(
        conn, child_id, actor="tester", reason="recovery", force=True
    )

    assert promoted and error is None
    assert kb.get_task(conn, child_id).status == "ready"
def test_promote_emits_audit_event(conn):
    """A successful promote writes a ``promoted_manual`` event with its context."""
    child_id, _parents = _stuck_todo(conn, parents_done=True)
    kb.promote_task(conn, child_id, actor="tester", reason="manual recovery")

    row = conn.execute(
        "SELECT kind, payload FROM task_events "
        "WHERE task_id = ? AND kind = 'promoted_manual'",
        (child_id,),
    ).fetchone()
    assert row is not None

    # The payload is stored as JSON text; decode it before checking fields.
    audit = json.loads(row["payload"])
    assert audit["actor"] == "tester"
    assert audit["reason"] == "manual recovery"
    assert audit["forced"] is False
def test_promote_force_records_forced_flag(conn):
    """A forced promote records ``forced: true`` in its audit event payload."""
    child, _ = _stuck_todo(conn, parents_done=False)
    kb.promote_task(conn, child, actor="tester", force=True, reason="r")
    ev = conn.execute(
        "SELECT payload FROM task_events "
        "WHERE task_id = ? AND kind = 'promoted_manual'",
        (child,),
    ).fetchone()
    # fetchone() yields None when no row matched; fail with a clear assertion
    # instead of a TypeError from subscripting None (mirrors
    # test_promote_emits_audit_event).
    assert ev is not None
    assert json.loads(ev["payload"])["forced"] is True
def test_promote_does_not_change_assignee(conn):
    """Promoting as a different actor leaves the task's assignee untouched."""
    child_id, _parents = _stuck_todo(conn, parents_done=True)

    assignee_before = kb.get_task(conn, child_id).assignee
    kb.promote_task(conn, child_id, actor="someone_else")
    assignee_after = kb.get_task(conn, child_id).assignee

    assert assignee_before == assignee_after
def test_promote_dry_run_does_not_mutate(conn):
    """A dry run reports success but changes neither status nor the event log."""
    child_id, _parents = _stuck_todo(conn, parents_done=True)

    promoted, error = kb.promote_task(
        conn, child_id, actor="tester", dry_run=True
    )
    assert promoted and error is None
    assert kb.get_task(conn, child_id).status == "todo"

    count_row = conn.execute(
        "SELECT COUNT(*) AS n FROM task_events "
        "WHERE task_id = ? AND kind = 'promoted_manual'",
        (child_id,),
    ).fetchone()
    event_count = count_row["n"]
    assert event_count == 0
def test_promote_dry_run_reports_dependency_failure(conn):
    """A dry run still reports the unsatisfied-dependency refusal."""
    child_id, _parents = _stuck_todo(conn, parents_done=False)

    promoted, error = kb.promote_task(
        conn, child_id, actor="tester", dry_run=True
    )

    assert promoted is False
    assert error is not None and "unsatisfied" in error
def test_promote_rejects_non_todo_status(conn):
    """Promoting a task that is already ``ready`` is rejected with an explanation."""
    tid = kb.create_task(conn, title="standalone")
    assert kb.get_task(conn, tid).status == "ready"
    ok, err = kb.promote_task(conn, tid, actor="tester")
    assert ok is False
    # Guard against err being None so a missing message fails as a plain
    # assertion rather than a TypeError from ``in None`` (same pattern as the
    # sibling promote tests).
    assert err is not None and "'ready'" in err and "promote only applies" in err
def test_promote_rejects_unknown_task(conn):
    """An id that matches no task yields a ``not found`` failure."""
    promoted, error = kb.promote_task(conn, "t_doesnotexist", actor="tester")

    assert promoted is False
    assert error is not None and "not found" in error
def test_promote_blocked_task_works(conn):
    """A task forced into ``blocked`` via SQL can be promoted to ready."""
    task_id = kb.create_task(conn, title="t")
    conn.execute("UPDATE tasks SET status='blocked' WHERE id=?", (task_id,))

    promoted, error = kb.promote_task(
        conn, task_id, actor="tester", reason="ready now"
    )

    assert promoted and error is None
    assert kb.get_task(conn, task_id).status == "ready"
# ---------------------------------------------------------------------------
# CLI `_cmd_promote` — bulk via `--ids` (the issue's anti-respawn use case:
# promote all children of a closed parent in one command).
# ---------------------------------------------------------------------------
def _promote_ns(task_id, *, ids=None, reason=None, force=False,
dry_run=False, as_json=False):
return argparse.Namespace(
task_id=task_id,
reason=list(reason or []),
ids=list(ids or []) or None,
force=force,
dry_run=dry_run,
json=as_json,
)
def test_cli_promote_bulk_ids_promotes_all(kanban_home, capsys):
    """Positional id plus ``--ids`` promotes every listed child in one call."""
    with kb.connect() as conn:
        parent = kb.create_task(conn, title="parent")
        children = []
        for index in range(3):
            children.append(
                kb.create_task(conn, title=f"c{index}", parents=[parent])
            )
        conn.execute("UPDATE tasks SET status='done' WHERE id=?", (parent,))

    first, *rest = children
    exit_code = kb_cli._cmd_promote(_promote_ns(first, ids=rest))
    assert exit_code == 0

    # Every promoted id is echoed on stdout.
    stdout = capsys.readouterr().out
    for child_id in children:
        assert child_id in stdout

    with kb.connect() as conn:
        for child_id in children:
            assert kb.get_task(conn, child_id).status == "ready"
def test_cli_promote_bulk_partial_failure_exits_1(kanban_home, capsys):
    """One bad id in a bulk call: valid ids still promote, exit code is 1."""
    with kb.connect() as conn:
        parent = kb.create_task(conn, title="parent")
        good_id = kb.create_task(conn, title="good", parents=[parent])
        conn.execute("UPDATE tasks SET status='done' WHERE id=?", (parent,))

    exit_code = kb_cli._cmd_promote(_promote_ns(good_id, ids=["t_nope"]))
    assert exit_code == 1

    streams = capsys.readouterr()
    # The good id is reported on stdout; the failure goes to stderr.
    assert good_id in streams.out
    assert "t_nope" in streams.err and "not found" in streams.err

    with kb.connect() as conn:
        assert kb.get_task(conn, good_id).status == "ready"
def test_cli_promote_bulk_json_emits_list(kanban_home, capsys):
    """Bulk promote with JSON output prints a list with one result per id."""
    with kb.connect() as conn:
        parent = kb.create_task(conn, title="parent")
        first = kb.create_task(conn, title="a", parents=[parent])
        second = kb.create_task(conn, title="b", parents=[parent])
        conn.execute("UPDATE tasks SET status='done' WHERE id=?", (parent,))

    exit_code = kb_cli._cmd_promote(
        _promote_ns(first, ids=[second], as_json=True)
    )
    assert exit_code == 0

    results = json.loads(capsys.readouterr().out)
    assert isinstance(results, list) and len(results) == 2
    reported_ids = {result["task_id"] for result in results}
    assert reported_ids == {first, second}
    assert all(result["promoted"] for result in results)
def test_cli_promote_single_json_stays_flat_object(kanban_home, capsys):
    """Back-compat: JSON output for a single id is a flat object, not a list."""
    with kb.connect() as conn:
        parent = kb.create_task(conn, title="parent")
        child_id = kb.create_task(conn, title="c", parents=[parent])
        conn.execute("UPDATE tasks SET status='done' WHERE id=?", (parent,))

    exit_code = kb_cli._cmd_promote(_promote_ns(child_id, as_json=True))
    assert exit_code == 0

    result = json.loads(capsys.readouterr().out)
    assert isinstance(result, dict)
    assert result["task_id"] == child_id and result["promoted"] is True
def test_cli_promote_dedupes_duplicate_ids(kanban_home, capsys):
    """An id repeated across positional and ``--ids`` is promoted only once."""
    with kb.connect() as conn:
        parent = kb.create_task(conn, title="parent")
        child_id = kb.create_task(conn, title="c", parents=[parent])
        conn.execute("UPDATE tasks SET status='done' WHERE id=?", (parent,))

    duplicated = [child_id, child_id]
    exit_code = kb_cli._cmd_promote(_promote_ns(child_id, ids=duplicated))
    assert exit_code == 0

    # Exactly one audit event proves the promotion ran a single time.
    with kb.connect() as conn:
        count_row = conn.execute(
            "SELECT COUNT(*) AS n FROM task_events "
            "WHERE task_id = ? AND kind = 'promoted_manual'",
            (child_id,),
        ).fetchone()
        event_count = count_row["n"]
    assert event_count == 1

View file

@ -0,0 +1,214 @@
"""Regression tests for Nous Portal inference_base_url host-allowlist validation.
A poisoned ``inference_base_url`` from the Portal refresh / agent-key-mint
response (network MITM, malicious response injection) would otherwise be
persisted to auth.json and forwarded the user's legitimate agent_key
bearer on every subsequent proxy request, exfiltrating their inference
budget and opening a response-injection channel into the IDE / chat
client. ``_validate_nous_inference_url_from_network()`` blocks any URL
outside the allowlist at the source.
These tests verify:
1. The validator's host + scheme rules.
2. Each of the five NETWORK call sites in ``auth.py`` calls the validator
rather than the unrestricted ``_optional_base_url`` helper.
3. The proxy adapter applies the validator as belt-and-suspenders.
4. The env-var override path (``NOUS_INFERENCE_BASE_URL``) is NOT
gated by the validator — that's the documented dev/staging escape
hatch.
"""
from __future__ import annotations
import logging
import pytest
from hermes_cli.auth import (
DEFAULT_NOUS_INFERENCE_URL,
_ALLOWED_NOUS_INFERENCE_HOSTS,
_validate_nous_inference_url_from_network,
)
class TestValidatorRules:
    """Host and scheme rules of ``_validate_nous_inference_url_from_network``."""

    def test_allowlisted_https_host_returned(self):
        url = "https://inference-api.nousresearch.com/v1"
        result = _validate_nous_inference_url_from_network(url)
        assert result == url

    def test_trailing_slash_stripped(self):
        url = "https://inference-api.nousresearch.com/v1/"
        expected = url.rstrip("/")
        assert _validate_nous_inference_url_from_network(url) == expected

    def test_attacker_host_rejected(self, caplog):
        with caplog.at_level(logging.WARNING, logger="hermes_cli.auth"):
            result = _validate_nous_inference_url_from_network(
                "https://attacker.com/v1"
            )
            assert result is None
        # The rejected host is named in a logged message.
        assert any("attacker.com" in rec.message for rec in caplog.records)

    def test_subdomain_of_allowlist_host_rejected(self):
        """*.nousresearch.com is NOT in the allowlist — exact hostname only.

        A subdomain takeover or DNS hijack of *.nousresearch.com would
        otherwise pass — keep the gate tight.
        """
        result = _validate_nous_inference_url_from_network(
            "https://evil.inference-api.nousresearch.com/v1"
        )
        assert result is None

    def test_http_scheme_rejected(self, caplog):
        with caplog.at_level(logging.WARNING, logger="hermes_cli.auth"):
            result = _validate_nous_inference_url_from_network(
                "http://inference-api.nousresearch.com/v1"
            )
            assert result is None
        assert any("non-https" in rec.message for rec in caplog.records)

    def test_file_scheme_rejected(self):
        result = _validate_nous_inference_url_from_network("file:///etc/passwd")
        assert result is None

    def test_javascript_scheme_rejected(self):
        result = _validate_nous_inference_url_from_network(
            "javascript:alert(document.cookie)"
        )
        assert result is None

    def test_empty_string_rejected(self):
        result = _validate_nous_inference_url_from_network("")
        assert result is None

    def test_whitespace_only_rejected(self):
        result = _validate_nous_inference_url_from_network(" ")
        assert result is None

    def test_none_rejected(self):
        result = _validate_nous_inference_url_from_network(None)
        assert result is None

    def test_non_string_rejected(self):
        from_int = _validate_nous_inference_url_from_network(12345)  # type: ignore[arg-type]
        assert from_int is None
        from_dict = _validate_nous_inference_url_from_network({"url": "x"})  # type: ignore[arg-type]
        assert from_dict is None

    def test_malformed_url_rejected(self):
        """Even garbled input must fall back safely, not raise."""
        result = _validate_nous_inference_url_from_network(
            "not://a real url at all"
        )
        assert result is None

    def test_default_inference_url_is_in_allowlist(self):
        """Sanity check: DEFAULT_NOUS_INFERENCE_URL must itself validate.

        If anyone retargets the default away from
        ``inference-api.nousresearch.com``, they MUST update the allowlist
        in the same change — otherwise the allowlist would reject the
        Portal's own legitimate default and break every install.
        """
        expected = DEFAULT_NOUS_INFERENCE_URL.rstrip("/")
        result = _validate_nous_inference_url_from_network(
            DEFAULT_NOUS_INFERENCE_URL
        )
        assert result == expected

    def test_allowlist_contains_inference_api_host(self):
        """The default's host must be in the allowlist set."""
        from urllib.parse import urlparse

        default_host = urlparse(DEFAULT_NOUS_INFERENCE_URL).hostname
        assert default_host in _ALLOWED_NOUS_INFERENCE_HOSTS
class TestCallSiteWiring:
    """Verify the validator is actually wired into all 5 NETWORK call sites.

    These are text-grep contracts over the ``auth.py`` source rather than
    end-to-end behaviour tests (per the original author, each site sits in
    several hundred lines of code that would need extensive HTTP mocking).
    If anyone swaps ``_validate_nous_inference_url_from_network`` back to the
    un-validated ``_optional_base_url``, these tests catch it.

    Each site lives inside ``resolve_nous_runtime_credentials`` and one
    helper (``_extend_state_from_refresh``). The shape guarded against is
    ``<helper>_url = _optional_base_url(<payload>.get("inference_base_url"))``
    — that's what the unsafe pre-fix code looked like, and the only semantic
    difference between the safe and unsafe helpers is the host-allowlist
    check.
    """

    def _read_auth_source(self):
        """Return the text of the ``hermes_cli.auth`` module's source file."""
        from pathlib import Path

        import hermes_cli.auth as _auth_mod

        auth_file = Path(_auth_mod.__file__)
        return auth_file.read_text(encoding="utf-8")

    def test_no_unvalidated_inference_base_url_assignments_remain(self):
        """No remaining ``_optional_base_url(...inference_base_url...)`` reads
        from Portal payloads. If you see a failure here, you've either
        added a new NETWORK site that needs validation, or downgraded an
        existing one back to the unsafe helper."""
        source = self._read_auth_source()
        unsafe_reads = (
            '_optional_base_url(refreshed.get("inference_base_url"))',
            '_optional_base_url(mint_payload.get("inference_base_url"))',
        )
        for needle in unsafe_reads:
            assert needle not in source, (
                f"Found unvalidated network read: {needle!r}. "
                f"Use _validate_nous_inference_url_from_network() instead."
            )

    def test_validator_wired_at_all_known_call_sites(self):
        """All 5 known NETWORK sites use the validator. If this count
        drops, someone removed protection; if it grows, audit the new
        site to be sure validation is appropriate."""
        source = self._read_auth_source()
        refresh_call = (
            '_validate_nous_inference_url_from_network(refreshed.get("inference_base_url"))'
        )
        mint_call = (
            '_validate_nous_inference_url_from_network(mint_payload.get("inference_base_url"))'
        )
        refresh_count = source.count(refresh_call)
        mint_count = source.count(mint_call)
        assert refresh_count == 3, f"expected 3 refresh sites, found {refresh_count}"
        assert mint_count == 2, f"expected 2 mint sites, found {mint_count}"

    def test_proxy_adapter_also_validates(self):
        """The Nous proxy adapter applies the validator as defense-in-depth
        even though auth.py already validates at the source, so a future
        bypass at the source layer still gets caught at the forward
        boundary."""
        from pathlib import Path

        import hermes_cli.proxy.adapters.nous_portal as _nous_adapter

        adapter_file = Path(_nous_adapter.__file__)
        source = adapter_file.read_text(encoding="utf-8")
        assert "_validate_nous_inference_url_from_network" in source
class TestEnvOverrideNotGated:
    """The documented dev/staging env-var override must keep working.

    ``NOUS_INFERENCE_BASE_URL`` is read by ``resolve_nous_runtime_credentials``
    via ``os.getenv`` — that path doesn't pass through the validator
    (env values are trusted because the user set them themselves).
    Verify the env-var read site does NOT consult the validator, so a
    user running against a non-allowlisted staging host via env is not
    inadvertently broken by this fix.
    """

    def test_env_override_path_does_not_call_validator(self):
        """In resolve_nous_runtime_credentials, the env override is
        read via os.getenv directly, not via the validator. Grep the
        source to confirm: the env line should NOT mention the
        validator."""
        from pathlib import Path

        import hermes_cli.auth as _auth_mod

        source = Path(_auth_mod.__file__).read_text(encoding="utf-8")
        # NOTE(review): if no line contains both markers, this loop asserts
        # nothing and the test passes vacuously — confirm the env read still
        # sits on a single line in auth.py.
        for line in source.splitlines():
            is_env_read = "NOUS_INFERENCE_BASE_URL" in line and "os.getenv" in line
            if not is_env_read:
                continue
            assert "_validate_nous_inference_url_from_network" not in line, (
                "env override path must not gate through the network "
                "validator — it would break documented dev/staging use."
            )

View file

@ -0,0 +1,353 @@
"""Tests for the plugin auxiliary-task registration API.
Covers:
- PluginContext.register_auxiliary_task() validation
- PluginManager._aux_tasks storage + force-rediscovery clearing
- get_plugin_auxiliary_tasks() module-level helper
- _all_aux_tasks() merge of built-in + plugin tasks
- _reset_aux_to_auto() includes plugin tasks
- _get_auxiliary_task_config() layers plugin defaults under user config
"""
from __future__ import annotations
import pytest
from hermes_cli.plugins import (
PluginContext,
PluginManager,
PluginManifest,
get_plugin_auxiliary_tasks,
)
# ── Fixtures ─────────────────────────────────────────────────────────────────
def _make_ctx(name: str = "test_plugin") -> tuple[PluginContext, PluginManager]:
    """Return a ``(context, manager)`` pair for a plugin called *name*.

    The manager is marked as already discovered so that lookups do not
    trigger auto-discovery, letting tests drive the registration paths
    directly.
    """
    manager = PluginManager()
    manager._discovered = True  # skip auto-discovery on lookup
    context = PluginContext(PluginManifest(name=name), manager)
    return context, manager
@pytest.fixture
def patched_manager(monkeypatch):
    """Swap the module-level plugin manager for a fresh one during a test.

    ``_PLUGIN_MANAGER``, ``get_plugin_manager`` and
    ``_ensure_plugins_discovered`` in ``hermes_cli.plugins`` are all pointed
    at the same fresh, already-"discovered" manager.  monkeypatch restores
    the originals after the test.
    """
    from hermes_cli import plugins as plugins_mod

    fresh = PluginManager()
    fresh._discovered = True

    def _return_fresh() -> PluginManager:
        return fresh

    # raising=False: tolerate the attribute being absent on the module.
    monkeypatch.setattr(plugins_mod, "_PLUGIN_MANAGER", fresh, raising=False)
    for accessor in ("get_plugin_manager", "_ensure_plugins_discovered"):
        monkeypatch.setattr(plugins_mod, accessor, _return_fresh)
    yield fresh
# ── PluginContext.register_auxiliary_task ────────────────────────────────────
def test_register_auxiliary_task_basic():
    """A minimal registration stores metadata, owner, and routing defaults."""
    ctx, manager = _make_ctx("my_plugin")
    ctx.register_auxiliary_task(
        key="my_task",
        display_name="My task",
        description="a custom side task",
    )

    assert "my_task" in manager._aux_tasks
    entry = manager._aux_tasks["my_task"]
    expected_fields = {
        "key": "my_task",
        "display_name": "My task",
        "description": "a custom side task",
        "plugin": "my_plugin",
    }
    for field, expected in expected_fields.items():
        assert entry[field] == expected

    # Routing defaults are filled in even though none were passed.
    defaults = entry["defaults"]
    assert defaults["provider"] == "auto"
    assert defaults["model"] == ""
    assert defaults["timeout"] == 60
def test_register_auxiliary_task_with_custom_defaults():
    """Supplied defaults are kept; unspecified ones are still populated."""
    ctx, manager = _make_ctx()
    custom_defaults = {"timeout": 30, "extra_body": {"reasoning_effort": "low"}}
    ctx.register_auxiliary_task(
        key="custom_task",
        display_name="Custom",
        description="d",
        defaults=custom_defaults,
    )

    defaults = manager._aux_tasks["custom_task"]["defaults"]
    assert defaults["timeout"] == 30
    assert defaults["extra_body"] == {"reasoning_effort": "low"}
    # A default the caller did not specify is still present.
    assert defaults["provider"] == "auto"
def test_register_auxiliary_task_rejects_builtin_keys():
    """Registering any built-in task key raises ``ValueError``."""
    ctx, _manager = _make_ctx()
    reserved_keys = (
        "vision",
        "compression",
        "web_extract",
        "approval",
        "mcp",
        "title_generation",
        "skills_hub",
        "curator",
    )
    for reserved in reserved_keys:
        with pytest.raises(ValueError, match="reserved for a built-in task"):
            ctx.register_auxiliary_task(
                key=reserved,
                display_name="x",
                description="x",
            )
def test_register_auxiliary_task_rejects_invalid_key_shapes():
    """Empty keys and keys with dash, dot, space or slash raise ``ValueError``."""
    ctx, _manager = _make_ctx()
    malformed_keys = ("", "with-dash", "with.dot", "with space", "with/slash")
    for malformed in malformed_keys:
        with pytest.raises(ValueError):
            ctx.register_auxiliary_task(
                key=malformed,
                display_name="x",
                description="x",
            )
def test_register_auxiliary_task_allows_same_plugin_re_registration():
    """The same plugin may register a key twice; the later entry wins."""
    ctx, manager = _make_ctx("plug_a")
    registrations = (("First", "first"), ("Second", "second"))
    for display_name, description in registrations:
        ctx.register_auxiliary_task(
            key="t1", display_name=display_name, description=description
        )

    assert manager._aux_tasks["t1"]["display_name"] == "Second"
def test_register_auxiliary_task_rejects_cross_plugin_collision():
    """A key owned by one plugin cannot be registered by a different plugin."""
    manager = PluginManager()
    manager._discovered = True
    owner_ctx = PluginContext(PluginManifest(name="plug_a"), manager)
    intruder_ctx = PluginContext(PluginManifest(name="plug_b"), manager)

    owner_ctx.register_auxiliary_task(
        key="shared", display_name="A", description="a"
    )

    with pytest.raises(ValueError, match="already registered by plugin 'plug_a'"):
        intruder_ctx.register_auxiliary_task(
            key="shared", display_name="B", description="b"
        )
# ── PluginManager state lifecycle ────────────────────────────────────────────
def test_force_rediscovery_clears_aux_tasks():
    """Clearing ``_aux_tasks`` leaves the manager with no registered tasks.

    NOTE(review): the clear is performed by hand here to simulate the
    force-rediscovery path; the manager's own force code is not invoked, so
    this does not prove that path clears the registry — confirm intent.
    """
    ctx, manager = _make_ctx()
    ctx.register_auxiliary_task(
        key="will_be_cleared",
        display_name="x",
        description="x",
    )
    assert "will_be_cleared" in manager._aux_tasks

    manager._discovered = False
    # Simulate the force=True path: state is cleared before re-scanning.
    manager._aux_tasks.clear()

    assert manager._aux_tasks == {}
# ── Module-level helper ──────────────────────────────────────────────────────
def test_get_plugin_auxiliary_tasks_returns_sorted_list(patched_manager):
    """Tasks registered out of order come back ordered by key."""
    ctx = PluginContext(PluginManifest(name="plug"), patched_manager)
    # Deliberately registered in non-alphabetical order.
    unsorted_specs = (
        ("zeta_task", "Zeta", "z"),
        ("alpha_task", "Alpha", "a"),
        ("mike_task", "Mike", "m"),
    )
    for key, display_name, description in unsorted_specs:
        ctx.register_auxiliary_task(
            key=key, display_name=display_name, description=description
        )

    returned_keys = [task["key"] for task in get_plugin_auxiliary_tasks()]
    assert returned_keys == ["alpha_task", "mike_task", "zeta_task"]
def test_get_plugin_auxiliary_tasks_empty_when_none_registered(patched_manager):
    """With nothing registered on the fresh manager, the helper returns ``[]``."""
    registered = get_plugin_auxiliary_tasks()
    assert registered == []
# ── _all_aux_tasks merges built-in + plugin ──────────────────────────────────
def test_all_aux_tasks_includes_plugin_registered(patched_manager):
    """``_all_aux_tasks`` lists built-ins first, then plugin-registered tasks."""
    from hermes_cli.main import _AUX_TASKS, _all_aux_tasks

    ctx = PluginContext(PluginManifest(name="hindsight"), patched_manager)
    ctx.register_auxiliary_task(
        key="memory_retain_filter",
        display_name="Memory retain filter",
        description="hindsight pre-retain dedup/extract",
    )

    merged = _all_aux_tasks()
    merged_keys = [key for key, _name, _desc in merged]

    # The built-in keys form an unchanged prefix of the merged list.
    builtin_keys = [key for key, _name, _desc in _AUX_TASKS]
    assert merged_keys[: len(builtin_keys)] == builtin_keys

    # The plugin task is present as a (key, display_name, description) tuple.
    assert "memory_retain_filter" in merged_keys
    plugin_entry = next(
        entry for entry in merged if entry[0] == "memory_retain_filter"
    )
    assert plugin_entry == (
        "memory_retain_filter",
        "Memory retain filter",
        "hindsight pre-retain dedup/extract",
    )
def test_all_aux_tasks_swallows_plugin_discovery_failure(monkeypatch):
    """If the plugin task lookup raises, built-in tasks are still returned."""
    from hermes_cli import main as main_mod

    def _exploding_lookup():
        raise RuntimeError("plugin scan exploded")

    monkeypatch.setattr(
        "hermes_cli.plugins.get_plugin_auxiliary_tasks", _exploding_lookup
    )

    merged = main_mod._all_aux_tasks()

    # A built-in key survives the failed plugin lookup.
    merged_keys = [key for key, _name, _desc in merged]
    assert "vision" in merged_keys
# ── _reset_aux_to_auto includes plugin tasks ─────────────────────────────────
def test_reset_aux_to_auto_resets_plugin_tasks(tmp_path, monkeypatch, patched_manager):
    """A plugin task configured with a non-auto provider is reset to auto."""
    from pathlib import Path

    from hermes_cli.config import load_config, save_config
    from hermes_cli.main import _reset_aux_to_auto

    hermes_dir = tmp_path / ".hermes"
    monkeypatch.setenv("HERMES_HOME", str(hermes_dir))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    hermes_dir.mkdir(exist_ok=True)

    ctx = PluginContext(PluginManifest(name="plug"), patched_manager)
    ctx.register_auxiliary_task(
        key="my_aux",
        display_name="My Aux",
        description="d",
    )

    # Persist a non-auto routing entry for the plugin task.
    cfg = load_config()
    cfg.setdefault("auxiliary", {})["my_aux"] = {
        "provider": "openrouter",
        "model": "gpt-4o",
        "base_url": "",
        "api_key": "",
    }
    save_config(cfg)

    reset_count = _reset_aux_to_auto()
    assert reset_count >= 1

    reloaded_entry = load_config()["auxiliary"]["my_aux"]
    assert reloaded_entry["provider"] == "auto"
    assert reloaded_entry["model"] == ""
# ── auxiliary_client._get_auxiliary_task_config defaults layering ────────────
def test_get_auxiliary_task_config_layers_plugin_defaults(
    tmp_path, monkeypatch, patched_manager
):
    """With no user config entry, the plugin's declared defaults are resolved."""
    from pathlib import Path
    from agent.auxiliary_client import _get_auxiliary_task_config

    # Redirect HERMES_HOME and Path.home() into the tmp sandbox.
    hermes_home = tmp_path / ".hermes"
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    hermes_home.mkdir(exist_ok=True)

    plugin_ctx = PluginContext(PluginManifest(name="plug"), patched_manager)
    plugin_ctx.register_auxiliary_task(
        key="my_filter",
        display_name="My filter",
        description="x",
        defaults={"timeout": 15, "extra_body": {"reasoning_effort": "low"}},
    )

    # Nothing was written for my_filter, so only the defaults can surface.
    task_cfg = _get_auxiliary_task_config("my_filter")
    assert task_cfg["timeout"] == 15
    assert task_cfg["extra_body"] == {"reasoning_effort": "low"}
    assert task_cfg["provider"] == "auto"
def test_get_auxiliary_task_config_user_config_wins_over_plugin_defaults(
    tmp_path, monkeypatch, patched_manager
):
    """Values saved in the user's config take precedence over plugin defaults."""
    from pathlib import Path
    from hermes_cli.config import load_config, save_config
    from agent.auxiliary_client import _get_auxiliary_task_config

    # Redirect HERMES_HOME and Path.home() into the tmp sandbox.
    hermes_home = tmp_path / ".hermes"
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    hermes_home.mkdir(exist_ok=True)

    plugin_ctx = PluginContext(PluginManifest(name="plug"), patched_manager)
    plugin_ctx.register_auxiliary_task(
        key="my_filter",
        display_name="My filter",
        description="x",
        defaults={"timeout": 15, "provider": "auto"},
    )

    # Save a user entry that overrides both the timeout and the provider.
    config = load_config()
    config.setdefault("auxiliary", {})["my_filter"] = {"timeout": 90, "provider": "nous"}
    save_config(config)

    task_cfg = _get_auxiliary_task_config("my_filter")
    assert task_cfg["timeout"] == 90  # user wins
    assert task_cfg["provider"] == "nous"  # user wins
def test_get_auxiliary_task_config_unknown_task_returns_empty(
    tmp_path, monkeypatch, patched_manager
):
    """A task key nobody registered or configured resolves to an empty dict."""
    from pathlib import Path
    from agent.auxiliary_client import _get_auxiliary_task_config

    # Redirect HERMES_HOME and Path.home() into the tmp sandbox.
    hermes_home = tmp_path / ".hermes"
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    hermes_home.mkdir(exist_ok=True)

    resolved = _get_auxiliary_task_config("nonexistent")
    assert resolved == {}

View file

@ -65,6 +65,36 @@ class TestSanitizePluginName:
with pytest.raises(ValueError, match="must not be empty"):
_sanitize_plugin_name("", tmp_path)
# ── allow_subdir=True ──
def test_allow_subdir_accepts_single_slash(self, tmp_path):
    """A ``category/name`` value resolves to the nested directory."""
    expected = (tmp_path / "observability" / "langfuse").resolve()
    resolved = _sanitize_plugin_name(
        "observability/langfuse", tmp_path, allow_subdir=True
    )
    assert resolved == expected
def test_allow_subdir_strips_leading_trailing_slash(self, tmp_path):
    """Leading and trailing slashes do not change the resolved target."""
    expected = (tmp_path / "image_gen" / "openai").resolve()
    resolved = _sanitize_plugin_name(
        "/image_gen/openai/", tmp_path, allow_subdir=True
    )
    assert resolved == expected
def test_allow_subdir_still_rejects_dot_dot(self, tmp_path):
    """``..`` segments stay forbidden even when sub-directories are allowed."""
    expect_rejection = pytest.raises(ValueError, match="must not contain")
    with expect_rejection:
        _sanitize_plugin_name("foo/../bar", tmp_path, allow_subdir=True)
def test_allow_subdir_still_rejects_backslash(self, tmp_path):
    """Backslashes stay forbidden even when sub-directories are allowed."""
    expect_rejection = pytest.raises(ValueError, match="must not contain")
    with expect_rejection:
        _sanitize_plugin_name("foo\\bar", tmp_path, allow_subdir=True)
def test_allow_subdir_rejects_empty_after_strip(self, tmp_path):
    """A name made only of slashes is reported as empty."""
    expect_rejection = pytest.raises(ValueError, match="must not be empty")
    with expect_rejection:
        _sanitize_plugin_name("///", tmp_path, allow_subdir=True)
def test_allow_subdir_resolves_inside_plugins_dir(self, tmp_path):
    """A multi-segment name still resolves underneath the plugins directory."""
    plugins_root = tmp_path.resolve()
    resolved = _sanitize_plugin_name("a/b/c", tmp_path, allow_subdir=True)
    assert resolved.is_relative_to(plugins_root)
# ── _resolve_git_url ──────────────────────────────────────────────────────

View file

@ -0,0 +1,361 @@
"""Regression coverage for GHSA-5qr3-c538-wm9j (#29156) — Remote Code
Execution via the ``HERMES_ENABLE_PROJECT_PLUGINS`` bypass in the web
server's dashboard plugin loader.
Two primitives combined into the original advisory chain:
1. ``hermes_cli.web_server._discover_dashboard_plugins`` opted into
the untrusted ``./.hermes/plugins/`` source via
``os.environ.get("HERMES_ENABLE_PROJECT_PLUGINS")``, which is truthy for
any non-empty string, so ``=0`` / ``=false`` / ``=no`` (all of
which the agent loader treats as off, and which operators set to
*disable* project plugins) silently *enabled* the source.
2. ``hermes_cli.web_server._mount_plugin_api_routes`` then imported
each plugin's manifest ``api`` field as a Python module via
``importlib.util.spec_from_file_location``. The field was used
raw, with no path-traversal check, so a single manifest line
``{"api": "/tmp/payload.py"}`` was enough to redirect the
importer at any Python file on disk (``Path('safe') / '/abs'``
resolves to ``/abs`` in Python).
These tests pin each layer of the new defence:
* Truthy env semantics now match the agent loader.
* ``_safe_plugin_api_relpath`` rejects absolute paths, ``..``
traversal, and non-string / empty values.
* ``_mount_plugin_api_routes`` re-validates at import time and
refuses project-source plugins outright.
* End-to-end the original PoC manifest no longer triggers
``importlib`` for ``/tmp/payload.py``.
"""
from __future__ import annotations
import json
import os
import sys
from pathlib import Path
from unittest.mock import patch
import pytest
from hermes_cli import web_server
@pytest.fixture(autouse=True)
def _reset_plugin_cache(monkeypatch):
    """Clear the dashboard plugin cache before and after every test.

    The plugin scanner keeps its result in a per-process cache.  Wiping it
    on both sides of the test stops results leaking between tests (which
    could mask a regression) and keeps out whatever the import-time
    ``_mount_plugin_api_routes()`` call populated.
    """

    def _bust_cache():
        web_server._dashboard_plugins_cache = None

    _bust_cache()
    yield
    _bust_cache()
def _write_plugin_manifest(root: Path, name: str, manifest: dict) -> Path:
    """Write *manifest* as JSON to ``root/<name>/dashboard/manifest.json``.

    The dashboard directory (and any missing parents) is created first.

    Returns:
        The path of the created dashboard directory.
    """
    target_dir = root.joinpath(name, "dashboard")
    target_dir.mkdir(parents=True)
    manifest_file = target_dir / "manifest.json"
    manifest_file.write_text(json.dumps(manifest))
    return target_dir
# ---------------------------------------------------------------------------
# Layer 1 — HERMES_ENABLE_PROJECT_PLUGINS env gate uses truthy semantics.
# ---------------------------------------------------------------------------
class TestProjectPluginsEnvGate:
    """Project plugins are discovered only for documented truthy env values.

    Before #29156 any non-empty string — including ``0`` / ``false`` /
    ``no`` — silently enabled the project source.
    """

    @pytest.fixture
    def project_plugin(self, tmp_path, monkeypatch):
        """Plant an ``evil`` plugin under the CWD's ``.hermes/plugins``.

        ``HERMES_HOME`` is pointed at an empty tmp directory and the process
        is chdir'ed into a fresh ``evil-repo`` directory, which is returned.
        """
        user_home = tmp_path / "home"
        monkeypatch.setenv("HERMES_HOME", str(user_home))
        user_home.mkdir()

        repo_dir = tmp_path / "evil-repo"
        repo_dir.mkdir()
        monkeypatch.chdir(repo_dir)

        manifest = {
            "name": "evil",
            "label": "Evil",
            "entry": "dist/index.js",
        }
        _write_plugin_manifest(repo_dir / ".hermes" / "plugins", "evil", manifest)
        return repo_dir

    @pytest.mark.parametrize("value", ["", "0", "false", "FALSE", "no", "off", "False"])
    def test_falsy_values_keep_project_plugins_disabled(
        self, project_plugin, monkeypatch, value
    ):
        """Falsy strings (and an unset variable) keep the project source off."""
        if value:
            monkeypatch.setenv("HERMES_ENABLE_PROJECT_PLUGINS", value)
        else:
            # The empty case is modelled as "variable not set at all".
            monkeypatch.delenv("HERMES_ENABLE_PROJECT_PLUGINS", raising=False)

        discovered = web_server._get_dashboard_plugins(force_rescan=True)
        discovered_names = {plugin["name"] for plugin in discovered}
        assert "evil" not in discovered_names, (
            f"HERMES_ENABLE_PROJECT_PLUGINS={value!r} must NOT enable the "
            "project source — that's the GHSA-5qr3-c538-wm9j env bypass."
        )

    @pytest.mark.parametrize("value", ["1", "true", "TRUE", "yes", "on", "YES"])
    def test_truthy_values_enable_project_plugins(
        self, project_plugin, monkeypatch, value
    ):
        """Truthy strings surface the plugin, tagged with the project source."""
        monkeypatch.setenv("HERMES_ENABLE_PROJECT_PLUGINS", value)

        discovered = web_server._get_dashboard_plugins(force_rescan=True)
        matches = [plugin for plugin in discovered if plugin["name"] == "evil"]
        evil = matches[0] if matches else None
        assert evil is not None
        assert evil["source"] == "project"
# ---------------------------------------------------------------------------
# Layer 2 — _safe_plugin_api_relpath rejects path-traversal payloads.
# ---------------------------------------------------------------------------
class TestApiPathSanitizer:
    """Unit tests for ``web_server._safe_plugin_api_relpath``.

    Relative paths below the dashboard directory come back unchanged;
    absolute paths, ``..`` traversal and non-string / blank values must
    come back as ``None``.
    """

    def _dashboard_dir(self, tmp_path):
        """Create and return ``tmp_path/plug/dashboard``."""
        dashboard = tmp_path.joinpath("plug", "dashboard")
        dashboard.mkdir(parents=True)
        return dashboard

    def _assert_rejected(self, tmp_path, payload):
        """Assert the sanitizer returns ``None`` for *payload*."""
        dashboard = self._dashboard_dir(tmp_path)
        result = web_server._safe_plugin_api_relpath(payload, dashboard_dir=dashboard)
        assert result is None

    def test_simple_relative_path_accepted(self, tmp_path):
        dashboard = self._dashboard_dir(tmp_path)
        dashboard.joinpath("api.py").write_text("router = None\n")
        result = web_server._safe_plugin_api_relpath("api.py", dashboard_dir=dashboard)
        assert result == "api.py"

    def test_nested_relative_path_accepted(self, tmp_path):
        dashboard = self._dashboard_dir(tmp_path)
        backend = dashboard / "backend"
        backend.mkdir()
        backend.joinpath("routes.py").write_text("router = None\n")
        result = web_server._safe_plugin_api_relpath(
            "backend/routes.py", dashboard_dir=dashboard
        )
        assert result == "backend/routes.py"

    @pytest.mark.parametrize("payload", [
        "/etc/passwd",
        "/tmp/payload.py",
        "/usr/bin/python",
        # NT-style absolute on POSIX is a relative path — covered by traversal below.
    ])
    def test_absolute_path_rejected(self, tmp_path, payload):
        self._assert_rejected(tmp_path, payload)

    @pytest.mark.parametrize("payload", [
        "../../../etc/passwd",
        "../neighbour/api.py",
        "../../../../tmp/evil.py",
        "subdir/../../../../etc/passwd",
    ])
    def test_traversal_rejected(self, tmp_path, payload):
        self._assert_rejected(tmp_path, payload)

    @pytest.mark.parametrize("payload", [None, "", " ", 42, [], {}])
    def test_non_string_or_empty_rejected(self, tmp_path, payload):
        self._assert_rejected(tmp_path, payload)
# ---------------------------------------------------------------------------
# Layer 3 — _discover_dashboard_plugins scrubs ``_api_file`` early.
# ---------------------------------------------------------------------------
class TestDiscoveryScrubsApiField:
    """Discovered plugin entries must never carry an unsanitised api path.

    A regression here would re-arm the RCE for any caller that reads
    ``plugin['_api_file']`` directly.
    """

    @pytest.fixture
    def user_plugin_factory(self, tmp_path, monkeypatch):
        """Return a callable that writes a manifest under ``tmp_path/plugins``.

        ``HERMES_HOME`` is set to ``tmp_path`` and the project-plugins
        variable is removed.
        """
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        monkeypatch.delenv("HERMES_ENABLE_PROJECT_PLUGINS", raising=False)
        plugins_root = tmp_path / "plugins"

        def _plant(name: str, manifest: dict) -> None:
            _write_plugin_manifest(plugins_root, name, manifest)

        return _plant

    @staticmethod
    def _rescan_and_find(name):
        """Force a rescan and return the first entry called *name*."""
        discovered = web_server._get_dashboard_plugins(force_rescan=True)
        return next(plugin for plugin in discovered if plugin["name"] == name)

    def test_absolute_api_path_in_manifest_is_scrubbed(self, user_plugin_factory):
        user_plugin_factory("evil", {
            "name": "evil",
            "label": "Evil",
            "api": "/tmp/payload.py",
            "entry": "dist/index.js",
        })
        entry = self._rescan_and_find("evil")
        assert entry["_api_file"] is None
        assert entry["has_api"] is False

    def test_traversal_api_path_in_manifest_is_scrubbed(self, user_plugin_factory):
        user_plugin_factory("traverse", {
            "name": "traverse",
            "label": "Traverse",
            "api": "../../../../tmp/evil.py",
            "entry": "dist/index.js",
        })
        entry = self._rescan_and_find("traverse")
        assert entry["_api_file"] is None
        assert entry["has_api"] is False

    def test_safe_api_path_survives(self, user_plugin_factory, tmp_path):
        user_plugin_factory("safe", {
            "name": "safe",
            "label": "Safe",
            "api": "api.py",
            "entry": "dist/index.js",
        })
        # Create the api file for real so a later mount could in principle
        # proceed; only the discovery scrub is under test here.
        api_file = tmp_path / "plugins" / "safe" / "dashboard" / "api.py"
        api_file.write_text("router = None\n")

        entry = self._rescan_and_find("safe")
        assert entry["_api_file"] == "api.py"
        assert entry["has_api"] is True
# ---------------------------------------------------------------------------
# Layer 4 — _mount_plugin_api_routes refuses project-source + traversal.
# ---------------------------------------------------------------------------
class TestMountApiRoutesRefusesUntrusted:
    """Mount-time checks around the ``importlib`` call site.

    Each test places a synthetic plugin entry straight into the cache and
    inspects whether ``importlib.util.spec_from_file_location`` was called.
    """

    def _payload_plugin(self, tmp_path, *, source: str, api_file: str = "api.py"):
        """Build a synthetic cache entry backed by a real dashboard directory."""
        dashboard = tmp_path / "plug" / "dashboard"
        dashboard.mkdir(parents=True)
        # A benign router file is written regardless; the refusal tests
        # expect the source/path checks to stop before the importer runs.
        router_source = "from fastapi import APIRouter\nrouter = APIRouter()\n"
        (dashboard / "api.py").write_text(router_source)

        entry = {
            "name": "synthetic",
            "label": "Synthetic",
            "tab": {"path": "/synthetic", "position": "end"},
            "slots": [],
            "entry": "dist/index.js",
            "css": None,
            "has_api": True,
            "source": source,
            "_dir": str(dashboard),
            "_api_file": api_file,
        }
        return entry

    def test_project_source_api_is_not_imported(self, tmp_path):
        web_server._dashboard_plugins_cache = [
            self._payload_plugin(tmp_path, source="project")
        ]
        with patch("importlib.util.spec_from_file_location") as spec:
            web_server._mount_plugin_api_routes()
        assert spec.call_count == 0, (
            "project-source plugin's api file was imported — "
            "GHSA-5qr3-c538-wm9j defence-in-depth regression"
        )

    def test_bundled_source_api_imports_normally(self, tmp_path):
        web_server._dashboard_plugins_cache = [
            self._payload_plugin(tmp_path, source="bundled")
        ]
        # A None spec makes the mount routine skip the plugin after the
        # call, so nothing is actually imported.
        with patch(
            "importlib.util.spec_from_file_location", return_value=None
        ) as spec:
            web_server._mount_plugin_api_routes()
        assert spec.call_count == 1
        # The second positional argument is the resolved api path.
        _module_name, raw_path = spec.call_args.args[:2]
        resolved = Path(raw_path)
        assert resolved.name == "api.py"
        assert resolved.is_absolute()

    def test_traversal_api_caught_at_mount_time(self, tmp_path):
        """Defence-in-depth: a traversal path already sitting in the cache
        (e.g. after cache tampering) is still refused at mount time."""
        web_server._dashboard_plugins_cache = [
            self._payload_plugin(
                tmp_path, source="user", api_file="../../../tmp/evil.py"
            )
        ]
        with patch("importlib.util.spec_from_file_location") as spec:
            web_server._mount_plugin_api_routes()
        assert spec.call_count == 0
# ---------------------------------------------------------------------------
# Layer 5 — End-to-end: the original PoC manifest no longer triggers RCE.
# ---------------------------------------------------------------------------
class TestEndToEndPocBlocked:
    """End-to-end replay of the original advisory proof of concept.

    An untrusted CWD holds a manifest whose ``api`` points at an
    attacker-chosen Python file, while ``HERMES_ENABLE_PROJECT_PLUGINS=0``
    is set (the operator believed the project source was off).  The
    importer must never be invoked for the payload path, whichever bypass
    is attempted (``=0`` truthy-string, absolute path, project source).
    """

    def test_full_chain_blocked(self, tmp_path, monkeypatch):
        user_home = tmp_path / "home"
        monkeypatch.setenv("HERMES_HOME", str(user_home))
        user_home.mkdir()

        repo_dir = tmp_path / "evil-repo"
        repo_dir.mkdir()
        monkeypatch.chdir(repo_dir)

        # The original bypass: a "disabled" value that the pre-fix web
        # server treated as enabled.
        monkeypatch.setenv("HERMES_ENABLE_PROJECT_PLUGINS", "0")

        # Payload: an absolute path referenced from a manifest in the CWD.
        payload_py = tmp_path / "payload.py"
        payload_py.write_text("OWNED = True\n")
        manifest = {
            "name": "evil",
            "label": "Evil",
            "api": str(payload_py),
            "entry": "dist/index.js",
        }
        _write_plugin_manifest(repo_dir / ".hermes" / "plugins", "evil", manifest)

        with patch("importlib.util.spec_from_file_location") as spec:
            discovered = web_server._get_dashboard_plugins(force_rescan=True)
            web_server._mount_plugin_api_routes()

        # ``0`` is no longer truthy, so the project source stays disabled.
        # Had the operator opted in, the absolute api path would be scrubbed
        # at discovery, and failing that the project-source guard in mount
        # would refuse the import.
        discovered_names = {plugin["name"] for plugin in discovered}
        assert "evil" not in discovered_names

        # Bundled plugins shipped with the repo may legitimately trigger
        # ``spec_from_file_location``; the regression is only that the
        # payload path / evil module are never targeted.
        for recorded in spec.call_args_list:
            imported_name = recorded.args[0]
            imported_path = Path(recorded.args[1])
            assert imported_name != "hermes_dashboard_plugin_evil"
            assert imported_path != payload_py
            assert "evil-repo" not in imported_path.parts
        assert "hermes_dashboard_plugin_evil" not in sys.modules

View file

@ -0,0 +1,299 @@
"""Unit tests for hermes_cli.security_audit — parsers + OSV plumbing.
These never hit the live OSV API; HTTP is monkeypatched. The live-call path
is exercised in the E2E test embedded in PR validation, not here.
"""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import patch
import pytest
from hermes_cli import security_audit as sa
# ─── Parsers ──────────────────────────────────────────────────────────────────
class TestRequirementsParser:
    """``sa._parse_requirements`` returns ``(name, version)`` for ``==`` pins only."""

    def test_extracts_pinned_versions(self):
        parsed = sa._parse_requirements("requests==2.20.0\nflask==2.0.1\n")
        assert parsed == [("requests", "2.20.0"), ("flask", "2.0.1")]

    def test_skips_comments_and_options(self):
        lines = ["# comment", "-r other.txt", "--index-url https://x", "flask==2.0.1"]
        parsed = sa._parse_requirements("\n".join(lines) + "\n")
        assert parsed == [("flask", "2.0.1")]

    def test_skips_unpinned(self):
        # >=, ~= and bare-name dependencies are deliberately not mapped to OSV.
        parsed = sa._parse_requirements(
            "requests>=2.0\ntyping-extensions\nflask~=2.0\n"
        )
        assert parsed == []

    def test_handles_extras_and_markers(self):
        requirements = (
            "requests[security]==2.20.0\n"
            'flask==2.0.1 ; python_version >= "3.8"\n'
        )
        expected = [("requests", "2.20.0"), ("flask", "2.0.1")]
        assert sa._parse_requirements(requirements) == expected

    def test_handles_empty(self):
        for blank in ("", " \n\n "):
            assert sa._parse_requirements(blank) == []
class TestMCPComponentExtraction:
    """``sa._extract_mcp_component`` maps an MCP launch command to a pinned component."""

    def test_npx_scoped_pinned(self):
        expected = sa.Component(
            name="@modelcontextprotocol/server-filesystem",
            version="0.5.0",
            ecosystem="npm",
            source="mcp:fs",
        )
        launch_args = ["-y", "@modelcontextprotocol/server-filesystem@0.5.0"]
        assert sa._extract_mcp_component("fs", "npx", launch_args) == expected

    def test_npx_full_path_command(self):
        component = sa._extract_mcp_component(
            "fetch", "/usr/local/bin/npx", ["mcp-server-fetch@1.2.3"]
        )
        assert component is not None
        assert component.name == "mcp-server-fetch"
        assert component.version == "1.2.3"

    def test_uvx_pinned(self):
        component = sa._extract_mcp_component(
            "time", "uvx", ["mcp-server-time==2.1.0"]
        )
        assert component is not None
        assert component.ecosystem == "PyPI"
        assert component.name == "mcp-server-time"
        assert component.version == "2.1.0"

    def test_unpinned_returns_none(self):
        # A bare npx package name means "latest" at runtime; not an audit subject.
        component = sa._extract_mcp_component("x", "npx", ["-y", "some-pkg"])
        assert component is None

    def test_docker_returns_none(self):
        # Docker image references are not parsed at present.
        component = sa._extract_mcp_component(
            "x", "docker", ["run", "-i", "mcp/foo:1.0"]
        )
        assert component is None

    def test_empty_args(self):
        component = sa._extract_mcp_component("x", "npx", [])
        assert component is None
# ─── Plugin discovery ─────────────────────────────────────────────────────────
class TestPluginDiscovery:
    """``sa._discover_plugins`` reads pinned dependencies from ``<home>/plugins/*``."""

    def test_reads_requirements_txt(self, tmp_path: Path):
        plugin_dir = tmp_path / "plugins" / "myplugin"
        plugin_dir.mkdir(parents=True)
        plugin_dir.joinpath("requirements.txt").write_text("requests==2.20.0\n")

        found = sa._discover_plugins(tmp_path)
        assert len(found) == 1
        only = found[0]
        assert only.name == "requests"
        assert only.source == "plugin:myplugin"

    def test_skips_when_no_plugins_dir(self, tmp_path: Path):
        found = sa._discover_plugins(tmp_path)
        assert found == []

    def test_skips_hidden_dirs(self, tmp_path: Path):
        hidden_dir = tmp_path / "plugins" / ".hidden"
        hidden_dir.mkdir(parents=True)
        hidden_dir.joinpath("requirements.txt").write_text("requests==2.20.0\n")
        assert sa._discover_plugins(tmp_path) == []

    def test_reads_pyproject_dependencies(self, tmp_path: Path):
        plugin_dir = tmp_path / "plugins" / "py"
        plugin_dir.mkdir(parents=True)
        plugin_dir.joinpath("pyproject.toml").write_text(
            '[project]\ndependencies = ["flask==2.0.1", "uvicorn>=0.20"]\n'
        )

        found = sa._discover_plugins(tmp_path)
        # uvicorn>=0.20 is unpinned, so flask is the only component expected.
        assert len(found) == 1
        only = found[0]
        assert only.name == "flask"
        assert only.version == "2.0.1"
# ─── OSV severity extraction ──────────────────────────────────────────────────
class TestSeverityExtraction:
    """Severity and fixed-version extraction from raw OSV records."""

    def test_database_specific_severity(self):
        record = {"database_specific": {"severity": "HIGH"}}
        assert sa._osv_severity_from_record(record) == "HIGH"

    def test_unknown_when_no_severity(self):
        severity = sa._osv_severity_from_record({})
        assert severity == "UNKNOWN"

    def test_ecosystem_specific_fallback(self):
        affected_entry = {"ecosystem_specific": {"severity": "MODERATE"}}
        record = {"affected": [affected_entry]}
        assert sa._osv_severity_from_record(record) == "MODERATE"

    def test_fixed_versions_extracted_and_deduped(self):
        first_affected = {
            "ranges": [
                {
                    "events": [
                        {"introduced": "0"},
                        {"fixed": "2.0.0"},
                    ]
                }
            ]
        }
        # "2.0.0" is repeated here; it must show up once in the result.
        second_affected = {
            "ranges": [{"events": [{"fixed": "2.0.0"}, {"fixed": "1.9.5"}]}]
        }
        record = {"affected": [first_affected, second_affected]}
        assert sa._osv_fixed_versions(record) == ["2.0.0", "1.9.5"]
# ─── End-to-end orchestration with mocked OSV ─────────────────────────────────
class TestRunAudit:
    """``sa.run_audit`` orchestration with both OSV calls patched out."""

    def test_no_components_returns_empty(self, tmp_path: Path):
        result = sa.run_audit(
            skip_venv=True, skip_plugins=True, skip_mcp=True, hermes_home=tmp_path
        )
        assert result == []

    def test_findings_sorted_by_severity_desc(self, tmp_path: Path):
        plugin_dir = tmp_path / "plugins" / "p"
        plugin_dir.mkdir(parents=True)
        plugin_dir.joinpath("requirements.txt").write_text(
            "alpha==1.0.0\nbeta==2.0.0\n"
        )

        def fake_batch(comps):
            first, second = comps[0], comps[1]
            return {first: ["LOW-1"], second: ["CRIT-1"]}

        def fake_details(ids):
            low = sa.Vulnerability(osv_id="LOW-1", severity="LOW", summary="low")
            crit = sa.Vulnerability(osv_id="CRIT-1", severity="CRITICAL", summary="crit")
            return {"LOW-1": low, "CRIT-1": crit}

        with patch.object(sa, "_osv_query_batch", side_effect=fake_batch):
            with patch.object(sa, "_osv_fetch_details", side_effect=fake_details):
                result = sa.run_audit(
                    skip_venv=True,
                    skip_plugins=False,
                    skip_mcp=True,
                    hermes_home=tmp_path,
                )

        assert len(result) == 2
        # CRITICAL must come first.
        ordered_ids = [finding.vuln.osv_id for finding in result[:2]]
        assert ordered_ids[0] == "CRIT-1"
        assert ordered_ids[1] == "LOW-1"
# ─── CLI subcommand exit codes ────────────────────────────────────────────────
class TestExitCodes:
    """Exit codes and JSON output of ``sa.cmd_security_audit``."""

    def _build_args(self, **kwargs):
        """Return a Namespace with every source skipped, overridden by *kwargs*."""
        import argparse

        options = dict(
            skip_venv=True,
            skip_plugins=True,
            skip_mcp=True,
            json=False,
            fail_on="critical",
        )
        options.update(kwargs)
        return argparse.Namespace(**options)

    def _stub_single_finding(self, monkeypatch, make_vuln):
        """Patch discovery and OSV lookups to report one venv package.

        The package is flagged with id ``X-1``; *make_vuln* is called on
        every detail fetch to build the vulnerability returned for that id.
        """
        component = sa.Component(
            name="pkg", version="1.0", ecosystem="PyPI", source="venv"
        )
        monkeypatch.setattr(sa, "_discover_venv", lambda: [component])
        monkeypatch.setattr(
            sa, "_osv_query_batch", lambda comps: {component: ["X-1"]}
        )
        monkeypatch.setattr(
            sa, "_osv_fetch_details", lambda ids: {"X-1": make_vuln()}
        )

    def test_clean_audit_exits_zero(self, tmp_path: Path, monkeypatch, capsys):
        monkeypatch.setattr(sa, "get_hermes_home", lambda: str(tmp_path))
        # Everything skipped → no components → exit 0.
        exit_code = sa.cmd_security_audit(self._build_args())
        assert exit_code == 0
        printed = capsys.readouterr().out
        assert "No components" in printed or "0 component" in printed

    def test_finding_above_threshold_exits_one(self, tmp_path: Path, monkeypatch):
        monkeypatch.setattr(sa, "get_hermes_home", lambda: str(tmp_path))
        # One venv component that OSV flags as CRITICAL.
        self._stub_single_finding(
            monkeypatch,
            lambda: sa.Vulnerability(osv_id="X-1", severity="CRITICAL"),
        )
        exit_code = sa.cmd_security_audit(
            self._build_args(skip_venv=False, fail_on="critical")
        )
        assert exit_code == 1

    def test_finding_below_threshold_exits_zero(self, tmp_path: Path, monkeypatch):
        monkeypatch.setattr(sa, "get_hermes_home", lambda: str(tmp_path))
        self._stub_single_finding(
            monkeypatch,
            lambda: sa.Vulnerability(osv_id="X-1", severity="MODERATE"),
        )
        exit_code = sa.cmd_security_audit(
            self._build_args(skip_venv=False, fail_on="critical")
        )
        assert exit_code == 0

    def test_unknown_fail_on_value_exits_two(self, tmp_path: Path, monkeypatch, capsys):
        monkeypatch.setattr(sa, "get_hermes_home", lambda: str(tmp_path))
        exit_code = sa.cmd_security_audit(self._build_args(fail_on="garbage"))
        assert exit_code == 2
        stderr_text = capsys.readouterr().err
        assert "fail-on" in stderr_text.lower()

    def test_json_output_shape(self, tmp_path: Path, monkeypatch, capsys):
        monkeypatch.setattr(sa, "get_hermes_home", lambda: str(tmp_path))
        self._stub_single_finding(
            monkeypatch,
            lambda: sa.Vulnerability(
                osv_id="X-1",
                severity="HIGH",
                summary="bad",
                fixed_versions=["1.1"],
            ),
        )
        sa.cmd_security_audit(
            self._build_args(skip_venv=False, json=True, fail_on="critical")
        )

        printed_lines = capsys.readouterr().out.splitlines()
        # The bitwarden banner can leak above the json; start parsing at the
        # first line that opens an object.
        first_json_line = next(
            idx for idx, line in enumerate(printed_lines) if line.startswith("{")
        )
        report = json.loads("\n".join(printed_lines[first_json_line:]))

        assert report["finding_count"] == 1
        first_finding = report["findings"][0]
        assert first_finding["severity"] == "HIGH"
        assert first_finding["fixed_versions"] == ["1.1"]

View file

@ -12,8 +12,10 @@ from hermes_cli.tools_config import (
_get_platform_tools,
_platform_toolset_summary,
_reconfigure_tool,
_run_post_setup,
_save_platform_tools,
_toolset_has_keys,
_toolset_needs_configuration_prompt,
CONFIGURABLE_TOOLSETS,
TOOL_CATEGORIES,
_visible_providers,
@ -752,6 +754,91 @@ def test_numeric_mcp_server_name_does_not_crash_sorted():
# ─── Imagegen Backend Picker Wiring ────────────────────────────────────────
def test_toolset_has_keys_treats_no_key_providers_as_configured():
    """``computer_use`` reports its keys as present even with an empty config."""
    has_keys = _toolset_has_keys("computer_use", {})
    assert has_keys is True
def test_computer_use_needs_configuration_when_cua_driver_post_setup_pending():
    """A no-key provider still needs setup while its post_setup is unsatisfied.

    Returning users who enable Computer Use through `hermes tools` have to
    reach the cua-driver post-setup installer even though the provider has
    no API keys.
    """
    # No binary is found on PATH at all.
    with patch("shutil.which", return_value=None):
        needs_prompt = _toolset_needs_configuration_prompt("computer_use", {})
        assert needs_prompt is True
def test_computer_use_skips_configuration_when_cua_driver_already_installed():
    """With the post_setup dependency installed, a returning-user toggle is a no-op."""
    installed = {"cua-driver": "/usr/local/bin/cua-driver"}

    with patch("shutil.which", side_effect=lambda name: installed.get(name)):
        needs_prompt = _toolset_needs_configuration_prompt("computer_use", {})
        assert needs_prompt is False
def test_computer_use_respects_custom_cua_driver_command():
    """The setup gate honours the ``HERMES_CUA_DRIVER_CMD`` override."""
    installed = {"custom-cua": "/opt/bin/custom-cua"}

    with patch.dict("os.environ", {"HERMES_CUA_DRIVER_CMD": "custom-cua"}):
        with patch("shutil.which", side_effect=lambda name: installed.get(name)):
            needs_prompt = _toolset_needs_configuration_prompt("computer_use", {})
            assert needs_prompt is False
def test_computer_use_blank_custom_driver_command_falls_back_to_default():
    """A blank override must not make the setup gate search for an empty command."""
    installed = {"cua-driver": "/usr/local/bin/cua-driver"}

    with patch.dict("os.environ", {"HERMES_CUA_DRIVER_CMD": " "}):
        with patch("shutil.which", side_effect=lambda name: installed.get(name)):
            needs_prompt = _toolset_needs_configuration_prompt("computer_use", {})
            assert needs_prompt is False
def test_computer_use_post_setup_respects_custom_driver_command_when_installed():
    """The already-installed check version-probes the override command."""
    installed = {"custom-cua": "/opt/bin/custom-cua"}

    with patch.dict("os.environ", {"HERMES_CUA_DRIVER_CMD": "custom-cua"}):
        with patch("platform.system", return_value="Darwin"):
            with patch("shutil.which", side_effect=lambda name: installed.get(name)):
                with patch("subprocess.run") as run:
                    run.return_value.stdout = "custom 1.2.3\n"
                    _run_post_setup("cua_driver")

    run.assert_called_once()
    probe_command = run.call_args.args[0]
    assert probe_command == ["custom-cua", "--version"]
def test_computer_use_post_setup_missing_override_does_not_accept_default_binary():
    """A default cua-driver binary must not stand in for a missing override."""
    looked_up = []

    def fake_which(name: str):
        looked_up.append(name)
        # Only the default binary exists; the override and curl are missing.
        return "/usr/local/bin/cua-driver" if name == "cua-driver" else None

    with patch.dict("os.environ", {"HERMES_CUA_DRIVER_CMD": "custom-cua"}):
        with patch("platform.system", return_value="Darwin"):
            with patch("shutil.which", side_effect=fake_which):
                with patch("subprocess.run") as run:
                    _run_post_setup("cua_driver")

    run.assert_not_called()
    for expected_lookup in ("custom-cua", "curl"):
        assert expected_lookup in looked_up
class TestImagegenBackendRegistry:
"""IMAGEGEN_BACKENDS tags drive the model picker flow in tools_config."""

View file

@ -168,7 +168,7 @@ def test_make_tui_argv_skips_build_only_on_termux_when_fresh(
argv, cwd = main_mod._make_tui_argv(tmp_path, tui_dev=False)
assert argv == ["/bin/node", str(tmp_path / "dist" / "entry.js")]
assert argv == ["/bin/node", "--expose-gc", str(tmp_path / "dist" / "entry.js")]
assert cwd == tmp_path

View file

@ -1,4 +1,5 @@
from argparse import Namespace
import os
from pathlib import Path
import sys
import types
@ -312,6 +313,37 @@ def test_termux_fast_cli_launch_chat_uses_light_parser(monkeypatch, main_mod):
}
def test_termux_fast_cli_launch_bare_defers_agent_startup(monkeypatch, main_mod):
    """Bare ``hermes`` on Termux takes the fast path and defers agent startup.

    ``cmd_chat`` must be reached with no query or command and ``compact``
    set, ``_prepare_agent_startup`` must not be called, and both deferral
    variables must end up as ``"1"`` in ``os.environ``.
    """
    captured = {}
    prepared = []

    monkeypatch.setenv("TERMUX_VERSION", "1")
    monkeypatch.delenv("HERMES_TUI", raising=False)
    # The asserts below expect the launcher to write these two variables
    # into os.environ.  delenv(raising=False) records nothing when the
    # variable is already absent, so such a write would outlive this test.
    # setenv() first makes monkeypatch remember the pre-test state, and the
    # delenv() that follows still starts the test with the variable unset.
    for written_by_launcher in (
        "HERMES_DEFER_AGENT_STARTUP",
        "HERMES_FAST_STARTUP_BANNER",
    ):
        monkeypatch.setenv(written_by_launcher, "")
        monkeypatch.delenv(written_by_launcher)
    monkeypatch.setattr(sys, "argv", ["hermes"])
    monkeypatch.setattr(
        main_mod, "_prepare_agent_startup", lambda args: prepared.append(args.command)
    )
    monkeypatch.setattr(
        main_mod,
        "cmd_chat",
        lambda args: captured.update(
            {
                "query": args.query,
                "command": args.command,
                "compact": getattr(args, "compact", False),
            }
        ),
    )

    assert main_mod._try_termux_fast_cli_launch() is True
    assert prepared == []
    assert captured == {"query": None, "command": None, "compact": True}
    assert os.environ["HERMES_DEFER_AGENT_STARTUP"] == "1"
    assert os.environ["HERMES_FAST_STARTUP_BANNER"] == "1"
def test_termux_fast_cli_launch_oneshot_uses_light_parser(monkeypatch, main_mod):
captured = {}
prepared = []
@ -364,6 +396,34 @@ def test_termux_fast_cli_launch_version_skips_update_check(monkeypatch, main_mod
assert captured == [False]
def test_termux_ultrafast_version_runs_before_heavy_startup(
    monkeypatch, capsys, main_mod
):
    """``hermes --version`` on Termux is handled by the ultrafast path."""
    monkeypatch.setenv("TERMUX_VERSION", "1")
    monkeypatch.delenv("HERMES_TERMUX_DISABLE_FAST_CLI", raising=False)
    monkeypatch.setattr(sys, "argv", ["hermes", "--version"])

    handled = main_mod._try_termux_ultrafast_version()
    assert handled is True

    printed = capsys.readouterr().out
    # Every section of the version banner has to be present.
    for fragment in ("Hermes Agent v", "Project:", "Python:", "OpenAI SDK:"):
        assert fragment in printed
def test_read_openai_version_fast(monkeypatch, tmp_path, main_mod):
    """The version is read from ``openai/_version.py`` found on ``sys.path``."""
    fake_package = tmp_path / "openai"
    fake_package.mkdir()
    version_file = fake_package / "_version.py"
    version_file.write_text(
        '__version__ = "9.8.7" # x-release-please-version\n',
        encoding="utf-8",
    )
    # Make the fake package the only importable location.
    monkeypatch.setattr(sys, "path", [str(tmp_path)])

    detected = main_mod._read_openai_version_fast()
    assert detected == "9.8.7"
def test_termux_fast_cli_launch_skips_help(monkeypatch, main_mod):
monkeypatch.setenv("TERMUX_VERSION", "1")
monkeypatch.delenv("HERMES_TUI", raising=False)

View file

@ -327,6 +327,12 @@ class TestWebServerEndpoints:
# Public endpoints should still work
resp = unauth_client.get("/api/status")
assert resp.status_code == 200
resp = unauth_client.get("/api/dashboard/plugins")
assert resp.status_code == 200
resp = unauth_client.get("/api/dashboard/plugins/rescan")
assert resp.status_code == 401
resp = self.client.get("/api/dashboard/plugins/rescan")
assert resp.status_code == 200
def test_path_traversal_blocked(self):
"""Verify URL-encoded path traversal is blocked."""
@ -2285,7 +2291,10 @@ class TestPtyWebSocket:
self.ws_module.app.state, "bound_port", 9119, raising=False
)
with self.client.websocket_connect(self._url(channel="abc-123")) as conn:
headers = {"host": "127.0.0.1:9119", "origin": "http://127.0.0.1:9119"}
with self.client.websocket_connect(
self._url(channel="abc-123"), headers=headers
) as conn:
try:
conn.receive_bytes()
except Exception:
@ -2325,7 +2334,34 @@ class TestPtyWebSocket:
with self.client.websocket_connect(pub_path) as pub:
pub.send_text('{"type":"tool.start","payload":{"tool_id":"t1"}}')
received = sub.receive_text()
# Receive on a helper thread and wait for the result on a queue
# with a timeout. TestClient runs the ASGI app in a background
# thread, so a direct blocking receive_text() can, under heavy CI
# load or when the server never broadcasts the frame, hang until
# pytest-timeout kills us. Bounding the wait at 10s turns that
# hang into an explicit assertion failure instead.
import queue, threading
recv_q: queue.Queue = queue.Queue()
def _recv():
try:
recv_q.put(sub.receive_text())
except Exception as exc:
recv_q.put(exc)
t = threading.Thread(target=_recv, daemon=True)
t.start()
try:
received = recv_q.get(timeout=10.0)
except queue.Empty:
raise AssertionError(
"broadcast not received within 10s — server likely "
"dropped the frame silently (see _broadcast_event "
"except Exception: pass)"
)
if isinstance(received, Exception):
raise received
assert "tool.start" in received
assert '"tool_id":"t1"' in received

View file

@ -146,3 +146,72 @@ class TestHostHeaderMiddleware:
resp = client.get("/api/status")
# Should get through to the status endpoint, not a 400
assert resp.status_code != 400
class TestWebSocketHostOriginGuard:
    """The dashboard's Host/Origin boundary must also apply to WebSocket upgrades."""

    @staticmethod
    def _dashboard_client(monkeypatch):
        """Return ``(client, url)`` for the events socket of a loopback dashboard.

        Pins ``bound_host`` to ``127.0.0.1`` and enables the embedded chat
        flag, then builds a ``TestClient`` and an ``/api/events`` URL that
        carries the module's session token.
        """
        from fastapi.testclient import TestClient

        import hermes_cli.web_server as ws

        monkeypatch.setattr(ws.app.state, "bound_host", "127.0.0.1", raising=False)
        monkeypatch.setattr(ws, "_DASHBOARD_EMBEDDED_CHAT_ENABLED", True)
        client = TestClient(ws.app)
        url = f"/api/events?token={ws._SESSION_TOKEN}&channel=security-test"
        return client, url

    def _assert_upgrade_rejected(self, monkeypatch, headers):
        """Assert that connecting with ``headers`` is closed with code 4403."""
        from starlette.websockets import WebSocketDisconnect

        client, url = self._dashboard_client(monkeypatch)
        with pytest.raises(WebSocketDisconnect) as rejection:
            with client.websocket_connect(url, headers=headers):
                pass
        assert rejection.value.code == 4403

    def test_rebinding_websocket_host_is_rejected(self, monkeypatch):
        # Both Host and Origin point at a foreign name.
        self._assert_upgrade_rejected(
            monkeypatch,
            {
                "Host": "evil.example",
                "Origin": "http://evil.example",
            },
        )

    def test_rebinding_websocket_origin_is_rejected(self, monkeypatch):
        # Host is a loopback name but the Origin is foreign.
        self._assert_upgrade_rejected(
            monkeypatch,
            {
                "Host": "localhost:9119",
                "Origin": "http://evil.example",
            },
        )

    def test_loopback_websocket_host_and_origin_are_accepted(self, monkeypatch):
        client, url = self._dashboard_client(monkeypatch)
        loopback_headers = {
            "Host": "localhost:9119",
            "Origin": "http://localhost:9119",
        }
        # Entering and leaving the context without an exception is the assertion.
        with client.websocket_connect(url, headers=loopback_headers):
            pass

View file

@ -3,6 +3,7 @@
import json
import os
import pytest
import stat
from argparse import Namespace
from pathlib import Path
@ -145,6 +146,31 @@ class TestPersistence:
path.write_text("broken{{{")
assert _load_subscriptions() == {}
@pytest.mark.skipif(os.name == "nt", reason="POSIX mode bits are platform-specific")
def test_save_creates_secret_file_owner_only_under_permissive_umask(self):
    """A newly saved subscriptions file is 0o600 even under a 0o022 umask."""
    payload = {"demo": {"secret": "TOPSECRET", "prompt": "x"}}

    # Apply a permissive umask only for the duration of the save, and always
    # restore the previous one so other tests are unaffected.
    previous_umask = os.umask(0o022)
    try:
        _save_subscriptions(payload)
    finally:
        os.umask(previous_umask)

    saved_file = _subscriptions_path()
    permission_bits = stat.S_IMODE(saved_file.stat().st_mode)
    assert permission_bits == 0o600
    contents = saved_file.read_text(encoding="utf-8")
    assert "TOPSECRET" in contents
@pytest.mark.skipif(os.name == "nt", reason="POSIX mode bits are platform-specific")
def test_save_narrows_existing_broad_secret_file_mode(self):
    """Saving over an existing 0o644 subscriptions file tightens it to 0o600."""
    # Seed a world-readable file, as one written before the permission
    # hardening would have been.
    secret_file = _subscriptions_path()
    secret_file.parent.mkdir(parents=True, exist_ok=True)
    legacy_payload = json.dumps({"old": {"secret": "stale", "prompt": "x"}})
    secret_file.write_text(legacy_payload)
    secret_file.chmod(0o644)

    _save_subscriptions({"demo": {"secret": "FRESH", "prompt": "x"}})

    permission_bits = stat.S_IMODE(secret_file.stat().st_mode)
    assert permission_bits == 0o600
    contents = secret_file.read_text(encoding="utf-8")
    assert "FRESH" in contents
class TestWebhookEnabledGate:
def test_blocks_when_disabled(self, capsys, monkeypatch):