hermes-agent/tests/hermes_cli/test_dashboard_token_auth.py
Ben cb9cb6ba1c feat(dashboard-auth): generic non-interactive API-token capability
Task 2.0a of the safe-shutdown drain-coordination plan. Widens the dashboard
auth framework GENERICALLY to support non-interactive (service-to-service)
bearer-token auth, mirroring the existing supports_password precedent. This is
a reusable capability — any future machine-credential provider plugs in without
core changes (decisions.md Q-C). The drain bearer-secret plugin (Task 2.0b) is
the first consumer, not the definition.

- base.py: add TokenPrincipal dataclass (the token analog of Session) +
  supports_token capability flag + verify_token() on the ABC (default raises
  NotImplementedError so a misconfigured provider fails loud). Contract mirrors
  verify_session stacking: return None for unrecognised tokens (never raise),
  raise ProviderError only on a genuine backing-store outage.
- registry.py: list_token_providers() — the supports_token subset, in
  registration order. Empty when none registered (token routes fail closed).
- token_auth.py (new): route-agnostic seam. Routes opt in via
  register_token_route(exact path); token_auth_middleware owns the auth
  decision for those routes only — authenticate via stacked providers, attach
  request.state.token_principal + token_authenticated, pass through. 401 on
  missing/unrecognised token, 503 when a provider was unreachable, untouched
  passthrough for non-token routes. Fails closed (never open).
- web_server.py: install the seam OUTERMOST (registered last → runs first).
  Both downstream gates (legacy auth_middleware + gated_auth_middleware) honour
  request.state.token_authenticated and skip enforcement, so a token-authed
  service request is never bounced to /login.
- audit.py: TOKEN_AUTH_SUCCESS / TOKEN_AUTH_FAILURE events.

