feat(dashboard-auth): define DashboardAuthProvider ABC + Session dataclass

Phase 1, Task 1.1. New package hermes_cli/dashboard_auth/ contains:

  base.py     - DashboardAuthProvider ABC with 5 abstract methods
                (start_login, complete_login, verify_session,
                refresh_session, revoke_session), Session + LoginStart
                frozen dataclasses, three exception types
                (ProviderError / InvalidCodeError / RefreshExpiredError),
                and assert_protocol_compliance() for plugins to call
                in their own tests.
  registry.py - Module-level register/get/list/clear with a lock.

Nothing reads the registry yet — Phase 2 adds the StubAuthProvider and
Phase 3 wires the gate middleware. The plugin hook lands in Task 1.3.
This commit is contained in:
Ben 2026-05-21 15:07:02 +10:00 committed by Teknium
parent 949ad95e4b
commit 2dc6d03a3d
4 changed files with 373 additions and 0 deletions

View file

@ -0,0 +1,40 @@
"""Dashboard authentication provider framework.
The dashboard auth gate engages only when the dashboard binds to a
non-loopback host without ``--insecure``. In that mode, every request must
carry a verified session from one of the registered ``DashboardAuthProvider``
plugins.
The Nous provider lives in ``plugins/dashboard-auth-nous/`` and is the
default. Third parties register their own providers via the plugin hook
``ctx.register_dashboard_auth_provider``.
"""
from hermes_cli.dashboard_auth.base import (
DashboardAuthProvider,
Session,
LoginStart,
InvalidCodeError,
ProviderError,
RefreshExpiredError,
assert_protocol_compliance,
)
from hermes_cli.dashboard_auth.registry import (
register_provider,
get_provider,
list_providers,
clear_providers,
)
__all__ = [
"DashboardAuthProvider",
"Session",
"LoginStart",
"InvalidCodeError",
"ProviderError",
"RefreshExpiredError",
"assert_protocol_compliance",
"register_provider",
"get_provider",
"list_providers",
"clear_providers",
]

View file

@ -0,0 +1,158 @@
"""Abstract base + dataclasses + exceptions for dashboard auth providers."""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass(frozen=True)
class Session:
"""A verified identity. Returned by ``complete_login`` and ``verify_session``.
All fields are mandatory. Providers that don't have a concept of orgs
should set ``org_id`` to an empty string. ``access_token`` and
``refresh_token`` are opaque to Hermes provider-specific.
"""
user_id: str
email: str
display_name: str
org_id: str
provider: str
expires_at: int # unix seconds; the access_token's exp claim
access_token: str
refresh_token: str
@dataclass(frozen=True)
class LoginStart:
"""First leg of the OAuth round trip.
``redirect_url`` is the URL the browser must navigate to (e.g. the
Portal's ``/oauth/authorize``). ``cookie_payload`` is a dict of cookie
name serialised value that the auth route will ``Set-Cookie`` on the
response. Used for PKCE state, CSRF nonces, etc. Cookies set here MUST
be HttpOnly + Secure (when over HTTPS) + SameSite=Lax with a TTL 10
minutes (the login lifetime).
"""
redirect_url: str
cookie_payload: dict[str, str]
class ProviderError(Exception):
"""IDP unreachable, network error, or other transient failure.
Middleware translates this to HTTP 503.
"""
class InvalidCodeError(Exception):
"""The OAuth callback ``code`` / ``state`` failed validation.
Middleware translates this to HTTP 400.
"""
class RefreshExpiredError(Exception):
"""The refresh token is dead.
Middleware clears cookies and forces re-login (302 ``/login``).
"""
class DashboardAuthProvider(ABC):
"""Protocol every dashboard-auth provider plugin implements.
Lifecycle:
1. ``start_login`` user clicks "Log in with X" on the login page.
Provider returns a redirect URL and any PKCE/CSRF state to stash
in short-lived cookies.
2. Browser bounces through the OAuth IDP and lands at /auth/callback.
3. ``complete_login`` exchange the code + verifier for a Session.
4. ``verify_session`` called on every request to validate the
access token in the cookie. Returns ``None`` if the token is
expired or invalid (middleware then triggers refresh or logout).
5. ``refresh_session`` called when the access token is near expiry.
Returns a new Session with rotated tokens.
6. ``revoke_session`` called on /auth/logout. Best-effort.
Failure semantics:
* ``start_login`` may raise ``ProviderError`` if the IDP is
unreachable.
* ``complete_login`` raises ``InvalidCodeError`` on bad code/state;
``ProviderError`` if the IDP is unreachable.
* ``verify_session`` returns ``None`` on expiry / unknown token;
raises ``ProviderError`` if the IDP is unreachable. Middleware
treats expiry and unreachable differently (expiry refresh;
unreachable 503).
* ``refresh_session`` raises ``RefreshExpiredError`` when the
refresh token is also invalid; middleware then forces re-login.
Raises ``ProviderError`` on network failure.
* ``revoke_session`` is best-effort and must not raise.
Subclasses MUST set ``name`` (lowercase identifier, stable forever)
and ``display_name`` (user-facing label on the login page).
"""
name: str = ""
display_name: str = ""
@abstractmethod
def start_login(self, *, redirect_uri: str) -> LoginStart: ...
@abstractmethod
def complete_login(
self,
*,
code: str,
state: str,
code_verifier: str,
redirect_uri: str,
) -> Session: ...
@abstractmethod
def verify_session(self, *, access_token: str) -> Optional[Session]: ...
@abstractmethod
def refresh_session(self, *, refresh_token: str) -> Session: ...
@abstractmethod
def revoke_session(self, *, refresh_token: str) -> None: ...
def assert_protocol_compliance(cls: type) -> None:
"""Raise ``TypeError`` if ``cls`` doesn't fully implement the provider protocol.
Call this in every provider plugin's unit tests::
def test_protocol_compliance():
assert_protocol_compliance(MyProvider)
Returns ``None`` on success so callers can assert it explicitly.
"""
required_methods = (
"start_login",
"complete_login",
"verify_session",
"refresh_session",
"revoke_session",
)
required_attrs = ("name", "display_name")
for attr in required_attrs:
val = getattr(cls, attr, "")
if not val:
raise TypeError(
f"{cls.__name__} missing or empty attribute: {attr!r}"
)
for method in required_methods:
if not callable(getattr(cls, method, None)):
raise TypeError(f"{cls.__name__} missing method: {method}")
# Also catch the ABC-not-overridden case.
if getattr(cls, "__abstractmethods__", None):
raise TypeError(
f"{cls.__name__} has unimplemented abstract methods: "
f"{sorted(cls.__abstractmethods__)}"
)

