diff --git a/hermes_cli/dashboard_auth/__init__.py b/hermes_cli/dashboard_auth/__init__.py index faba3761038..83a49d48834 100644 --- a/hermes_cli/dashboard_auth/__init__.py +++ b/hermes_cli/dashboard_auth/__init__.py @@ -12,6 +12,7 @@ default. Third parties register their own providers via the plugin hook from hermes_cli.dashboard_auth.base import ( DashboardAuthProvider, Session, + TokenPrincipal, LoginStart, InvalidCodeError, InvalidCredentialsError, @@ -23,12 +24,14 @@ from hermes_cli.dashboard_auth.registry import ( register_provider, get_provider, list_providers, + list_token_providers, clear_providers, ) __all__ = [ "DashboardAuthProvider", "Session", + "TokenPrincipal", "LoginStart", "InvalidCodeError", "InvalidCredentialsError", @@ -38,5 +41,6 @@ __all__ = [ "register_provider", "get_provider", "list_providers", + "list_token_providers", "clear_providers", ] diff --git a/hermes_cli/dashboard_auth/audit.py b/hermes_cli/dashboard_auth/audit.py index 9e52ca75ebe..cde23bf40b2 100644 --- a/hermes_cli/dashboard_auth/audit.py +++ b/hermes_cli/dashboard_auth/audit.py @@ -47,6 +47,8 @@ class AuditEvent(enum.Enum): SESSION_VERIFY_FAILURE = "session_verify_failure" WS_TICKET_MINTED = "ws_ticket_minted" WS_TICKET_REJECTED = "ws_ticket_rejected" + TOKEN_AUTH_SUCCESS = "token_auth_success" + TOKEN_AUTH_FAILURE = "token_auth_failure" def _resolve_log_path() -> Path: diff --git a/hermes_cli/dashboard_auth/base.py b/hermes_cli/dashboard_auth/base.py index 06dab5dd5a4..77a769cff15 100644 --- a/hermes_cli/dashboard_auth/base.py +++ b/hermes_cli/dashboard_auth/base.py @@ -25,6 +25,34 @@ class Session: refresh_token: str +@dataclass(frozen=True) +class TokenPrincipal: + """A verified non-interactive (service-to-service) caller. + + The token analog of :class:`Session`. Where a ``Session`` represents an + interactive human identity behind a session cookie, a ``TokenPrincipal`` + represents a machine/service caller that authenticated by presenting a + bearer token in the ``Authorization`` request header on a single + request — no login, no cookie, no refresh. + + Returned by :meth:`DashboardAuthProvider.verify_token` and attached to + ``request.state.token_principal`` by the token-auth middleware seam so a + route handler can see *who* called it. + + Fields: + * ``principal`` — stable identifier for the caller (e.g. the provider + name, a service account id, or an agent id). Opaque to the seam. + * ``provider`` — the ``name`` of the provider that verified the token. + * ``scopes`` — capability strings this principal is authorised for. + Empty tuple means "unscoped" (the provider vouches for the caller but + attaches no capability list); a route MAY enforce a required scope. + """ + + principal: str + provider: str + scopes: tuple[str, ...] = () + + @dataclass(frozen=True) class LoginStart: """First leg of the OAuth round trip. @@ -131,6 +159,18 @@ class DashboardAuthProvider(ABC): # and are completely unaffected. supports_password: bool = False + # When True, this provider can verify a non-interactive bearer token + # (``verify_token``) presented on a single request by a service-to-service + # caller — no login, no cookie, no refresh. This is the generic + # API-token capability flag, mirroring ``supports_password``: a route + # opts into token auth (see ``token_auth`` middleware seam) and the + # gate consults every ``supports_token`` provider in turn until one + # recognises the token. OAuth/password providers leave this False and + # are completely unaffected. The drain bearer-secret plugin is the + # first consumer, but the capability is deliberately generic so any + # future machine-credential provider drops in without core changes. + supports_token: bool = False + @abstractmethod def start_login(self, *, redirect_uri: str) -> LoginStart: ... @@ -183,6 +223,39 @@ class DashboardAuthProvider(ABC): "complete_password_login)" ) + def verify_token(self, *, token: str) -> "Optional[TokenPrincipal]": + """Verify a non-interactive bearer token; return its principal. + + The token analog of ``verify_session``. Only consulted when + ``supports_token`` is True. Called by the ``token_auth`` middleware + seam for every request to a token-authable route, in registration + order, until one provider returns a non-None principal. + + Contract (mirrors ``verify_session`` stacking semantics): + * Return a :class:`TokenPrincipal` if this provider recognises and + accepts the token. + * Return ``None`` for a token this provider does NOT recognise — + never raise, so the seam can fall through to the next provider. + A malformed/expired/wrong token is "not recognised" → ``None``. + * Raise ``ProviderError`` ONLY for a genuine backing-store outage + (the provider can neither confirm nor deny). The seam treats this + like ``verify_session``: remember it, keep trying other providers, + and surface 503 only if NO provider accepts the token AND at least + one was unreachable. + + Implementations MUST use a constant-time comparison + (``hmac.compare_digest``) when matching a shared secret so the + endpoint isn't a timing oracle. + + The default raises ``NotImplementedError`` so a provider that sets + ``supports_token`` but forgets to implement this fails loudly rather + than silently accepting every caller. + """ + raise NotImplementedError( + f"{type(self).__name__} does not support token auth " + "(set supports_token = True and override verify_token)" + ) + def assert_protocol_compliance(cls: type) -> None: """Raise ``TypeError`` if ``cls`` doesn't fully implement the provider protocol. diff --git a/hermes_cli/dashboard_auth/middleware.py b/hermes_cli/dashboard_auth/middleware.py index f3ad42a6186..8b37a6388a1 100644 --- a/hermes_cli/dashboard_auth/middleware.py +++ b/hermes_cli/dashboard_auth/middleware.py @@ -181,6 +181,13 @@ async def gated_auth_middleware( if not getattr(request.app.state, "auth_required", False): return await call_next(request) + # A request already authenticated by the token-auth seam (a service caller + # on a registered token route) carries ``token_authenticated`` — it is NOT + # a cookie session and must not be bounced to /login. Pass it through; the + # seam already attached ``request.state.token_principal``. + if getattr(request.state, "token_authenticated", False): + return await call_next(request) + path = request.url.path if _path_is_public(path): return await call_next(request) diff --git a/hermes_cli/dashboard_auth/registry.py b/hermes_cli/dashboard_auth/registry.py index fde1420e204..e6f71744786 100644 --- a/hermes_cli/dashboard_auth/registry.py +++ b/hermes_cli/dashboard_auth/registry.py @@ -52,6 +52,20 @@ def list_providers() -> List[DashboardAuthProvider]: return list(_providers.values()) +def list_token_providers() -> List[DashboardAuthProvider]: + """Registered providers that support non-interactive token auth. + + The subset of ``list_providers()`` whose ``supports_token`` flag is True, + in registration order. The ``token_auth`` middleware seam consults these + (and only these) when a token-authable route is hit, so OAuth/password-only + providers are never asked to ``verify_token``. Returns an empty list when + no token provider is registered — a token-authable route then fails + closed (401), never open. + """ + with _lock: + return [p for p in _providers.values() if getattr(p, "supports_token", False)] + + def clear_providers() -> None: """Test-only: drop all registrations.""" with _lock: diff --git a/hermes_cli/dashboard_auth/token_auth.py b/hermes_cli/dashboard_auth/token_auth.py new file mode 100644 index 00000000000..320b4cdb52b --- /dev/null +++ b/hermes_cli/dashboard_auth/token_auth.py @@ -0,0 +1,194 @@ +"""Route-agnostic non-interactive (bearer-token) auth seam for the dashboard. + +This is the generic API-token capability (decisions.md Q-C): a reusable seam +that ANY service-to-service / machine-credential provider plugs into, NOT a +drain-specific hook. The drain bearer-secret plugin is merely the first +consumer. + +How it fits the existing auth framework: + + * The interactive gate (``gated_auth_middleware``) authenticates a human + via a session cookie on every non-public route. A service caller has no + cookie — it presents a bearer token in the ``Authorization`` header on a + single request. That is what this seam verifies. + + * A route opts in by registering its exact path via + :func:`register_token_route`. Only registered paths are token-authable; + everything else is untouched, so this can never accidentally widen the + auth surface of an existing route. + + * :func:`token_auth_middleware` runs OUTERMOST (installed last in + ``web_server.py``). For a token route it fully owns the auth decision: + authenticate via the stacked token providers, attach the verified + :class:`~hermes_cli.dashboard_auth.base.TokenPrincipal` to + ``request.state.token_principal`` + set ``request.state.token_authenticated``, + and pass through; otherwise reject (401 unauthenticated, or 503 when a + provider's backing store was unreachable). The downstream cookie/session + gates honour ``token_authenticated`` and skip enforcement, so a + token-authed service request is never bounced to ``/login``. + + * Fails closed: a token route with no registered token provider, no token, + or an unrecognised token gets 401 — never an open pass-through. + +Provider stacking mirrors ``verify_session``: each ``supports_token`` provider +is consulted in registration order until one returns a principal. A provider +that doesn't recognise the token returns ``None`` and the seam moves on; a +provider whose backing store is unreachable raises ``ProviderError``, which the +seam remembers and surfaces as 503 only if NO provider accepts the token. +""" +from __future__ import annotations + +import logging +import threading +from typing import Awaitable, Callable, Optional, Tuple + +from fastapi import Request +from fastapi.responses import JSONResponse, Response + +from hermes_cli.dashboard_auth import list_token_providers +from hermes_cli.dashboard_auth.audit import AuditEvent, audit_log +from hermes_cli.dashboard_auth.base import ProviderError, TokenPrincipal + +_log = logging.getLogger(__name__) + +# Exact paths that accept non-interactive bearer-token auth. A route registers +# itself here at import/startup; the seam only acts on registered paths. +_token_routes: set[str] = set() +_lock = threading.Lock() + + +def register_token_route(path: str) -> None: + """Mark ``path`` (exact match) as token-authable. + + Idempotent. Call at module import / app setup so the seam knows which + routes to guard. Registering a route does NOT make it public — it makes + it authenticate by token instead of by session cookie. + """ + with _lock: + _token_routes.add(path) + + +def is_token_route(path: str) -> bool: + """True if ``path`` was registered as token-authable (exact match).""" + with _lock: + return path in _token_routes + + +def clear_token_routes() -> None: + """Test-only: drop all registered token routes.""" + with _lock: + _token_routes.clear() + + +def _client_ip(request: Request) -> str: + fwd = request.headers.get("x-forwarded-for", "") + if fwd: + return fwd.split(",")[0].strip() + return request.client.host if request.client else "" + + +def extract_bearer_token(request: Request) -> str: + """Return the bearer token from the ``Authorization`` header, or "". + + Accepts `` `` where scheme is "bearer" (case-insensitive). + Returns an empty string for a missing/malformed header or a non-bearer + scheme — the caller treats "" as "no token presented". + """ + auth = request.headers.get("authorization", "") + parts = auth.split(" ", 1) + if len(parts) == 2 and parts[0].strip().lower() == "bearer": + return parts[1].strip() + return "" + + +def authenticate_token( + request: Request, +) -> Tuple[Optional[TokenPrincipal], Optional[str]]: + """Try every token provider against the request's bearer token. + + Returns ``(principal, unreachable_provider_name)``: + * ``(TokenPrincipal, None)`` — a provider recognised and accepted the token. + * ``(None, None)`` — no token, or no provider recognised it (reject 401). + * ``(None, name)`` — no provider accepted it AND at least one provider's + backing store was unreachable (the caller surfaces 503, not 401, so a + transient outage doesn't read as "bad credentials"). + + Never raises: a provider ``ProviderError`` is caught and remembered. + """ + token = extract_bearer_token(request) + if not token: + return None, None + unreachable: Optional[str] = None + for provider in list_token_providers(): + try: + principal = provider.verify_token(token=token) + except ProviderError as e: + _log.warning( + "dashboard-auth: token provider %r unreachable during verify: %s", + provider.name, e, + ) + if unreachable is None: + unreachable = provider.name + continue + except Exception as e: # noqa: BLE001 — a buggy provider must not 500 the gate + _log.warning( + "dashboard-auth: token provider %r raised during verify: %s", + provider.name, e, + ) + continue + if principal is not None: + return principal, None + return None, unreachable + + +async def token_auth_middleware( + request: Request, + call_next: Callable[[Request], Awaitable[Response]], +) -> Response: + """Outermost auth seam for token-authable routes. + + No-op pass-through for any path not registered via + :func:`register_token_route`. For a registered path, token auth is the + only accepted scheme: + + * valid token → attach principal + ``token_authenticated`` flag, pass through. + * unreachable → 503 (provider backing store down; not "bad credentials"). + * otherwise → 401 unauthenticated. + + Runs before the cookie/session gates (installed last in ``web_server.py``). + The cookie gates honour ``request.state.token_authenticated`` and skip + enforcement, so a token-authed request is never redirected to ``/login``. + """ + path = request.url.path + if not is_token_route(path): + return await call_next(request) + + principal, unreachable = authenticate_token(request) + if principal is not None: + request.state.token_principal = principal + request.state.token_authenticated = True + return await call_next(request) + + if unreachable: + audit_log( + AuditEvent.TOKEN_AUTH_FAILURE, + provider=unreachable, + reason="provider_unreachable", + path=path, + ip=_client_ip(request), + ) + return JSONResponse( + {"detail": f"Auth provider {unreachable!r} unreachable"}, + status_code=503, + ) + + audit_log( + AuditEvent.TOKEN_AUTH_FAILURE, + reason="no_provider_recognises_token", + path=path, + ip=_client_ip(request), + ) + return JSONResponse( + {"error": "unauthenticated", "detail": "Unauthorized"}, + status_code=401, + ) diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 178b6b0dd87..5dbecaced50 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -482,6 +482,11 @@ async def _dashboard_auth_gate(request: Request, call_next): @app.middleware("http") async def auth_middleware(request: Request, call_next): """Require the session token on all /api/ routes except the public list.""" + # A request already authenticated by the token-auth seam (a service caller + # presenting a bearer token on a registered token route) carries + # ``token_authenticated`` — never bounce it through the cookie/session gate. + if getattr(request.state, "token_authenticated", False): + return await call_next(request) # When the OAuth gate is active, cookie-based auth (gated_auth_middleware # above) is authoritative. The legacy _SESSION_TOKEN path is loopback-only # and is skipped here so the gate's session attachment isn't overridden. @@ -497,6 +502,20 @@ async def auth_middleware(request: Request, call_next): return await call_next(request) +@app.middleware("http") +async def _token_auth_seam(request: Request, call_next): + """Outermost auth seam: non-interactive bearer-token auth for opted-in routes. + + Registered LAST so it runs FIRST (Starlette middleware is outermost-last). + A registered token route is fully owned here — authenticate by token, + attach the principal + ``token_authenticated`` flag, and let the downstream + cookie/session gates skip enforcement. Non-token routes pass straight + through untouched. + """ + from hermes_cli.dashboard_auth.token_auth import token_auth_middleware + return await token_auth_middleware(request, call_next) + + # --------------------------------------------------------------------------- # Config schema — auto-generated from DEFAULT_CONFIG # --------------------------------------------------------------------------- diff --git a/tests/hermes_cli/test_dashboard_token_auth.py b/tests/hermes_cli/test_dashboard_token_auth.py new file mode 100644 index 00000000000..744651c58a5 --- /dev/null +++ b/tests/hermes_cli/test_dashboard_token_auth.py @@ -0,0 +1,316 @@ +"""Contract tests for the generic non-interactive (bearer-token) auth seam. + +Covers Task 2.0a: the reusable token-auth capability in the dashboard auth +framework — NOT the drain plugin (that's 2.0b/2.1). Asserts the ABC capability +flag, the registry filter, bearer extraction, provider stacking (verify_token), +and the route-agnostic middleware seam's fail-closed / 503 / pass-through +behaviour. +""" +from __future__ import annotations + +import asyncio +from typing import Optional + +import pytest + +from hermes_cli.dashboard_auth import ( + DashboardAuthProvider, + LoginStart, + Session, + TokenPrincipal, + clear_providers, + list_token_providers, + register_provider, +) +from hermes_cli.dashboard_auth.base import ProviderError +from hermes_cli.dashboard_auth import token_auth + + +# -------------------------------------------------------------------------- +# Test doubles +# -------------------------------------------------------------------------- + + +class _OAuthOnly(DashboardAuthProvider): + """A pure interactive provider — never token-authable.""" + + name = "oauth-only" + display_name = "OAuth Only" + + def start_login(self, *, redirect_uri): + return LoginStart(redirect_url="x", cookie_payload={}) + + def complete_login(self, *, code, state, code_verifier, redirect_uri): + return Session("u", "e", "n", "o", self.name, 0, "a", "r") + + def verify_session(self, *, access_token): + return None + + def refresh_session(self, *, refresh_token): + return Session("u", "e", "n", "o", self.name, 0, "a", "r") + + def revoke_session(self, *, refresh_token): + return None + + +class _TokenProvider(_OAuthOnly): + """A token provider that accepts exactly one secret.""" + + name = "tok" + display_name = "Token Provider" + supports_token = True + + def __init__(self, *, secret: str = "good-secret", scopes=("drain",)): + self._secret = secret + self._scopes = tuple(scopes) + + def verify_token(self, *, token: str) -> Optional[TokenPrincipal]: + if token == self._secret: + return TokenPrincipal( + principal=self.name, provider=self.name, scopes=self._scopes + ) + return None + + +class _UnreachableTokenProvider(_OAuthOnly): + name = "tok-down" + display_name = "Unreachable Token Provider" + supports_token = True + + def verify_token(self, *, token: str) -> Optional[TokenPrincipal]: + raise ProviderError("backing store down") + + +class _BuggyTokenProvider(_OAuthOnly): + name = "tok-buggy" + display_name = "Buggy Token Provider" + supports_token = True + + def verify_token(self, *, token: str) -> Optional[TokenPrincipal]: + raise RuntimeError("kaboom") + + +# -------------------------------------------------------------------------- +# Fixtures +# -------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _isolated_state(): + clear_providers() + token_auth.clear_token_routes() + yield + clear_providers() + token_auth.clear_token_routes() + + +class _FakeURL: + def __init__(self, path): + self.path = path + + +class _FakeClient: + host = "1.2.3.4" + + +class _FakeRequest: + """Minimal Request stand-in for the seam (no real Starlette needed).""" + + def __init__(self, path="/api/gateway/drain", headers=None): + self.url = _FakeURL(path) + self.headers = headers or {} + self.client = _FakeClient() + + class _State: + pass + + self.state = _State() + + +def _run(coro): + return asyncio.run(coro) + + +# -------------------------------------------------------------------------- +# ABC + registry +# -------------------------------------------------------------------------- + + +def test_oauth_provider_defaults_supports_token_false(): + assert _OAuthOnly().supports_token is False + + +def test_oauth_provider_verify_token_raises_not_implemented(): + with pytest.raises(NotImplementedError): + _OAuthOnly().verify_token(token="x") + + +def test_list_token_providers_filters_to_supports_token(): + register_provider(_OAuthOnly()) + register_provider(_TokenProvider()) + names = [p.name for p in list_token_providers()] + assert names == ["tok"] + + +def test_list_token_providers_empty_when_none_registered(): + register_provider(_OAuthOnly()) + assert list_token_providers() == [] + + +# -------------------------------------------------------------------------- +# Bearer extraction +# -------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "header,expected", + [ + ("Bearer abc123", "abc123"), + ("bearer abc123", "abc123"), + ("BEARER abc123", "abc123"), + ("Bearer spaced ", "spaced"), + ("Basic abc123", ""), + ("abc123", ""), + ("", ""), + ], +) +def test_extract_bearer_token(header, expected): + req = _FakeRequest(headers={"authorization": header} if header else {}) + assert token_auth.extract_bearer_token(req) == expected + + +# -------------------------------------------------------------------------- +# authenticate_token (provider stacking) +# -------------------------------------------------------------------------- + + +def test_authenticate_token_accepts_valid(): + register_provider(_TokenProvider(secret="good-secret")) + req = _FakeRequest(headers={"authorization": "Bearer good-secret"}) + principal, unreachable = token_auth.authenticate_token(req) + assert unreachable is None + assert principal is not None + assert principal.provider == "tok" + assert principal.scopes == ("drain",) + + +def test_authenticate_token_rejects_wrong_secret(): + register_provider(_TokenProvider(secret="good-secret")) + req = _FakeRequest(headers={"authorization": "Bearer wrong"}) + principal, unreachable = token_auth.authenticate_token(req) + assert principal is None + assert unreachable is None + + +def test_authenticate_token_no_token_returns_none(): + register_provider(_TokenProvider()) + req = _FakeRequest(headers={}) + principal, unreachable = token_auth.authenticate_token(req) + assert principal is None and unreachable is None + + +def test_authenticate_token_stacks_first_match_wins(): + register_provider(_TokenProvider(secret="aaa")) + second = _TokenProvider(secret="bbb") + second.name = "tok2" + register_provider(second) + req = _FakeRequest(headers={"authorization": "Bearer bbb"}) + principal, _ = token_auth.authenticate_token(req) + assert principal is not None and principal.provider == "tok2" + + +def test_authenticate_token_unreachable_remembered(): + register_provider(_UnreachableTokenProvider()) + req = _FakeRequest(headers={"authorization": "Bearer anything"}) + principal, unreachable = token_auth.authenticate_token(req) + assert principal is None + assert unreachable == "tok-down" + + +def test_authenticate_token_unreachable_then_valid_provider_wins(): + register_provider(_UnreachableTokenProvider()) + register_provider(_TokenProvider(secret="good")) + req = _FakeRequest(headers={"authorization": "Bearer good"}) + principal, unreachable = token_auth.authenticate_token(req) + # A later provider accepting the token beats the earlier outage. + assert principal is not None and principal.provider == "tok" + assert unreachable is None + + +def test_authenticate_token_buggy_provider_does_not_crash(): + register_provider(_BuggyTokenProvider()) + register_provider(_TokenProvider(secret="good")) + req = _FakeRequest(headers={"authorization": "Bearer good"}) + principal, unreachable = token_auth.authenticate_token(req) + assert principal is not None and principal.provider == "tok" + + +# -------------------------------------------------------------------------- +# Middleware seam (route-agnostic) +# -------------------------------------------------------------------------- + + +async def _call_next_ok(request): + from fastapi.responses import JSONResponse + + return JSONResponse({"ok": True}, status_code=200) + + +def test_seam_passthrough_for_unregistered_route(): + register_provider(_TokenProvider()) + req = _FakeRequest(path="/api/something-else") + resp = _run(token_auth.token_auth_middleware(req, _call_next_ok)) + assert resp.status_code == 200 + assert getattr(req.state, "token_authenticated", False) is False + + +def test_seam_accepts_valid_token_on_registered_route(): + register_provider(_TokenProvider(secret="good")) + token_auth.register_token_route("/api/gateway/drain") + req = _FakeRequest( + path="/api/gateway/drain", + headers={"authorization": "Bearer good"}, + ) + resp = _run(token_auth.token_auth_middleware(req, _call_next_ok)) + assert resp.status_code == 200 + assert req.state.token_authenticated is True + assert req.state.token_principal.provider == "tok" + + +def test_seam_rejects_missing_token_401(): + register_provider(_TokenProvider()) + token_auth.register_token_route("/api/gateway/drain") + req = _FakeRequest(path="/api/gateway/drain", headers={}) + resp = _run(token_auth.token_auth_middleware(req, _call_next_ok)) + assert resp.status_code == 401 + + +def test_seam_rejects_wrong_token_401(): + register_provider(_TokenProvider(secret="good")) + token_auth.register_token_route("/api/gateway/drain") + req = _FakeRequest( + path="/api/gateway/drain", headers={"authorization": "Bearer bad"} + ) + resp = _run(token_auth.token_auth_middleware(req, _call_next_ok)) + assert resp.status_code == 401 + + +def test_seam_fails_closed_when_no_token_provider(): + # Route registered but NO supports_token provider → 401, never open. + register_provider(_OAuthOnly()) + token_auth.register_token_route("/api/gateway/drain") + req = _FakeRequest( + path="/api/gateway/drain", headers={"authorization": "Bearer anything"} + ) + resp = _run(token_auth.token_auth_middleware(req, _call_next_ok)) + assert resp.status_code == 401 + + +def test_seam_503_on_provider_unreachable(): + register_provider(_UnreachableTokenProvider()) + token_auth.register_token_route("/api/gateway/drain") + req = _FakeRequest( + path="/api/gateway/drain", headers={"authorization": "Bearer x"} + ) + resp = _run(token_auth.token_auth_middleware(req, _call_next_ok)) + assert resp.status_code == 503