Tests: tests/hermes_cli/test_dashboard_token_auth.py — ABC flag default,
verify_token NotImplementedError, registry filter, bearer extraction
(case-insensitive scheme, malformed/non-bearer → ""), provider stacking
(first-match-wins, unreachable-remembered, unreachable-then-valid, buggy
provider doesn't crash the gate), and the seam's passthrough/401/503/
fail-closed behaviour. 29 new tests; full dashboard-auth suite 169 passed.

Intentionally deferred:
- The concrete shared-bearer-secret provider plugin — Task 2.0b.
- The begin/cancel-drain endpoint that registers itself as a token route —
  Task 2.1.

Build status: dashboard-auth + plugin-hook suites green.
2026-06-26 00:47:19 -07:00

316 lines
10 KiB
Python

"""Contract tests for the generic non-interactive (bearer-token) auth seam.
Covers Task 2.0a: the reusable token-auth capability in the dashboard auth
framework — NOT the drain plugin (that's 2.0b/2.1). Asserts the ABC capability
flag, the registry filter, bearer extraction, provider stacking (verify_token),
and the route-agnostic middleware seam's fail-closed / 503 / pass-through
behaviour.
"""
from __future__ import annotations
import asyncio
from typing import Optional
import pytest
from hermes_cli.dashboard_auth import (
DashboardAuthProvider,
LoginStart,
Session,
TokenPrincipal,
clear_providers,
list_token_providers,
register_provider,
)
from hermes_cli.dashboard_auth.base import ProviderError
from hermes_cli.dashboard_auth import token_auth
# --------------------------------------------------------------------------
# Test doubles
# --------------------------------------------------------------------------
class _OAuthOnly(DashboardAuthProvider):
"""A pure interactive provider — never token-authable."""
name = "oauth-only"
display_name = "OAuth Only"
def start_login(self, *, redirect_uri):
return LoginStart(redirect_url="x", cookie_payload={})
def complete_login(self, *, code, state, code_verifier, redirect_uri):
return Session("u", "e", "n", "o", self.name, 0, "a", "r")
def verify_session(self, *, access_token):
return None
def refresh_session(self, *, refresh_token):
return Session("u", "e", "n", "o", self.name, 0, "a", "r")
def revoke_session(self, *, refresh_token):
return None
class _TokenProvider(_OAuthOnly):
"""A token provider that accepts exactly one secret."""
name = "tok"
display_name = "Token Provider"
supports_token = True
def __init__(self, *, secret: str = "good-secret", scopes=("drain",)):
self._secret = secret
self._scopes = tuple(scopes)
def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
if token == self._secret:
return TokenPrincipal(
principal=self.name, provider=self.name, scopes=self._scopes
)
return None
class _UnreachableTokenProvider(_OAuthOnly):
name = "tok-down"
display_name = "Unreachable Token Provider"
supports_token = True
def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
raise ProviderError("backing store down")
class _BuggyTokenProvider(_OAuthOnly):
name = "tok-buggy"
display_name = "Buggy Token Provider"
supports_token = True
def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
raise RuntimeError("kaboom")
# --------------------------------------------------------------------------
# Fixtures
# --------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _isolated_state():
clear_providers()
token_auth.clear_token_routes()
yield
clear_providers()
token_auth.clear_token_routes()
class _FakeURL:
def __init__(self, path):
self.path = path
class _FakeClient:
host = "1.2.3.4"
class _FakeRequest:
"""Minimal Request stand-in for the seam (no real Starlette needed)."""
def __init__(self, path="/api/gateway/drain", headers=None):
self.url = _FakeURL(path)
self.headers = headers or {}
self.client = _FakeClient()
class _State:
pass
self.state = _State()
def _run(coro):
return asyncio.run(coro)
# --------------------------------------------------------------------------
# ABC + registry
# --------------------------------------------------------------------------
def test_oauth_provider_defaults_supports_token_false():
assert _OAuthOnly().supports_token is False
def test_oauth_provider_verify_token_raises_not_implemented():
with pytest.raises(NotImplementedError):
_OAuthOnly().verify_token(token="x")
def test_list_token_providers_filters_to_supports_token():
register_provider(_OAuthOnly())
register_provider(_TokenProvider())
names = [p.name for p in list_token_providers()]
assert names == ["tok"]
def test_list_token_providers_empty_when_none_registered():
register_provider(_OAuthOnly())
assert list_token_providers() == []
# --------------------------------------------------------------------------
# Bearer extraction
# --------------------------------------------------------------------------
@pytest.mark.parametrize(
"header,expected",
[
("Bearer abc123", "abc123"),
("bearer abc123", "abc123"),
("BEARER abc123", "abc123"),
("Bearer spaced ", "spaced"),
("Basic abc123", ""),
("abc123", ""),
("", ""),
],
)
def test_extract_bearer_token(header, expected):
req = _FakeRequest(headers={"authorization": header} if header else {})
assert token_auth.extract_bearer_token(req) == expected
# --------------------------------------------------------------------------
# authenticate_token (provider stacking)
# --------------------------------------------------------------------------
def test_authenticate_token_accepts_valid():
register_provider(_TokenProvider(secret="good-secret"))
req = _FakeRequest(headers={"authorization": "Bearer good-secret"})
principal, unreachable = token_auth.authenticate_token(req)
assert unreachable is None
assert principal is not None
assert principal.provider == "tok"
assert principal.scopes == ("drain",)
def test_authenticate_token_rejects_wrong_secret():
register_provider(_TokenProvider(secret="good-secret"))
req = _FakeRequest(headers={"authorization": "Bearer wrong"})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is None
assert unreachable is None
def test_authenticate_token_no_token_returns_none():
register_provider(_TokenProvider())
req = _FakeRequest(headers={})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is None and unreachable is None
def test_authenticate_token_stacks_first_match_wins():
register_provider(_TokenProvider(secret="aaa"))
second = _TokenProvider(secret="bbb")
second.name = "tok2"
register_provider(second)
req = _FakeRequest(headers={"authorization": "Bearer bbb"})
principal, _ = token_auth.authenticate_token(req)
assert principal is not None and principal.provider == "tok2"
def test_authenticate_token_unreachable_remembered():
register_provider(_UnreachableTokenProvider())
req = _FakeRequest(headers={"authorization": "Bearer anything"})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is None
assert unreachable == "tok-down"
def test_authenticate_token_unreachable_then_valid_provider_wins():
register_provider(_UnreachableTokenProvider())
register_provider(_TokenProvider(secret="good"))
req = _FakeRequest(headers={"authorization": "Bearer good"})
principal, unreachable = token_auth.authenticate_token(req)
# A later provider accepting the token beats the earlier outage.
assert principal is not None and principal.provider == "tok"
assert unreachable is None
def test_authenticate_token_buggy_provider_does_not_crash():
register_provider(_BuggyTokenProvider())
register_provider(_TokenProvider(secret="good"))
req = _FakeRequest(headers={"authorization": "Bearer good"})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is not None and principal.provider == "tok"
# --------------------------------------------------------------------------
# Middleware seam (route-agnostic)
# --------------------------------------------------------------------------
async def _call_next_ok(request):
from fastapi.responses import JSONResponse
return JSONResponse({"ok": True}, status_code=200)
def test_seam_passthrough_for_unregistered_route():
register_provider(_TokenProvider())
req = _FakeRequest(path="/api/something-else")
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 200
assert getattr(req.state, "token_authenticated", False) is False
def test_seam_accepts_valid_token_on_registered_route():
register_provider(_TokenProvider(secret="good"))
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain",
headers={"authorization": "Bearer good"},
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 200
assert req.state.token_authenticated is True
assert req.state.token_principal.provider == "tok"
def test_seam_rejects_missing_token_401():
register_provider(_TokenProvider())
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(path="/api/gateway/drain", headers={})
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 401
def test_seam_rejects_wrong_token_401():
register_provider(_TokenProvider(secret="good"))
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain", headers={"authorization": "Bearer bad"}
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 401
def test_seam_fails_closed_when_no_token_provider():
# Route registered but NO supports_token provider → 401, never open.
register_provider(_OAuthOnly())
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain", headers={"authorization": "Bearer anything"}
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 401
def test_seam_503_on_provider_unreachable():
register_provider(_UnreachableTokenProvider())
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain", headers={"authorization": "Bearer x"}
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 503