hermes-agent/tests/plugins/dashboard_auth/test_drain_provider.py
Ben 2e322466b1 feat(dashboard-auth): drain shared-bearer-secret provider plugin
Task 2.0b: the concrete shared-bearer-secret auth provider, the FIRST consumer
of the generic token-auth capability (Task 2.0a). Implements decisions.md Q-A.

plugins/dashboard_auth/drain/ (bundled, discovered like dashboard_auth/basic):
- DrainSecretProvider: non-interactive provider, supports_token=True. Verifies
  an inbound Authorization bearer token against a per-agent shared secret with
  hmac.compare_digest (constant-time, no timing oracle) and, on a match,
  vouches for the caller as the "drain-control" principal scoped to "drain".
  The five interactive ABC methods raise NotImplementedError; verify_session
  returns None (stacks harmlessly in the cookie-verify loop).
- assess_secret_strength(): fail-closed entropy gate. Rejects secrets shorter
  than 43 url-safe-b64 chars (~256 bits), with < 16 distinct characters, or
  below 128 bits Shannon entropy — so a weak/structured/repeated secret can
  never be silently accepted. Enforced both at register() (friendly skip
  reason) and in __init__ (raises — defence in depth).
- register(ctx): no-op + skip reason when HERMES_DASHBOARD_DRAIN_SECRET is
  unset; rejects a weak secret fail-closed (drain endpoint stays gated). On a
  strong secret, registers the provider AND opts /api/gateway/drain into the
  generic token-auth seam via register_token_route().

Config: the secret is a CREDENTIAL → carried via HERMES_DASHBOARD_DRAIN_SECRET
(per-agent, provisioned by NAS at deploy). Behavioural knobs only
(dashboard.drain_auth.{scope,min_secret_chars}) live in config.yaml — added to
DEFAULT_CONFIG with the .env-is-for-secrets rationale documented inline.

Tests: tests/plugins/dashboard_auth/test_drain_provider.py — entropy gate
(strong pass; empty/short/repeated/few-distinct/custom-min reject), verify_token
(match → scoped principal, wrong/empty → None, custom scope), protocol
compliance, interactive-methods-raise, and register() (skip-no-secret,
fail-closed-weak-secret, strong-env-secret registers + route opt-in, config
scope + min_secret_chars). 21 new tests; drain + token-auth suites 44 passed.
Verified the plugin is discovered as dashboard_auth/drain alongside basic/nous.

Intentionally deferred:
- The begin/cancel-drain endpoint handler itself — Task 2.1.
- The dashboard→gateway control channel — Task 2.2.

Build status: dashboard-auth + drain-plugin suites green.
2026-06-26 00:47:19 -07:00

184 lines
7.3 KiB
Python

"""Tests for the DrainSecretProvider plugin (non-interactive bearer secret).
Task 2.0b. Loads the bundled drain plugin module directly and exercises:
* the entropy gate (assess_secret_strength) — fail-closed on weak secrets,
* constant-time verify_token returning a scoped TokenPrincipal,
* the register(ctx) entry point's env/config resolution, skip reasons, and
token-route registration.
"""
from __future__ import annotations
import secrets
from unittest.mock import MagicMock
import pytest
import plugins.dashboard_auth.drain as drain_plugin
from hermes_cli.dashboard_auth import TokenPrincipal, assert_protocol_compliance
from hermes_cli.dashboard_auth import token_auth
@pytest.fixture(scope="module")
def drain():
    """Provide the imported drain plugin module to the tests in this file.

    Module-scoped: the same module object is handed to every test, so tests
    reach the plugin's names (``assess_secret_strength``,
    ``DrainSecretProvider``, ``register`` ...) through one fixture argument.
    """
    plugin_module = drain_plugin
    return plugin_module
