mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-07-01 12:02:05 +00:00
* Return None instead of erroring on drain login failure * Fix login on drain * Remove login for drained endpoints flow and clean the code * chore: drop unrelated credits changes from this PR * Remove extra comments that were not really necessary
189 lines
7.5 KiB
Python
189 lines
7.5 KiB
Python
"""Tests for the DrainSecretProvider plugin (non-interactive bearer secret).
|
|
|
|
Task 2.0b. Loads the bundled drain plugin module directly and exercises:
|
|
* the entropy gate (assess_secret_strength) — fail-closed on weak secrets,
|
|
* constant-time verify_token returning a scoped TokenPrincipal,
|
|
* the register(ctx) entry point's env/config resolution, skip reasons, and
|
|
token-route registration.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import secrets
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
import plugins.dashboard_auth.drain as drain_plugin
|
|
from hermes_cli.dashboard_auth import TokenPrincipal, assert_protocol_compliance
|
|
from hermes_cli.dashboard_auth import token_auth
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def drain():
|
|
return drain_plugin
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _clean_env_and_routes(monkeypatch):
|
|
monkeypatch.delenv("HERMES_DASHBOARD_DRAIN_SECRET", raising=False)
|
|
token_auth.clear_token_routes()
|
|
yield
|
|
token_auth.clear_token_routes()
|
|
|
|
|
|
def _strong_secret() -> str:
|
|
# token_urlsafe(32) → 43 url-safe-b64 chars ≈ 256 bits.
|
|
return secrets.token_urlsafe(32)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entropy gate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestEntropyGate:
|
|
def test_strong_secret_passes(self, drain):
|
|
assert drain.assess_secret_strength(_strong_secret()) is None
|
|
|
|
def test_empty_rejected(self, drain):
|
|
assert drain.assess_secret_strength("") is not None
|
|
|
|
def test_too_short_rejected(self, drain):
|
|
# 42 chars — one under the 43-char bar.
|
|
assert drain.assess_secret_strength("a1B2c3" * 7) is not None
|
|
|
|
def test_long_but_repeated_rejected(self, drain):
|
|
# 60 chars, one distinct character → low distinct count + low entropy.
|
|
assert drain.assess_secret_strength("a" * 60) is not None
|
|
|
|
def test_long_but_few_distinct_rejected(self, drain):
|
|
# 60 chars cycling through only 4 distinct characters.
|
|
assert drain.assess_secret_strength("abcd" * 15) is not None
|
|
|
|
def test_custom_min_chars_enforced(self, drain):
|
|
s = _strong_secret() # 43 chars
|
|
assert drain.assess_secret_strength(s, min_chars=999) is not None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Provider behaviour
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProvider:
|
|
def test_protocol_compliance(self, drain):
|
|
assert_protocol_compliance(drain.DrainSecretProvider)
|
|
|
|
def test_supports_token_flag(self, drain):
|
|
p = drain.DrainSecretProvider(secret=_strong_secret())
|
|
assert p.supports_token is True
|
|
|
|
def test_is_non_interactive(self, drain):
|
|
# Excluded from interactive surfaces via list_session_providers().
|
|
p = drain.DrainSecretProvider(secret=_strong_secret())
|
|
assert p.supports_session is False
|
|
|
|
def test_verify_token_accepts_matching_secret(self, drain):
|
|
s = _strong_secret()
|
|
p = drain.DrainSecretProvider(secret=s, scope="drain")
|
|
principal = p.verify_token(token=s)
|
|
assert isinstance(principal, TokenPrincipal)
|
|
assert principal.principal == "drain-control"
|
|
assert principal.provider == "drain-secret"
|
|
assert principal.scopes == ("drain",)
|
|
|
|
def test_verify_token_rejects_wrong_secret(self, drain):
|
|
p = drain.DrainSecretProvider(secret=_strong_secret())
|
|
assert p.verify_token(token=_strong_secret()) is None
|
|
|
|
def test_verify_token_rejects_empty(self, drain):
|
|
p = drain.DrainSecretProvider(secret=_strong_secret())
|
|
assert p.verify_token(token="") is None
|
|
|
|
def test_custom_scope_attached(self, drain):
|
|
s = _strong_secret()
|
|
p = drain.DrainSecretProvider(secret=s, scope="lifecycle")
|
|
assert p.verify_token(token=s).scopes == ("lifecycle",)
|
|
|
|
def test_construction_rejects_weak_secret(self, drain):
|
|
with pytest.raises(ValueError):
|
|
drain.DrainSecretProvider(secret="weak")
|
|
|
|
def test_verify_session_returns_none_not_raises(self, drain):
|
|
# Stacks harmlessly in the cookie-verify loop.
|
|
p = drain.DrainSecretProvider(secret=_strong_secret())
|
|
assert p.verify_session(access_token="anything") is None
|
|
|
|
def test_interactive_methods_raise(self, drain):
|
|
p = drain.DrainSecretProvider(secret=_strong_secret())
|
|
with pytest.raises(NotImplementedError):
|
|
p.start_login(redirect_uri="r")
|
|
with pytest.raises(NotImplementedError):
|
|
p.complete_login(code="c", state="s", code_verifier="v", redirect_uri="r")
|
|
with pytest.raises(NotImplementedError):
|
|
p.refresh_session(refresh_token="r")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# register() entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRegister:
|
|
def test_skips_when_no_secret(self, drain, monkeypatch):
|
|
monkeypatch.setattr(drain, "_load_config_drain_auth_section", lambda: {})
|
|
ctx = MagicMock()
|
|
drain.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_not_called()
|
|
assert "HERMES_DASHBOARD_DRAIN_SECRET" in drain.LAST_SKIP_REASON
|
|
assert not token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)
|
|
|
|
def test_skips_and_fails_closed_on_weak_secret(self, drain, monkeypatch):
|
|
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", "tooweak")
|
|
monkeypatch.setattr(drain, "_load_config_drain_auth_section", lambda: {})
|
|
ctx = MagicMock()
|
|
drain.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_not_called()
|
|
assert "rejected" in drain.LAST_SKIP_REASON
|
|
# fail-closed: the route is NOT token-authable, so it stays gated.
|
|
assert not token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)
|
|
|
|
def test_registers_with_strong_env_secret(self, drain, monkeypatch):
|
|
s = _strong_secret()
|
|
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", s)
|
|
monkeypatch.setattr(drain, "_load_config_drain_auth_section", lambda: {})
|
|
ctx = MagicMock()
|
|
drain.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_called_once()
|
|
provider = ctx.register_dashboard_auth_provider.call_args.args[0]
|
|
assert isinstance(provider, drain.DrainSecretProvider)
|
|
assert provider.verify_token(token=s) is not None
|
|
assert drain.LAST_SKIP_REASON == ""
|
|
# The drain endpoint is now token-authable.
|
|
assert token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)
|
|
|
|
def test_config_scope_applied(self, drain, monkeypatch):
|
|
s = _strong_secret()
|
|
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", s)
|
|
monkeypatch.setattr(
|
|
drain, "_load_config_drain_auth_section", lambda: {"scope": "lifecycle"}
|
|
)
|
|
ctx = MagicMock()
|
|
drain.register(ctx)
|
|
provider = ctx.register_dashboard_auth_provider.call_args.args[0]
|
|
assert provider.verify_token(token=s).scopes == ("lifecycle",)
|
|
|
|
def test_config_min_secret_chars_can_reject_otherwise_ok_secret(
|
|
self, drain, monkeypatch
|
|
):
|
|
s = _strong_secret() # 43 chars — fine by default, too short at 999
|
|
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", s)
|
|
monkeypatch.setattr(
|
|
drain,
|
|
"_load_config_drain_auth_section",
|
|
lambda: {"min_secret_chars": 999},
|
|
)
|
|
ctx = MagicMock()
|
|
drain.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_not_called()
|
|
assert "rejected" in drain.LAST_SKIP_REASON
|