hermes-agent/tests/plugins/dashboard_auth/test_drain_provider.py
Nacho Avecilla dbe734beff
fix(dashboard-auth): exclude non-interactive providers from interactive login surfaces (#53239)
* Return None instead of erroring on drain login failure

* Fix login on drain

* Remove login for drained endpoints flow and clean the code

* chore: drop unrelated credits changes from this PR

* Remove extra comments that were not really necessary
2026-06-27 10:08:13 +10:00

189 lines
7.5 KiB
Python

"""Tests for the DrainSecretProvider plugin (non-interactive bearer secret).
Task 2.0b. Loads the bundled drain plugin module directly and exercises:
* the entropy gate (assess_secret_strength) — fail-closed on weak secrets,
* constant-time verify_token returning a scoped TokenPrincipal,
* the register(ctx) entry point's env/config resolution, skip reasons, and
token-route registration.
"""
from __future__ import annotations
import secrets
from unittest.mock import MagicMock
import pytest
import plugins.dashboard_auth.drain as drain_plugin
from hermes_cli.dashboard_auth import TokenPrincipal, assert_protocol_compliance
from hermes_cli.dashboard_auth import token_auth
@pytest.fixture(scope="module")
def drain():
return drain_plugin
@pytest.fixture(autouse=True)
def _clean_env_and_routes(monkeypatch):
monkeypatch.delenv("HERMES_DASHBOARD_DRAIN_SECRET", raising=False)
token_auth.clear_token_routes()
yield
token_auth.clear_token_routes()
def _strong_secret() -> str:
# token_urlsafe(32) → 43 url-safe-b64 chars ≈ 256 bits.
return secrets.token_urlsafe(32)
# ---------------------------------------------------------------------------
# Entropy gate
# ---------------------------------------------------------------------------
class TestEntropyGate:
def test_strong_secret_passes(self, drain):
assert drain.assess_secret_strength(_strong_secret()) is None
def test_empty_rejected(self, drain):
assert drain.assess_secret_strength("") is not None
def test_too_short_rejected(self, drain):
# 42 chars — one under the 43-char bar.
assert drain.assess_secret_strength("a1B2c3" * 7) is not None
def test_long_but_repeated_rejected(self, drain):
# 60 chars, one distinct character → low distinct count + low entropy.
assert drain.assess_secret_strength("a" * 60) is not None
def test_long_but_few_distinct_rejected(self, drain):
# 60 chars cycling through only 4 distinct characters.
assert drain.assess_secret_strength("abcd" * 15) is not None
def test_custom_min_chars_enforced(self, drain):
s = _strong_secret() # 43 chars
assert drain.assess_secret_strength(s, min_chars=999) is not None
# ---------------------------------------------------------------------------
# Provider behaviour
# ---------------------------------------------------------------------------
class TestProvider:
def test_protocol_compliance(self, drain):
assert_protocol_compliance(drain.DrainSecretProvider)
def test_supports_token_flag(self, drain):
p = drain.DrainSecretProvider(secret=_strong_secret())
assert p.supports_token is True
def test_is_non_interactive(self, drain):
# Excluded from interactive surfaces via list_session_providers().
p = drain.DrainSecretProvider(secret=_strong_secret())
assert p.supports_session is False
def test_verify_token_accepts_matching_secret(self, drain):
s = _strong_secret()
p = drain.DrainSecretProvider(secret=s, scope="drain")
principal = p.verify_token(token=s)
assert isinstance(principal, TokenPrincipal)
assert principal.principal == "drain-control"
assert principal.provider == "drain-secret"
assert principal.scopes == ("drain",)
def test_verify_token_rejects_wrong_secret(self, drain):
p = drain.DrainSecretProvider(secret=_strong_secret())
assert p.verify_token(token=_strong_secret()) is None
def test_verify_token_rejects_empty(self, drain):
p = drain.DrainSecretProvider(secret=_strong_secret())
assert p.verify_token(token="") is None
def test_custom_scope_attached(self, drain):
s = _strong_secret()
p = drain.DrainSecretProvider(secret=s, scope="lifecycle")
assert p.verify_token(token=s).scopes == ("lifecycle",)
def test_construction_rejects_weak_secret(self, drain):
with pytest.raises(ValueError):
drain.DrainSecretProvider(secret="weak")
def test_verify_session_returns_none_not_raises(self, drain):
# Stacks harmlessly in the cookie-verify loop.
p = drain.DrainSecretProvider(secret=_strong_secret())
assert p.verify_session(access_token="anything") is None
def test_interactive_methods_raise(self, drain):
p = drain.DrainSecretProvider(secret=_strong_secret())
with pytest.raises(NotImplementedError):
p.start_login(redirect_uri="r")
with pytest.raises(NotImplementedError):
p.complete_login(code="c", state="s", code_verifier="v", redirect_uri="r")
with pytest.raises(NotImplementedError):
p.refresh_session(refresh_token="r")
# ---------------------------------------------------------------------------
# register() entry point
# ---------------------------------------------------------------------------
class TestRegister:
def test_skips_when_no_secret(self, drain, monkeypatch):
monkeypatch.setattr(drain, "_load_config_drain_auth_section", lambda: {})
ctx = MagicMock()
drain.register(ctx)
ctx.register_dashboard_auth_provider.assert_not_called()
assert "HERMES_DASHBOARD_DRAIN_SECRET" in drain.LAST_SKIP_REASON
assert not token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)
def test_skips_and_fails_closed_on_weak_secret(self, drain, monkeypatch):
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", "tooweak")
monkeypatch.setattr(drain, "_load_config_drain_auth_section", lambda: {})
ctx = MagicMock()
drain.register(ctx)
ctx.register_dashboard_auth_provider.assert_not_called()
assert "rejected" in drain.LAST_SKIP_REASON
# fail-closed: the route is NOT token-authable, so it stays gated.
assert not token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)
def test_registers_with_strong_env_secret(self, drain, monkeypatch):
s = _strong_secret()
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", s)
monkeypatch.setattr(drain, "_load_config_drain_auth_section", lambda: {})
ctx = MagicMock()
drain.register(ctx)
ctx.register_dashboard_auth_provider.assert_called_once()
provider = ctx.register_dashboard_auth_provider.call_args.args[0]
assert isinstance(provider, drain.DrainSecretProvider)
assert provider.verify_token(token=s) is not None
assert drain.LAST_SKIP_REASON == ""
# The drain endpoint is now token-authable.
assert token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)
def test_config_scope_applied(self, drain, monkeypatch):
s = _strong_secret()
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", s)
monkeypatch.setattr(
drain, "_load_config_drain_auth_section", lambda: {"scope": "lifecycle"}
)
ctx = MagicMock()
drain.register(ctx)
provider = ctx.register_dashboard_auth_provider.call_args.args[0]
assert provider.verify_token(token=s).scopes == ("lifecycle",)
def test_config_min_secret_chars_can_reject_otherwise_ok_secret(
self, drain, monkeypatch
):
s = _strong_secret() # 43 chars — fine by default, too short at 999
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", s)
monkeypatch.setattr(
drain,
"_load_config_drain_auth_section",
lambda: {"min_secret_chars": 999},
)
ctx = MagicMock()
drain.register(ctx)
ctx.register_dashboard_auth_provider.assert_not_called()
assert "rejected" in drain.LAST_SKIP_REASON