@pytest.fixture(autouse=True)
def _clean_env_and_routes(monkeypatch):
    """Start every test from a clean slate and clean up afterwards.

    Runs automatically for each test in this module:

    * removes ``HERMES_DASHBOARD_DRAIN_SECRET`` from the environment so a
      secret exported in the developer's shell or CI cannot leak into a test;
    * resets the plugin's module-level ``LAST_SKIP_REASON`` to ``""`` so an
      assertion such as ``"rejected" in drain.LAST_SKIP_REASON`` can only be
      satisfied by the ``register()`` call made in the current test, never by
      a value left behind by an earlier one;
    * calls ``token_auth.clear_token_routes()`` before and after the test so
      route registrations made by one test are not visible to the next.
    """
    monkeypatch.delenv("HERMES_DASHBOARD_DRAIN_SECRET", raising=False)
    # raising=False: tolerate the attribute not existing until register() has
    # run once; monkeypatch restores the previous state on teardown.
    monkeypatch.setattr(drain_plugin, "LAST_SKIP_REASON", "", raising=False)
    token_auth.clear_token_routes()
    yield
    token_auth.clear_token_routes()
def _strong_secret() -> str:
# token_urlsafe(32) → 43 url-safe-b64 chars ≈ 256 bits.
return secrets.token_urlsafe(32)
# ---------------------------------------------------------------------------
# Entropy gate
# ---------------------------------------------------------------------------
class TestEntropyGate:
    """Checks on ``assess_secret_strength``.

    As exercised here, the function returns ``None`` for an acceptable secret
    and a non-``None`` value for a rejected one.
    """

    def test_strong_secret_passes(self, drain):
        assert drain.assess_secret_strength(_strong_secret()) is None

    def test_empty_rejected(self, drain):
        assert drain.assess_secret_strength("") is not None

    def test_too_short_rejected(self, drain):
        # 42 chars — one under the 43-char bar.
        assert drain.assess_secret_strength("a1B2c3" * 7) is not None
        # The string above has only 6 distinct characters, so it may be
        # rejected for low variety rather than for its length. Dropping one
        # character from an otherwise strong random secret leaves length as
        # the only thing wrong with it, which pins the length bar itself.
        one_char_short = _strong_secret()[:-1]
        assert len(one_char_short) == 42
        assert drain.assess_secret_strength(one_char_short) is not None

    def test_long_but_repeated_rejected(self, drain):
        # 60 chars, one distinct character → low distinct count + low entropy.
        assert drain.assess_secret_strength("a" * 60) is not None

    def test_long_but_few_distinct_rejected(self, drain):
        # 60 chars cycling through only 4 distinct characters.
        assert drain.assess_secret_strength("abcd" * 15) is not None

    def test_custom_min_chars_enforced(self, drain):
        s = _strong_secret()  # 43 chars
        assert drain.assess_secret_strength(s, min_chars=999) is not None
# ---------------------------------------------------------------------------
# Provider behaviour
# ---------------------------------------------------------------------------
class TestProvider:
    """Behaviour of ``DrainSecretProvider`` itself, independent of ``register()``."""

    def test_protocol_compliance(self, drain):
        assert_protocol_compliance(drain.DrainSecretProvider)

    def test_supports_token_flag(self, drain):
        p = drain.DrainSecretProvider(secret=_strong_secret())
        assert p.supports_token is True

    def test_verify_token_accepts_matching_secret(self, drain):
        s = _strong_secret()
        p = drain.DrainSecretProvider(secret=s, scope="drain")
        principal = p.verify_token(token=s)
        assert isinstance(principal, TokenPrincipal)
        assert principal.principal == "drain-control"
        assert principal.provider == "drain-secret"
        assert principal.scopes == ("drain",)

    def test_verify_token_rejects_wrong_secret(self, drain):
        # Two independent random secrets: the presented token is well-formed
        # but is not the one the provider was configured with.
        p = drain.DrainSecretProvider(secret=_strong_secret())
        assert p.verify_token(token=_strong_secret()) is None

    def test_verify_token_rejects_empty(self, drain):
        p = drain.DrainSecretProvider(secret=_strong_secret())
        assert p.verify_token(token="") is None

    def test_custom_scope_attached(self, drain):
        s = _strong_secret()
        p = drain.DrainSecretProvider(secret=s, scope="lifecycle")
        principal = p.verify_token(token=s)
        # Guard first so a rejected token fails as an assertion rather than
        # as an AttributeError on None.
        assert principal is not None
        assert principal.scopes == ("lifecycle",)

    def test_construction_rejects_weak_secret(self, drain):
        with pytest.raises(ValueError):
            drain.DrainSecretProvider(secret="weak")

    def test_verify_session_returns_none_not_raises(self, drain):
        # Stacks harmlessly in the cookie-verify loop.
        p = drain.DrainSecretProvider(secret=_strong_secret())
        assert p.verify_session(access_token="anything") is None

    def test_interactive_methods_raise(self, drain):
        # NOTE(review): only these three interactive entry points are
        # exercised; if the provider ABC defines further interactive methods,
        # confirm their names and extend this test.
        p = drain.DrainSecretProvider(secret=_strong_secret())
        with pytest.raises(NotImplementedError):
            p.start_login(redirect_uri="r")
        with pytest.raises(NotImplementedError):
            p.complete_login(code="c", state="s", code_verifier="v", redirect_uri="r")
        with pytest.raises(NotImplementedError):
            p.refresh_session(refresh_token="r")