View file

@ -0,0 +1,58 @@
"""Module-level registry for DashboardAuthProvider instances.
Plugins call ``register_provider`` via the plugin context hook at startup.
The auth gate middleware iterates ``list_providers()`` and uses
``get_provider`` to dispatch on the session's ``provider`` field.
"""
from __future__ import annotations
import logging
import threading
from typing import List, Optional
from hermes_cli.dashboard_auth.base import (
DashboardAuthProvider,
assert_protocol_compliance,
)
_log = logging.getLogger(__name__)
_lock = threading.Lock()
_providers: dict[str, DashboardAuthProvider] = {}
def register_provider(provider: DashboardAuthProvider) -> None:
"""Register a provider.
Raises:
TypeError: on protocol violation.
ValueError: if a provider with the same name is already registered.
"""
assert_protocol_compliance(type(provider))
with _lock:
if provider.name in _providers:
raise ValueError(
f"dashboard-auth provider already registered: {provider.name!r}"
)
_providers[provider.name] = provider
_log.info(
"dashboard-auth: registered provider %r (%s)",
provider.name, provider.display_name,
)
def get_provider(name: str) -> Optional[DashboardAuthProvider]:
"""Return the registered provider for ``name``, or None if unknown."""
with _lock:
return _providers.get(name)
def list_providers() -> List[DashboardAuthProvider]:
"""All registered providers, in registration order."""
with _lock:
return list(_providers.values())
def clear_providers() -> None:
"""Test-only: drop all registrations."""
with _lock:
_providers.clear()

View file

@ -0,0 +1,117 @@
"""Contract test for DashboardAuthProvider implementations.
Every provider plugin should call ``assert_protocol_compliance`` on its
provider class in its own unit test. This module tests the abstract base
itself: dataclass fields, ABC rejection of partial impls, and the
protocol-compliance helper.
"""
from __future__ import annotations
import pytest
from hermes_cli.dashboard_auth.base import (
DashboardAuthProvider,
Session,
LoginStart,
assert_protocol_compliance,
)
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
def test_session_has_required_fields():
s = Session(
user_id="u1",
email="a@b.com",
display_name="A",
org_id="org_1",
provider="test",
expires_at=1234567890,
access_token="at",
refresh_token="rt",
)
assert s.user_id == "u1"
assert s.provider == "test"
assert s.expires_at == 1234567890
def test_login_start_has_redirect_and_state():
ls = LoginStart(
redirect_url="https://portal/authorize?...",
cookie_payload={"hermes_session_pkce": "verifier=abc;state=xyz"},
)
assert ls.redirect_url.startswith("https://")
assert "hermes_session_pkce" in ls.cookie_payload
# ---------------------------------------------------------------------------
# ABC enforcement
# ---------------------------------------------------------------------------
def test_abstract_provider_cannot_be_instantiated():
with pytest.raises(TypeError):
DashboardAuthProvider() # type: ignore[abstract]
class _BrokenProvider(DashboardAuthProvider):
name = "broken"
display_name = "Broken"
# Deliberately missing all the methods.
def test_assert_protocol_compliance_rejects_partial_impl():
with pytest.raises(TypeError):
assert_protocol_compliance(_BrokenProvider)
class _CompliantProvider(DashboardAuthProvider):
name = "ok"
display_name = "OK"
def start_login(self, *, redirect_uri: str) -> LoginStart:
return LoginStart(redirect_url="x", cookie_payload={})
def complete_login(self, *, code, state, code_verifier, redirect_uri) -> Session:
return Session(
user_id="u", email="x", display_name="x", org_id="o",
provider=self.name, expires_at=0,
access_token="a", refresh_token="r",
)
def verify_session(self, *, access_token: str):
return None
def refresh_session(self, *, refresh_token: str) -> Session:
return Session(
user_id="u", email="x", display_name="x", org_id="o",
provider=self.name, expires_at=0,
access_token="a", refresh_token="r",
)
def revoke_session(self, *, refresh_token: str) -> None:
return None
def test_assert_protocol_compliance_accepts_full_impl():
# Returns None on success; the helper raises on failure.
assert assert_protocol_compliance(_CompliantProvider) is None
def test_assert_protocol_compliance_rejects_missing_name_attr():
class NoName(_CompliantProvider):
name = "" # empty is treated as missing
with pytest.raises(TypeError, match="name"):
assert_protocol_compliance(NoName)
def test_assert_protocol_compliance_rejects_missing_display_name():
class NoDisplay(_CompliantProvider):
display_name = ""
with pytest.raises(TypeError, match="display_name"):
assert_protocol_compliance(NoDisplay)