feat(dashboard-auth): generic non-interactive API-token capability

Task 2.0a of the safe-shutdown drain-coordination plan. Widens the dashboard
auth framework GENERICALLY to support non-interactive (service-to-service)
bearer-token auth, mirroring the existing supports_password precedent. This is
a reusable capability — any future machine-credential provider plugs in without
core changes (decisions.md Q-C). The drain bearer-secret plugin (Task 2.0b) is
the first consumer, not the definition.

- base.py: add TokenPrincipal dataclass (the token analog of Session) +
  supports_token capability flag + verify_token() on the ABC (default raises
  NotImplementedError so a misconfigured provider fails loud). Contract mirrors
  verify_session stacking: return None for unrecognised tokens (never raise),
  raise ProviderError only on a genuine backing-store outage.
- registry.py: list_token_providers() — the supports_token subset, in
  registration order. Empty when none registered (token routes fail closed).
- token_auth.py (new): route-agnostic seam. Routes opt in via
  register_token_route(exact path); token_auth_middleware owns the auth
  decision for those routes only — authenticate via stacked providers, attach
  request.state.token_principal + token_authenticated, pass through. 401 on
  missing/unrecognised token, 503 when a provider was unreachable, untouched
  passthrough for non-token routes. Fails closed (never open).
- web_server.py: install the seam OUTERMOST (registered last → runs first).
  Both downstream gates (legacy auth_middleware + gated_auth_middleware) honour
  request.state.token_authenticated and skip enforcement, so a token-authed
  service request is never bounced to /login.
- audit.py: TOKEN_AUTH_SUCCESS / TOKEN_AUTH_FAILURE events.