# ---------------------------------------------------------------------------
# register() entry point
# ---------------------------------------------------------------------------
class TestRegister:
    """Checks on the plugin's ``register(ctx)`` entry point.

    Each test stubs ``_load_config_drain_auth_section`` so no real config file
    is read, optionally sets ``HERMES_DASHBOARD_DRAIN_SECRET``, and passes a
    ``MagicMock`` as the plugin context so registration calls can be inspected.
    """

    @staticmethod
    def _run_register(drain, monkeypatch, *, secret=None, config=None):
        """Call ``drain.register`` against a mock context and return that context.

        Args:
            drain: the plugin module under test.
            monkeypatch: pytest's monkeypatch fixture.
            secret: value for ``HERMES_DASHBOARD_DRAIN_SECRET``; ``None``
                leaves the variable unset.
            config: mapping returned by the stubbed
                ``_load_config_drain_auth_section``; ``None`` means ``{}``.
        """
        if secret is not None:
            monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", secret)
        section = dict(config or {})
        # Return a fresh copy on every call, as the per-test lambdas did.
        monkeypatch.setattr(
            drain, "_load_config_drain_auth_section", lambda: dict(section)
        )
        ctx = MagicMock()
        drain.register(ctx)
        return ctx

    def test_skips_when_no_secret(self, drain, monkeypatch):
        ctx = self._run_register(drain, monkeypatch)
        ctx.register_dashboard_auth_provider.assert_not_called()
        assert "HERMES_DASHBOARD_DRAIN_SECRET" in drain.LAST_SKIP_REASON
        assert not token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)

    def test_skips_and_fails_closed_on_weak_secret(self, drain, monkeypatch):
        ctx = self._run_register(drain, monkeypatch, secret="tooweak")
        ctx.register_dashboard_auth_provider.assert_not_called()
        assert "rejected" in drain.LAST_SKIP_REASON
        # fail-closed: the route is NOT token-authable, so it stays gated.
        assert not token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)

    def test_registers_with_strong_env_secret(self, drain, monkeypatch):
        s = _strong_secret()
        ctx = self._run_register(drain, monkeypatch, secret=s)
        ctx.register_dashboard_auth_provider.assert_called_once()
        provider = ctx.register_dashboard_auth_provider.call_args.args[0]
        assert isinstance(provider, drain.DrainSecretProvider)
        assert provider.verify_token(token=s) is not None
        assert drain.LAST_SKIP_REASON == ""
        # The drain endpoint is now token-authable.
        assert token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)

    def test_config_scope_applied(self, drain, monkeypatch):
        s = _strong_secret()
        ctx = self._run_register(
            drain, monkeypatch, secret=s, config={"scope": "lifecycle"}
        )
        # call_args is None when nothing was registered; assert the call first
        # so that case fails as an assertion rather than an AttributeError.
        ctx.register_dashboard_auth_provider.assert_called_once()
        provider = ctx.register_dashboard_auth_provider.call_args.args[0]
        principal = provider.verify_token(token=s)
        assert principal is not None
        assert principal.scopes == ("lifecycle",)

    def test_config_min_secret_chars_can_reject_otherwise_ok_secret(
        self, drain, monkeypatch
    ):
        s = _strong_secret()  # 43 chars — fine by default, too short at 999
        ctx = self._run_register(
            drain, monkeypatch, secret=s, config={"min_secret_chars": 999}
        )
        ctx.register_dashboard_auth_provider.assert_not_called()
        assert "rejected" in drain.LAST_SKIP_REASON