feat(dashboard-auth): auth gate middleware + /auth/* routes + /login HTML

Phase 3, Tasks 3.2 + 3.3 + 3.4. These three pieces are mutually
dependent so they land together.

middleware.py - gated_auth_middleware engages when app.state.auth_required
is True.  Allowlists /login, /auth/*, /api/auth/providers, and static
asset paths; everything else demands a valid session_at cookie.  Verifies
by trying every registered provider's verify_session in turn (multi-
provider stack); attaches verified Session to request.state.session.
Returns 401 JSON for /api/* and 302 -> /login for HTML.  ProviderError
during verify -> 503.

routes.py - APIRouter with:
  GET  /login              server-rendered HTML
  GET  /auth/login?provider=N  302 to IDP + PKCE cookie
  GET  /auth/callback?code,state  completes login, sets session cookies
  POST /auth/logout        clears cookies + best-effort revoke
  GET  /api/auth/providers public bootstrap endpoint (503 if zero)
  GET  /api/auth/me        verified session as JSON (auth-required)

login_page.py - Inline-CSS HTML template, no React, no JavaScript.

web_server.py - Mounted gated_auth_middleware between host_header and
auth_middleware (FastAPI runs middlewares in registration order: host
check -> cookie auth -> token auth).  auth_middleware short-circuits
when auth_required so cookie auth is authoritative in gated mode.
Router is included before mount_spa so the catch-all doesn't swallow
/login or /auth/*.

17 new behavioural tests; loopback regression harness still green.
This commit is contained in:
Ben 2026-05-21 15:19:44 +10:00
parent 12142c0ca1
commit 884f0da82a
5 changed files with 838 additions and 0 deletions

View file

@ -0,0 +1,104 @@
"""Server-rendered /login page.
No React, no JavaScript dependency. Listed providers come from the
registry; clicking a provider sends a GET to
``/auth/login?provider=<name>``.
"""
from __future__ import annotations
import html
from hermes_cli.dashboard_auth import list_providers
# Inline minimal CSS. The dashboard's full skin lives in the React
# bundle, which we deliberately do NOT load here — the login page must
# not depend on the SPA build being present or on the injected session
# token.
_LOGIN_HTML_TEMPLATE = """\
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Sign in Hermes Agent</title>
<style>
:root {{
--bg: #0a0a0b;
--fg: #e5e5e7;
--accent: #f97316;
--border: #27272a;
}}
html, body {{
margin: 0; padding: 0; height: 100%;
background: var(--bg); color: var(--fg);
font: 16px/1.5 system-ui, -apple-system, "Segoe UI", Roboto, sans-serif;
}}
main {{
max-width: 28rem; margin: 10vh auto; padding: 2rem;
border: 1px solid var(--border); border-radius: 0.75rem;
background: rgba(255,255,255,0.02);
}}
h1 {{ margin: 0 0 0.5rem; font-size: 1.5rem; }}
p {{ margin: 0 0 1.5rem; opacity: 0.7; }}
.provider-list {{ display: grid; gap: 0.75rem; }}
.provider-btn {{
display: block; width: 100%; box-sizing: border-box;
padding: 0.875rem 1rem; text-align: center;
background: var(--accent); color: #0a0a0b;
font-weight: 600; font-size: 1rem;
border-radius: 0.5rem; text-decoration: none;
border: 0; cursor: pointer;
}}
.provider-btn:hover {{ filter: brightness(1.1); }}
footer {{
margin-top: 2rem; font-size: 0.875rem;
opacity: 0.5; text-align: center;
}}
</style>
</head>
<body>
<main>
<h1>Sign in to Hermes Agent</h1>
<p>Choose a sign-in method to continue.</p>
<div class="provider-list">
{provider_buttons}
</div>
<footer>This dashboard is bound to a non-loopback host.<br>
Sign-in is required for security.</footer>
</main>
</body>
</html>
"""
_EMPTY_HTML = """\
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Sign-in unavailable Hermes Agent</title>
</head>
<body><main style="font-family: system-ui; max-width: 36rem; margin: 10vh auto; padding: 2rem;">
<h1>Sign-in unavailable</h1>
<p>This dashboard is bound to a non-loopback host but no authentication
providers are installed.</p>
<p>Install <code>plugins/dashboard-auth-nous</code> (default) or another
auth provider, or restart with <code>--insecure</code> to bypass the
auth gate (not recommended on untrusted networks).</p>
</main></body></html>
"""
def render_login_html() -> str:
"""Return the full HTML for ``GET /login``."""
providers = list_providers()
if not providers:
return _EMPTY_HTML
buttons = []
for p in providers:
buttons.append(
f' <a class="provider-btn" '
f'href="/auth/login?provider={html.escape(p.name, quote=True)}">'
f'Sign in with {html.escape(p.display_name)}</a>'
)
return _LOGIN_HTML_TEMPLATE.format(provider_buttons="\n".join(buttons))

View file

@ -0,0 +1,126 @@
"""Auth-gate middleware for the dashboard.
Engaged when ``app.state.auth_required is True``. The gate's job:
1. Allow a small set of routes through unauthenticated (login page,
``/auth/*`` OAuth round trip, ``/api/auth/providers``, static
assets).
2. For everything else, demand a valid session cookie and attach the
verified :class:`Session` to ``request.state.session``.
3. On HTML routes, redirect missing/invalid cookies to ``/login``.
On ``/api/*`` routes, return 401 JSON.
The middleware is a no-op when ``auth_required`` is False (loopback
mode); the legacy ``_SESSION_TOKEN`` ``auth_middleware`` handles those
binds.
"""
from __future__ import annotations
import logging
from typing import Awaitable, Callable
from fastapi import Request
from fastapi.responses import JSONResponse, RedirectResponse, Response
from hermes_cli.dashboard_auth import list_providers
from hermes_cli.dashboard_auth.audit import AuditEvent, audit_log
from hermes_cli.dashboard_auth.base import ProviderError
from hermes_cli.dashboard_auth.cookies import read_session_cookies
_log = logging.getLogger(__name__)
# Paths that bypass the auth gate. Order matters: prefix match.
_GATE_PUBLIC_PREFIXES: tuple[str, ...] = (
"/auth/login",
"/auth/callback",
"/auth/logout",
"/login",
"/api/auth/providers",
"/assets/",
"/favicon.ico",
"/ds-assets/",
"/fonts/",
"/fonts-terminal/",
)
def _path_is_public(path: str) -> bool:
return any(
path == prefix or path.startswith(prefix)
for prefix in _GATE_PUBLIC_PREFIXES
)
def _client_ip(request: Request) -> str:
fwd = request.headers.get("x-forwarded-for", "")
if fwd:
return fwd.split(",")[0].strip()
return request.client.host if request.client else ""
def _unauth_response(path: str, *, reason: str) -> Response:
"""API routes → 401 JSON; HTML routes → 302 → /login."""
if path.startswith("/api/"):
return JSONResponse(
{"detail": "Unauthorized", "reason": reason},
status_code=401,
)
return RedirectResponse(url="/login", status_code=302)
async def gated_auth_middleware(
request: Request,
call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
"""Engaged only when ``app.state.auth_required is True``.
No-op pass-through in loopback mode so the legacy auth_middleware can
handle those binds via ``_SESSION_TOKEN``.
"""
if not getattr(request.app.state, "auth_required", False):
return await call_next(request)
path = request.url.path
if _path_is_public(path):
return await call_next(request)
at, _rt = read_session_cookies(request)
if not at:
return _unauth_response(path, reason="no_cookie")
# Try every registered provider's verify_session in turn. Providers
# MUST return None for tokens they don't recognise (not raise). This
# lets multiple providers stack — the first one that recognises a
# token wins.
session = None
for provider in list_providers():
try:
session = provider.verify_session(access_token=at)
except ProviderError as e:
_log.warning(
"dashboard-auth: provider %r unreachable during verify: %s",
provider.name, e,
)
audit_log(
AuditEvent.SESSION_VERIFY_FAILURE,
provider=provider.name,
reason="provider_unreachable",
ip=_client_ip(request),
)
return JSONResponse(
{"detail": f"Auth provider {provider.name!r} unreachable"},
status_code=503,
)
if session is not None:
break
if session is None:
audit_log(
AuditEvent.SESSION_VERIFY_FAILURE,
reason="no_provider_recognises",
ip=_client_ip(request),
)
return _unauth_response(path, reason="invalid_or_expired_session")
request.state.session = session
return await call_next(request)

View file

@ -0,0 +1,304 @@
"""HTTP routes for the dashboard-auth OAuth round trip.
Mounted at root (no prefix) by ``web_server.py``. The router does not
auto-gate; gating is performed by ``gated_auth_middleware``, which
allowlists everything under ``/auth/*`` and ``/api/auth/providers``.
The routes:
GET /login server-rendered login page
GET /auth/login?provider=N 302 to IDP, sets PKCE cookie
GET /auth/callback?code,state completes login, sets session cookies
POST /auth/logout clears cookies, best-effort revoke
GET /api/auth/providers list registered providers (login bootstrap)
GET /api/auth/me current Session as JSON (auth-required)
"""
from __future__ import annotations
import logging
import time
from typing import Any
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from hermes_cli.dashboard_auth import (
get_provider,
list_providers,
)
from hermes_cli.dashboard_auth.audit import AuditEvent, audit_log
from hermes_cli.dashboard_auth.base import (
InvalidCodeError,
ProviderError,
)
from hermes_cli.dashboard_auth.cookies import (
clear_pkce_cookie,
clear_session_cookies,
detect_https,
read_pkce_cookie,
read_session_cookies,
set_pkce_cookie,
set_session_cookies,
)
from hermes_cli.dashboard_auth.login_page import render_login_html
_log = logging.getLogger(__name__)
router = APIRouter()
def _redirect_uri(request: Request) -> str:
"""Reconstruct the absolute callback URL the IDP redirects back to.
Reads from the request URL under uvicorn's ``proxy_headers=True``
this picks up the public https URL from ``X-Forwarded-Host`` plus
``X-Forwarded-Proto``.
"""
return str(request.url_for("auth_callback"))
def _client_ip(request: Request) -> str:
fwd = request.headers.get("x-forwarded-for", "")
if fwd:
return fwd.split(",")[0].strip()
return request.client.host if request.client else ""
# ---------------------------------------------------------------------------
# Public: login page (server-rendered HTML, no SPA bundle)
# ---------------------------------------------------------------------------
@router.get("/login", name="login_page")
async def login_page(request: Request) -> HTMLResponse:
return HTMLResponse(
render_login_html(),
headers={"Cache-Control": "no-store, no-cache, must-revalidate"},
)
# ---------------------------------------------------------------------------
# Public: provider list for the login-page bootstrap
# ---------------------------------------------------------------------------
@router.get("/api/auth/providers", name="auth_providers")
async def api_auth_providers() -> Any:
providers = list_providers()
if not providers:
# Q13: fail-closed when zero providers are registered.
return JSONResponse(
{"detail": "no auth providers registered"},
status_code=503,
)
return {
"providers": [
{"name": p.name, "display_name": p.display_name}
for p in providers
],
}
# ---------------------------------------------------------------------------
# Public: OAuth round trip
# ---------------------------------------------------------------------------
@router.get("/auth/login", name="auth_login")
async def auth_login(request: Request, provider: str):
p = get_provider(provider)
if p is None:
raise HTTPException(
status_code=404,
detail=f"Unknown provider: {provider!r}",
)
try:
ls = p.start_login(redirect_uri=_redirect_uri(request))
except ProviderError as e:
audit_log(
AuditEvent.LOGIN_FAILURE,
provider=provider,
reason="provider_unreachable",
ip=_client_ip(request),
)
raise HTTPException(
status_code=503,
detail=f"Provider unreachable: {e}",
)
audit_log(
AuditEvent.LOGIN_START,
provider=provider,
ip=_client_ip(request),
)
resp = RedirectResponse(url=ls.redirect_url, status_code=302)
# Pack the provider name into the PKCE cookie so the callback can
# find it without a separate cookie. Provider may or may not have
# already included a ``provider=`` segment.
pkce = ls.cookie_payload.get("hermes_session_pkce", "")
if "provider=" not in pkce:
pkce = f"provider={provider};{pkce}" if pkce else f"provider={provider}"
set_pkce_cookie(resp, payload=pkce, use_https=detect_https(request))
return resp
@router.get("/auth/callback", name="auth_callback")
async def auth_callback(
request: Request,
code: str = "",
state: str = "",
error: str = "",
error_description: str = "",
):
pkce_raw = read_pkce_cookie(request)
if not pkce_raw:
audit_log(
AuditEvent.LOGIN_FAILURE,
reason="missing_pkce_cookie",
ip=_client_ip(request),
)
raise HTTPException(
status_code=400,
detail="Missing PKCE state cookie",
)
# Parse ``provider=...;state=...;verifier=...``
parts = dict(
seg.split("=", 1) for seg in pkce_raw.split(";") if "=" in seg
)
provider_name = parts.get("provider", "")
expected_state = parts.get("state", "")
verifier = parts.get("verifier", "")
p = get_provider(provider_name)
if p is None:
raise HTTPException(
status_code=400,
detail=f"Unknown provider in cookie: {provider_name!r}",
)
if error:
audit_log(
AuditEvent.LOGIN_FAILURE,
provider=provider_name,
reason="idp_error",
error=error,
ip=_client_ip(request),
)
raise HTTPException(
status_code=400,
detail=f"OAuth error from provider: {error} ({error_description})",
)
if not state or state != expected_state:
audit_log(
AuditEvent.LOGIN_FAILURE,
provider=provider_name,
reason="state_mismatch",
ip=_client_ip(request),
)
raise HTTPException(
status_code=400,
detail="OAuth state mismatch (CSRF check failed)",
)
try:
session = p.complete_login(
code=code,
state=state,
code_verifier=verifier,
redirect_uri=_redirect_uri(request),
)
except InvalidCodeError as e:
audit_log(
AuditEvent.LOGIN_FAILURE,
provider=provider_name,
reason="invalid_code",
ip=_client_ip(request),
)
raise HTTPException(status_code=400, detail=f"Invalid code: {e}")
except ProviderError as e:
audit_log(
AuditEvent.LOGIN_FAILURE,
provider=provider_name,
reason="provider_unreachable",
ip=_client_ip(request),
)
raise HTTPException(
status_code=503,
detail=f"Provider unreachable: {e}",
)
audit_log(
AuditEvent.LOGIN_SUCCESS,
provider=provider_name,
user_id=session.user_id,
email=session.email,
org_id=session.org_id,
ip=_client_ip(request),
)
expires_in = max(60, session.expires_at - int(time.time()))
resp = RedirectResponse(url="/", status_code=302)
set_session_cookies(
resp,
access_token=session.access_token,
refresh_token=session.refresh_token,
access_token_expires_in=expires_in,
use_https=detect_https(request),
)
clear_pkce_cookie(resp)
return resp
@router.post("/auth/logout", name="auth_logout")
async def auth_logout(request: Request):
_at, rt = read_session_cookies(request)
if rt:
# Best-effort revoke. Try every provider so a session minted by
# any registered provider is revoked correctly. Failures are
# logged but never raised.
for provider in list_providers():
try:
provider.revoke_session(refresh_token=rt)
except Exception as e: # noqa: BLE001 — best-effort
_log.warning(
"dashboard-auth: revoke on %r failed: %s",
provider.name, e,
)
sess = getattr(request.state, "session", None)
audit_log(
AuditEvent.LOGOUT,
provider=(sess.provider if sess else "unknown"),
user_id=(sess.user_id if sess else ""),
ip=_client_ip(request),
)
resp = RedirectResponse(url="/login", status_code=302)
clear_session_cookies(resp)
clear_pkce_cookie(resp)
return resp
# ---------------------------------------------------------------------------
# Auth-required: identity probe for the SPA
# ---------------------------------------------------------------------------
@router.get("/api/auth/me", name="auth_me")
async def api_auth_me(request: Request):
"""Return the verified session as JSON. Auth-required (gate enforces)."""
sess = getattr(request.state, "session", None)
if sess is None:
raise HTTPException(status_code=401, detail="Unauthorized")
return {
"user_id": sess.user_id,
"email": sess.email,
"display_name": sess.display_name,
"org_id": sess.org_id,
"provider": sess.provider,
"expires_at": sess.expires_at,
}

View file

@ -249,9 +249,29 @@ async def host_header_middleware(request: Request, call_next):
return await call_next(request)
# ---------------------------------------------------------------------------
# Dashboard OAuth auth gate — engaged only when start_server flags the
# bind as non-loopback-without-insecure. No-op pass-through in loopback
# mode so the legacy auth_middleware (below) handles those binds via
# the injected ``_SESSION_TOKEN``. Registered between host_header and
# auth_middleware so the order is: host check → cookie auth → token auth.
# ---------------------------------------------------------------------------
@app.middleware("http")
async def _dashboard_auth_gate(request: Request, call_next):
from hermes_cli.dashboard_auth.middleware import gated_auth_middleware
return await gated_auth_middleware(request, call_next)
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
"""Require the session token on all /api/ routes except the public list."""
# When the OAuth gate is active, cookie-based auth (gated_auth_middleware
# above) is authoritative. The legacy _SESSION_TOKEN path is loopback-only
# and is skipped here so the gate's session attachment isn't overridden.
if getattr(request.app.state, "auth_required", False):
return await call_next(request)
path = request.url.path
if path.startswith("/api/") and path not in _PUBLIC_API_PATHS:
if not _has_valid_session_token(request):
@ -4524,6 +4544,13 @@ def _mount_plugin_api_routes():
# Mount plugin API routes before the SPA catch-all.
_mount_plugin_api_routes()
# Mount the dashboard auth routes (/login, /auth/*, /api/auth/*) before the
# SPA catch-all so /{full_path:path} doesn't swallow them. These are
# always mounted — the gate middleware decides whether to enforce auth,
# not whether the routes exist.
from hermes_cli.dashboard_auth.routes import router as _dashboard_auth_router # noqa: E402
app.include_router(_dashboard_auth_router)
mount_spa(app)

View file

@ -0,0 +1,277 @@
"""End-to-end behavioural tests for the dashboard auth gate.
Uses ``StubAuthProvider`` so the OAuth round trip can complete in-process
without any external IDP. Exercises:
* `/api/status` flips from public (loopback) to gated (auth_required)
* `/` redirects to /login when no cookie present
* `/api/auth/providers` is the public bootstrap endpoint
* `/login` renders HTML listing all providers
* /assets/* still passes through unauthenticated
* Full /auth/login /auth/callback / round trip with the stub
* Invalid / missing cookies return 401 (api) or 302 (html)
* Zero-providers + gate-on fails closed
"""
from __future__ import annotations
import pytest
from fastapi.testclient import TestClient
from hermes_cli import web_server
from hermes_cli.dashboard_auth import clear_providers, register_provider
from hermes_cli.dashboard_auth.cookies import SESSION_AT_COOKIE
from tests.hermes_cli.conftest_dashboard_auth import StubAuthProvider
@pytest.fixture
def gated_app():
"""Configure web_server.app for gated mode + register the stub provider."""
clear_providers()
register_provider(StubAuthProvider())
prev_host = getattr(web_server.app.state, "bound_host", None)
prev_port = getattr(web_server.app.state, "bound_port", None)
prev_required = getattr(web_server.app.state, "auth_required", None)
web_server.app.state.bound_host = "fly-app.fly.dev"
web_server.app.state.bound_port = 443
web_server.app.state.auth_required = True
# Use https base_url so cookies pick up Secure flag and host_header
# matches the bound interface.
client = TestClient(web_server.app, base_url="https://fly-app.fly.dev")
yield client
clear_providers()
web_server.app.state.bound_host = prev_host
web_server.app.state.bound_port = prev_port
web_server.app.state.auth_required = prev_required
# ---------------------------------------------------------------------------
# Allowlist (public) routes
# ---------------------------------------------------------------------------
def test_gated_status_now_requires_auth(gated_app):
"""When gate is on, /api/status is NOT public — login bootstrap uses /api/auth/providers."""
r = gated_app.get("/api/status")
assert r.status_code == 401
def test_gated_html_redirects_to_login(gated_app):
r = gated_app.get("/", follow_redirects=False)
assert r.status_code == 302
assert r.headers["location"] == "/login"
def test_gated_auth_providers_is_public(gated_app):
r = gated_app.get("/api/auth/providers")
assert r.status_code == 200
body = r.json()
assert any(p["name"] == "stub" for p in body["providers"])
assert body["providers"][0]["display_name"] == "Stub IdP (test only)"
def test_gated_login_html_is_public_and_lists_providers(gated_app):
r = gated_app.get("/login")
assert r.status_code == 200
assert r.headers["content-type"].startswith("text/html")
assert "Stub IdP" in r.text
assert 'href="/auth/login?provider=stub"' in r.text
def test_gated_static_asset_path_is_public(gated_app):
"""``/assets/*`` is allowlisted so the SPA's CSS/JS loads pre-login."""
r = gated_app.get("/assets/_nonexistent.css")
# 404 not 401 — proves middleware let the request through to the
# static-files mount, which then 404'd because the file isn't there.
assert r.status_code == 404
# ---------------------------------------------------------------------------
# OAuth round trip
# ---------------------------------------------------------------------------
def test_full_login_round_trip_unlocks_api_status(gated_app):
# 1) Click "Sign in with Stub IdP" — /auth/login redirects to the stub
# with a PKCE cookie on the response.
r1 = gated_app.get("/auth/login?provider=stub", follow_redirects=False)
assert r1.status_code == 302
pkce = next(
(c for c in r1.headers.get_list("set-cookie")
if c.startswith("hermes_session_pkce=")),
None,
)
assert pkce and "HttpOnly" in pkce
redirect = r1.headers["location"]
# Stub bounces back to {redirect_uri}?code=stub_code&state=<s>
assert "code=stub_code" in redirect
assert "state=" in redirect
state = redirect.split("state=")[1]
# 2) The browser would now follow the redirect to /auth/callback.
# TestClient automatically carries the PKCE cookie forward.
r2 = gated_app.get(
f"/auth/callback?code=stub_code&state={state}",
follow_redirects=False,
)
assert r2.status_code == 302
assert r2.headers["location"] == "/"
set_cookies = r2.headers.get_list("set-cookie")
assert any(c.startswith("hermes_session_at=") for c in set_cookies)
assert any(c.startswith("hermes_session_rt=") for c in set_cookies)
# 3) /api/status now succeeds because we're authenticated.
r3 = gated_app.get("/api/status")
assert r3.status_code == 200
body = r3.json()
assert "version" in body
def test_login_unknown_provider_returns_404(gated_app):
r = gated_app.get("/auth/login?provider=nonexistent", follow_redirects=False)
assert r.status_code == 404
def test_callback_without_pkce_cookie_returns_400(gated_app):
# No prior /auth/login → no PKCE cookie.
r = gated_app.get(
"/auth/callback?code=stub_code&state=anything",
follow_redirects=False,
)
assert r.status_code == 400
def test_callback_state_mismatch_returns_400(gated_app):
# Walk through /auth/login first to plant the PKCE cookie.
r1 = gated_app.get("/auth/login?provider=stub", follow_redirects=False)
# ...then pretend the IDP returned a different state.
r2 = gated_app.get(
"/auth/callback?code=stub_code&state=WRONG",
follow_redirects=False,
)
assert r2.status_code == 400
def test_callback_invalid_code_returns_400(gated_app):
r1 = gated_app.get("/auth/login?provider=stub", follow_redirects=False)
state = r1.headers["location"].split("state=")[1]
r2 = gated_app.get(
f"/auth/callback?code=BAD_CODE&state={state}",
follow_redirects=False,
)
assert r2.status_code == 400
# ---------------------------------------------------------------------------
# Cookie validation
# ---------------------------------------------------------------------------
def test_invalid_cookie_returns_401_on_api(gated_app):
gated_app.cookies.set(SESSION_AT_COOKIE, "garbage-not-a-real-token")
r = gated_app.get("/api/sessions")
assert r.status_code == 401
def test_invalid_cookie_redirects_on_html(gated_app):
gated_app.cookies.set(SESSION_AT_COOKIE, "garbage")
r = gated_app.get("/", follow_redirects=False)
assert r.status_code == 302
assert r.headers["location"] == "/login"
def test_logout_clears_cookies_and_redirects_to_login(gated_app):
# First log in.
r1 = gated_app.get("/auth/login?provider=stub", follow_redirects=False)
state = r1.headers["location"].split("state=")[1]
gated_app.get(
f"/auth/callback?code=stub_code&state={state}",
follow_redirects=False,
)
# Now log out.
r = gated_app.post("/auth/logout", follow_redirects=False)
assert r.status_code == 302
assert r.headers["location"] == "/login"
set_cookies = r.headers.get_list("set-cookie")
assert any(
c.startswith("hermes_session_at=") and "Max-Age=0" in c
for c in set_cookies
)
assert any(
c.startswith("hermes_session_rt=") and "Max-Age=0" in c
for c in set_cookies
)
# ---------------------------------------------------------------------------
# Identity probe
# ---------------------------------------------------------------------------
def test_api_auth_me_returns_session_after_login(gated_app):
r1 = gated_app.get("/auth/login?provider=stub", follow_redirects=False)
state = r1.headers["location"].split("state=")[1]
gated_app.get(
f"/auth/callback?code=stub_code&state={state}",
follow_redirects=False,
)
r = gated_app.get("/api/auth/me")
assert r.status_code == 200
body = r.json()
assert body["user_id"] == "stub-user-1"
assert body["email"] == "stub@example.test"
assert body["display_name"] == "Stub User"
assert body["provider"] == "stub"
assert body["org_id"] == "stub-org-1"
assert "expires_at" in body
def test_api_auth_me_requires_auth(gated_app):
# No cookies.
r = gated_app.get("/api/auth/me")
assert r.status_code == 401
# ---------------------------------------------------------------------------
# Zero-providers fail-closed
# ---------------------------------------------------------------------------
def test_gated_zero_providers_fails_closed_on_api_auth_providers():
"""If gate is on but no providers are registered, /api/auth/providers 503s."""
clear_providers()
prev_required = getattr(web_server.app.state, "auth_required", None)
prev_host = getattr(web_server.app.state, "bound_host", None)
web_server.app.state.bound_host = "fly-app.fly.dev"
web_server.app.state.auth_required = True
try:
client = TestClient(web_server.app, base_url="https://fly-app.fly.dev")
r = client.get("/api/auth/providers")
assert r.status_code == 503
assert "no auth providers" in r.text.lower()
finally:
web_server.app.state.auth_required = prev_required
web_server.app.state.bound_host = prev_host
def test_gated_zero_providers_login_page_renders_help_text():
clear_providers()
prev_required = getattr(web_server.app.state, "auth_required", None)
prev_host = getattr(web_server.app.state, "bound_host", None)
web_server.app.state.bound_host = "fly-app.fly.dev"
web_server.app.state.auth_required = True
try:
client = TestClient(web_server.app, base_url="https://fly-app.fly.dev")
r = client.get("/login")
assert r.status_code == 200
# Empty-provider HTML mentions the fix-up path. (HTML wraps text
# so we can't grep for the exact phrase; check for the canonical
# fragments instead.)
text = r.text.lower()
assert "sign-in unavailable" in text
assert "no authentication" in text
assert "providers are installed" in text
assert "--insecure" in text
finally:
web_server.app.state.auth_required = prev_required
web_server.app.state.bound_host = prev_host