Tests: tests/hermes_cli/test_dashboard_token_auth.py — ABC flag default,
verify_token NotImplementedError, registry filter, bearer extraction
(case-insensitive scheme, malformed/non-bearer → ""), provider stacking
(first-match-wins, unreachable-remembered, unreachable-then-valid, buggy
provider doesn't crash the gate), and the seam's passthrough/401/503/
fail-closed behaviour. 29 new tests; full dashboard-auth suite 169 passed.

Intentionally deferred:
- The concrete shared-bearer-secret provider plugin — Task 2.0b.
- The begin/cancel-drain endpoint that registers itself as a token route —
  Task 2.1.

Build status: dashboard-auth + plugin-hook suites green.
This commit is contained in:
Ben 2026-06-22 11:07:56 +10:00 committed by Teknium
parent 099df3cd89
commit cb9cb6ba1c
8 changed files with 629 additions and 0 deletions

View file

@ -12,6 +12,7 @@ default. Third parties register their own providers via the plugin hook
from hermes_cli.dashboard_auth.base import (
DashboardAuthProvider,
Session,
TokenPrincipal,
LoginStart,
InvalidCodeError,
InvalidCredentialsError,
@ -23,12 +24,14 @@ from hermes_cli.dashboard_auth.registry import (
register_provider,
get_provider,
list_providers,
list_token_providers,
clear_providers,
)
__all__ = [
"DashboardAuthProvider",
"Session",
"TokenPrincipal",
"LoginStart",
"InvalidCodeError",
"InvalidCredentialsError",
@ -38,5 +41,6 @@ __all__ = [
"register_provider",
"get_provider",
"list_providers",
"list_token_providers",
"clear_providers",
]

View file

@ -47,6 +47,8 @@ class AuditEvent(enum.Enum):
SESSION_VERIFY_FAILURE = "session_verify_failure"
WS_TICKET_MINTED = "ws_ticket_minted"
WS_TICKET_REJECTED = "ws_ticket_rejected"
TOKEN_AUTH_SUCCESS = "token_auth_success"
TOKEN_AUTH_FAILURE = "token_auth_failure"
def _resolve_log_path() -> Path:

View file

@ -25,6 +25,34 @@ class Session:
refresh_token: str
@dataclass(frozen=True)
class TokenPrincipal:
"""A verified non-interactive (service-to-service) caller.
The token analog of :class:`Session`. Where a ``Session`` represents an
interactive human identity behind a session cookie, a ``TokenPrincipal``
represents a machine/service caller that authenticated by presenting a
bearer token in the ``Authorization`` request header on a single
request no login, no cookie, no refresh.
Returned by :meth:`DashboardAuthProvider.verify_token` and attached to
``request.state.token_principal`` by the token-auth middleware seam so a
route handler can see *who* called it.
Fields:
* ``principal`` stable identifier for the caller (e.g. the provider
name, a service account id, or an agent id). Opaque to the seam.
* ``provider`` the ``name`` of the provider that verified the token.
* ``scopes`` capability strings this principal is authorised for.
Empty tuple means "unscoped" (the provider vouches for the caller but
attaches no capability list); a route MAY enforce a required scope.
"""
principal: str
provider: str
scopes: tuple[str, ...] = ()
@dataclass(frozen=True)
class LoginStart:
"""First leg of the OAuth round trip.
@ -131,6 +159,18 @@ class DashboardAuthProvider(ABC):
# and are completely unaffected.
supports_password: bool = False
# When True, this provider can verify a non-interactive bearer token
# (``verify_token``) presented on a single request by a service-to-service
# caller — no login, no cookie, no refresh. This is the generic
# API-token capability flag, mirroring ``supports_password``: a route
# opts into token auth (see ``token_auth`` middleware seam) and the
# gate consults every ``supports_token`` provider in turn until one
# recognises the token. OAuth/password providers leave this False and
# are completely unaffected. The drain bearer-secret plugin is the
# first consumer, but the capability is deliberately generic so any
# future machine-credential provider drops in without core changes.
supports_token: bool = False
@abstractmethod
def start_login(self, *, redirect_uri: str) -> LoginStart: ...
@ -183,6 +223,39 @@ class DashboardAuthProvider(ABC):
"complete_password_login)"
)
def verify_token(self, *, token: str) -> "Optional[TokenPrincipal]":
"""Verify a non-interactive bearer token; return its principal.
The token analog of ``verify_session``. Only consulted when
``supports_token`` is True. Called by the ``token_auth`` middleware
seam for every request to a token-authable route, in registration
order, until one provider returns a non-None principal.
Contract (mirrors ``verify_session`` stacking semantics):
* Return a :class:`TokenPrincipal` if this provider recognises and
accepts the token.
* Return ``None`` for a token this provider does NOT recognise
never raise, so the seam can fall through to the next provider.
A malformed/expired/wrong token is "not recognised" ``None``.
* Raise ``ProviderError`` ONLY for a genuine backing-store outage
(the provider can neither confirm nor deny). The seam treats this
like ``verify_session``: remember it, keep trying other providers,
and surface 503 only if NO provider accepts the token AND at least
one was unreachable.
Implementations MUST use a constant-time comparison
(``hmac.compare_digest``) when matching a shared secret so the
endpoint isn't a timing oracle.
The default raises ``NotImplementedError`` so a provider that sets
``supports_token`` but forgets to implement this fails loudly rather
than silently accepting every caller.
"""
raise NotImplementedError(
f"{type(self).__name__} does not support token auth "
"(set supports_token = True and override verify_token)"
)
def assert_protocol_compliance(cls: type) -> None:
"""Raise ``TypeError`` if ``cls`` doesn't fully implement the provider protocol.

View file

@ -181,6 +181,13 @@ async def gated_auth_middleware(
if not getattr(request.app.state, "auth_required", False):
return await call_next(request)
# A request already authenticated by the token-auth seam (a service caller
# on a registered token route) carries ``token_authenticated`` — it is NOT
# a cookie session and must not be bounced to /login. Pass it through; the
# seam already attached ``request.state.token_principal``.
if getattr(request.state, "token_authenticated", False):
return await call_next(request)
path = request.url.path
if _path_is_public(path):
return await call_next(request)

View file

@ -52,6 +52,20 @@ def list_providers() -> List[DashboardAuthProvider]:
return list(_providers.values())
def list_token_providers() -> List[DashboardAuthProvider]:
"""Registered providers that support non-interactive token auth.
The subset of ``list_providers()`` whose ``supports_token`` flag is True,
in registration order. The ``token_auth`` middleware seam consults these
(and only these) when a token-authable route is hit, so OAuth/password-only
providers are never asked to ``verify_token``. Returns an empty list when
no token provider is registered a token-authable route then fails
closed (401), never open.
"""
with _lock:
return [p for p in _providers.values() if getattr(p, "supports_token", False)]
def clear_providers() -> None:
"""Test-only: drop all registrations."""
with _lock:

View file

@ -0,0 +1,194 @@
"""Route-agnostic non-interactive (bearer-token) auth seam for the dashboard.
This is the generic API-token capability (decisions.md Q-C): a reusable seam
that ANY service-to-service / machine-credential provider plugs into, NOT a
drain-specific hook. The drain bearer-secret plugin is merely the first
consumer.
How it fits the existing auth framework:
* The interactive gate (``gated_auth_middleware``) authenticates a human
via a session cookie on every non-public route. A service caller has no
cookie it presents a bearer token in the ``Authorization`` header on a
single request. That is what this seam verifies.
* A route opts in by registering its exact path via
:func:`register_token_route`. Only registered paths are token-authable;
everything else is untouched, so this can never accidentally widen the
auth surface of an existing route.
* :func:`token_auth_middleware` runs OUTERMOST (installed last in
``web_server.py``). For a token route it fully owns the auth decision:
authenticate via the stacked token providers, attach the verified
:class:`~hermes_cli.dashboard_auth.base.TokenPrincipal` to
``request.state.token_principal`` + set ``request.state.token_authenticated``,
and pass through; otherwise reject (401 unauthenticated, or 503 when a
provider's backing store was unreachable). The downstream cookie/session
gates honour ``token_authenticated`` and skip enforcement, so a
token-authed service request is never bounced to ``/login``.
* Fails closed: a token route with no registered token provider, no token,
or an unrecognised token gets 401 never an open pass-through.
Provider stacking mirrors ``verify_session``: each ``supports_token`` provider
is consulted in registration order until one returns a principal. A provider
that doesn't recognise the token returns ``None`` and the seam moves on; a
provider whose backing store is unreachable raises ``ProviderError``, which the
seam remembers and surfaces as 503 only if NO provider accepts the token.
"""
from __future__ import annotations
import logging
import threading
from typing import Awaitable, Callable, Optional, Tuple
from fastapi import Request
from fastapi.responses import JSONResponse, Response
from hermes_cli.dashboard_auth import list_token_providers
from hermes_cli.dashboard_auth.audit import AuditEvent, audit_log
from hermes_cli.dashboard_auth.base import ProviderError, TokenPrincipal
_log = logging.getLogger(__name__)
# Exact paths that accept non-interactive bearer-token auth. A route registers
# itself here at import/startup; the seam only acts on registered paths.
_token_routes: set[str] = set()
_lock = threading.Lock()
def register_token_route(path: str) -> None:
"""Mark ``path`` (exact match) as token-authable.
Idempotent. Call at module import / app setup so the seam knows which
routes to guard. Registering a route does NOT make it public it makes
it authenticate by token instead of by session cookie.
"""
with _lock:
_token_routes.add(path)
def is_token_route(path: str) -> bool:
"""True if ``path`` was registered as token-authable (exact match)."""
with _lock:
return path in _token_routes
def clear_token_routes() -> None:
"""Test-only: drop all registered token routes."""
with _lock:
_token_routes.clear()
def _client_ip(request: Request) -> str:
fwd = request.headers.get("x-forwarded-for", "")
if fwd:
return fwd.split(",")[0].strip()
return request.client.host if request.client else ""
def extract_bearer_token(request: Request) -> str:
"""Return the bearer token from the ``Authorization`` header, or "".
Accepts ``<scheme> <token>`` where scheme is "bearer" (case-insensitive).
Returns an empty string for a missing/malformed header or a non-bearer
scheme the caller treats "" as "no token presented".
"""
auth = request.headers.get("authorization", "")
parts = auth.split(" ", 1)
if len(parts) == 2 and parts[0].strip().lower() == "bearer":
return parts[1].strip()
return ""
def authenticate_token(
request: Request,
) -> Tuple[Optional[TokenPrincipal], Optional[str]]:
"""Try every token provider against the request's bearer token.
Returns ``(principal, unreachable_provider_name)``:
* ``(TokenPrincipal, None)`` a provider recognised and accepted the token.
* ``(None, None)`` no token, or no provider recognised it (reject 401).
* ``(None, name)`` no provider accepted it AND at least one provider's
backing store was unreachable (the caller surfaces 503, not 401, so a
transient outage doesn't read as "bad credentials").
Never raises: a provider ``ProviderError`` is caught and remembered.
"""
token = extract_bearer_token(request)
if not token:
return None, None
unreachable: Optional[str] = None
for provider in list_token_providers():
try:
principal = provider.verify_token(token=token)
except ProviderError as e:
_log.warning(
"dashboard-auth: token provider %r unreachable during verify: %s",
provider.name, e,
)
if unreachable is None:
unreachable = provider.name
continue
except Exception as e: # noqa: BLE001 — a buggy provider must not 500 the gate
_log.warning(
"dashboard-auth: token provider %r raised during verify: %s",
provider.name, e,
)
continue
if principal is not None:
return principal, None
return None, unreachable
async def token_auth_middleware(
request: Request,
call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
"""Outermost auth seam for token-authable routes.
No-op pass-through for any path not registered via
:func:`register_token_route`. For a registered path, token auth is the
only accepted scheme:
* valid token attach principal + ``token_authenticated`` flag, pass through.
* unreachable 503 (provider backing store down; not "bad credentials").
* otherwise 401 unauthenticated.
Runs before the cookie/session gates (installed last in ``web_server.py``).
The cookie gates honour ``request.state.token_authenticated`` and skip
enforcement, so a token-authed request is never redirected to ``/login``.
"""
path = request.url.path
if not is_token_route(path):
return await call_next(request)
principal, unreachable = authenticate_token(request)
if principal is not None:
request.state.token_principal = principal
request.state.token_authenticated = True
return await call_next(request)
if unreachable:
audit_log(
AuditEvent.TOKEN_AUTH_FAILURE,
provider=unreachable,
reason="provider_unreachable",
path=path,
ip=_client_ip(request),
)
return JSONResponse(
{"detail": f"Auth provider {unreachable!r} unreachable"},
status_code=503,
)
audit_log(
AuditEvent.TOKEN_AUTH_FAILURE,
reason="no_provider_recognises_token",
path=path,
ip=_client_ip(request),
)
return JSONResponse(
{"error": "unauthenticated", "detail": "Unauthorized"},
status_code=401,
)

View file

@ -482,6 +482,11 @@ async def _dashboard_auth_gate(request: Request, call_next):
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
"""Require the session token on all /api/ routes except the public list."""
# A request already authenticated by the token-auth seam (a service caller
# presenting a bearer token on a registered token route) carries
# ``token_authenticated`` — never bounce it through the cookie/session gate.
if getattr(request.state, "token_authenticated", False):
return await call_next(request)
# When the OAuth gate is active, cookie-based auth (gated_auth_middleware
# above) is authoritative. The legacy _SESSION_TOKEN path is loopback-only
# and is skipped here so the gate's session attachment isn't overridden.
@ -497,6 +502,20 @@ async def auth_middleware(request: Request, call_next):
return await call_next(request)
@app.middleware("http")
async def _token_auth_seam(request: Request, call_next):
"""Outermost auth seam: non-interactive bearer-token auth for opted-in routes.
Registered LAST so it runs FIRST (Starlette middleware is outermost-last).
A registered token route is fully owned here authenticate by token,
attach the principal + ``token_authenticated`` flag, and let the downstream
cookie/session gates skip enforcement. Non-token routes pass straight
through untouched.
"""
from hermes_cli.dashboard_auth.token_auth import token_auth_middleware
return await token_auth_middleware(request, call_next)
# ---------------------------------------------------------------------------
# Config schema — auto-generated from DEFAULT_CONFIG
# ---------------------------------------------------------------------------

View file

@ -0,0 +1,316 @@
"""Contract tests for the generic non-interactive (bearer-token) auth seam.
Covers Task 2.0a: the reusable token-auth capability in the dashboard auth
framework NOT the drain plugin (that's 2.0b/2.1). Asserts the ABC capability
flag, the registry filter, bearer extraction, provider stacking (verify_token),
and the route-agnostic middleware seam's fail-closed / 503 / pass-through
behaviour.
"""
from __future__ import annotations
import asyncio
from typing import Optional
import pytest
from hermes_cli.dashboard_auth import (
DashboardAuthProvider,
LoginStart,
Session,
TokenPrincipal,
clear_providers,
list_token_providers,
register_provider,
)
from hermes_cli.dashboard_auth.base import ProviderError
from hermes_cli.dashboard_auth import token_auth
# --------------------------------------------------------------------------
# Test doubles
# --------------------------------------------------------------------------
class _OAuthOnly(DashboardAuthProvider):
"""A pure interactive provider — never token-authable."""
name = "oauth-only"
display_name = "OAuth Only"
def start_login(self, *, redirect_uri):
return LoginStart(redirect_url="x", cookie_payload={})
def complete_login(self, *, code, state, code_verifier, redirect_uri):
return Session("u", "e", "n", "o", self.name, 0, "a", "r")
def verify_session(self, *, access_token):
return None
def refresh_session(self, *, refresh_token):
return Session("u", "e", "n", "o", self.name, 0, "a", "r")
def revoke_session(self, *, refresh_token):
return None
class _TokenProvider(_OAuthOnly):
"""A token provider that accepts exactly one secret."""
name = "tok"
display_name = "Token Provider"
supports_token = True
def __init__(self, *, secret: str = "good-secret", scopes=("drain",)):
self._secret = secret
self._scopes = tuple(scopes)
def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
if token == self._secret:
return TokenPrincipal(
principal=self.name, provider=self.name, scopes=self._scopes
)
return None
class _UnreachableTokenProvider(_OAuthOnly):
name = "tok-down"
display_name = "Unreachable Token Provider"
supports_token = True
def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
raise ProviderError("backing store down")
class _BuggyTokenProvider(_OAuthOnly):
name = "tok-buggy"
display_name = "Buggy Token Provider"
supports_token = True
def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
raise RuntimeError("kaboom")
# --------------------------------------------------------------------------
# Fixtures
# --------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _isolated_state():
clear_providers()
token_auth.clear_token_routes()
yield
clear_providers()
token_auth.clear_token_routes()
class _FakeURL:
def __init__(self, path):
self.path = path
class _FakeClient:
host = "1.2.3.4"
class _FakeRequest:
"""Minimal Request stand-in for the seam (no real Starlette needed)."""
def __init__(self, path="/api/gateway/drain", headers=None):
self.url = _FakeURL(path)
self.headers = headers or {}
self.client = _FakeClient()
class _State:
pass
self.state = _State()
def _run(coro):
return asyncio.run(coro)
# --------------------------------------------------------------------------
# ABC + registry
# --------------------------------------------------------------------------
def test_oauth_provider_defaults_supports_token_false():
assert _OAuthOnly().supports_token is False
def test_oauth_provider_verify_token_raises_not_implemented():
with pytest.raises(NotImplementedError):
_OAuthOnly().verify_token(token="x")
def test_list_token_providers_filters_to_supports_token():
register_provider(_OAuthOnly())
register_provider(_TokenProvider())
names = [p.name for p in list_token_providers()]
assert names == ["tok"]
def test_list_token_providers_empty_when_none_registered():
register_provider(_OAuthOnly())
assert list_token_providers() == []
# --------------------------------------------------------------------------
# Bearer extraction
# --------------------------------------------------------------------------
@pytest.mark.parametrize(
"header,expected",
[
("Bearer abc123", "abc123"),
("bearer abc123", "abc123"),
("BEARER abc123", "abc123"),
("Bearer spaced ", "spaced"),
("Basic abc123", ""),
("abc123", ""),
("", ""),
],
)
def test_extract_bearer_token(header, expected):
req = _FakeRequest(headers={"authorization": header} if header else {})
assert token_auth.extract_bearer_token(req) == expected
# --------------------------------------------------------------------------
# authenticate_token (provider stacking)
# --------------------------------------------------------------------------
def test_authenticate_token_accepts_valid():
register_provider(_TokenProvider(secret="good-secret"))
req = _FakeRequest(headers={"authorization": "Bearer good-secret"})
principal, unreachable = token_auth.authenticate_token(req)
assert unreachable is None
assert principal is not None
assert principal.provider == "tok"
assert principal.scopes == ("drain",)
def test_authenticate_token_rejects_wrong_secret():
register_provider(_TokenProvider(secret="good-secret"))
req = _FakeRequest(headers={"authorization": "Bearer wrong"})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is None
assert unreachable is None
def test_authenticate_token_no_token_returns_none():
register_provider(_TokenProvider())
req = _FakeRequest(headers={})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is None and unreachable is None
def test_authenticate_token_stacks_first_match_wins():
register_provider(_TokenProvider(secret="aaa"))
second = _TokenProvider(secret="bbb")
second.name = "tok2"
register_provider(second)
req = _FakeRequest(headers={"authorization": "Bearer bbb"})
principal, _ = token_auth.authenticate_token(req)
assert principal is not None and principal.provider == "tok2"
def test_authenticate_token_unreachable_remembered():
register_provider(_UnreachableTokenProvider())
req = _FakeRequest(headers={"authorization": "Bearer anything"})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is None
assert unreachable == "tok-down"
def test_authenticate_token_unreachable_then_valid_provider_wins():
register_provider(_UnreachableTokenProvider())
register_provider(_TokenProvider(secret="good"))
req = _FakeRequest(headers={"authorization": "Bearer good"})
principal, unreachable = token_auth.authenticate_token(req)
# A later provider accepting the token beats the earlier outage.
assert principal is not None and principal.provider == "tok"
assert unreachable is None
def test_authenticate_token_buggy_provider_does_not_crash():
register_provider(_BuggyTokenProvider())
register_provider(_TokenProvider(secret="good"))
req = _FakeRequest(headers={"authorization": "Bearer good"})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is not None and principal.provider == "tok"
# --------------------------------------------------------------------------
# Middleware seam (route-agnostic)
# --------------------------------------------------------------------------
async def _call_next_ok(request):
from fastapi.responses import JSONResponse
return JSONResponse({"ok": True}, status_code=200)
def test_seam_passthrough_for_unregistered_route():
register_provider(_TokenProvider())
req = _FakeRequest(path="/api/something-else")
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 200
assert getattr(req.state, "token_authenticated", False) is False
def test_seam_accepts_valid_token_on_registered_route():
register_provider(_TokenProvider(secret="good"))
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain",
headers={"authorization": "Bearer good"},
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 200
assert req.state.token_authenticated is True
assert req.state.token_principal.provider == "tok"
def test_seam_rejects_missing_token_401():
register_provider(_TokenProvider())
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(path="/api/gateway/drain", headers={})
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 401
def test_seam_rejects_wrong_token_401():
register_provider(_TokenProvider(secret="good"))
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain", headers={"authorization": "Bearer bad"}
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 401
def test_seam_fails_closed_when_no_token_provider():
# Route registered but NO supports_token provider → 401, never open.
register_provider(_OAuthOnly())
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain", headers={"authorization": "Bearer anything"}
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 401
def test_seam_503_on_provider_unreachable():
register_provider(_UnreachableTokenProvider())
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain", headers={"authorization": "Bearer x"}
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 503