diff --git a/hermes_cli/dashboard_auth/login_page.py b/hermes_cli/dashboard_auth/login_page.py index 7cb2d69eb9c..2baab0fdcc6 100644 --- a/hermes_cli/dashboard_auth/login_page.py +++ b/hermes_cli/dashboard_auth/login_page.py @@ -88,17 +88,35 @@ auth gate (not recommended on untrusted networks).

""" -def render_login_html() -> str: - """Return the full HTML for ``GET /login``.""" +def render_login_html(*, next_path: str = "") -> str: + """Return the full HTML for ``GET /login``. + + ``next_path`` — when set, the post-login landing path the user + originally requested. Threaded into each provider button's ``href`` + as a ``next=`` query parameter so the OAuth round trip carries it + end-to-end. The caller (``routes.login_page``) is responsible for + validating ``next_path`` against the same-origin rules before we + emit it; we still HTML-escape it as defence in depth. + """ providers = list_providers() if not providers: return _EMPTY_HTML + if next_path: + # URL-encode then HTML-escape. The URL-encode step matches the + # gate's ``_safe_next_target`` output shape (also URL-encoded), + # so a value that round-tripped from /login?next=... back into + # the button href is byte-identical. + from urllib.parse import quote + next_qs = f"&next={html.escape(quote(next_path, safe=''), quote=True)}" + else: + next_qs = "" + buttons = [] for p in providers: buttons.append( f' ' + f'href="/auth/login?provider={html.escape(p.name, quote=True)}{next_qs}">' f'Sign in with {html.escape(p.display_name)}' ) return _LOGIN_HTML_TEMPLATE.format(provider_buttons="\n".join(buttons)) diff --git a/hermes_cli/dashboard_auth/routes.py b/hermes_cli/dashboard_auth/routes.py index 7a72371a321..2e8aa067f61 100644 --- a/hermes_cli/dashboard_auth/routes.py +++ b/hermes_cli/dashboard_auth/routes.py @@ -71,8 +71,15 @@ def _client_ip(request: Request) -> str: @router.get("/login", name="login_page") async def login_page(request: Request) -> HTMLResponse: + # Read the ``next=`` query the gate's ``_unauth_response`` set on + # the redirect URL. Validate against the same same-origin rules the + # callback applies (defence in depth — the gate already filters, + # but /login is reachable directly too). + next_path = _validate_post_login_target( + request.query_params.get("next", "") + ) return HTMLResponse( - render_login_html(), + render_login_html(next_path=next_path), headers={"Cache-Control": "no-store, no-cache, must-revalidate"}, ) @@ -105,7 +112,7 @@ async def api_auth_providers() -> Any: @router.get("/auth/login", name="auth_login") -async def auth_login(request: Request, provider: str): +async def auth_login(request: Request, provider: str, next: str = ""): p = get_provider(provider) if p is None: raise HTTPException( @@ -140,6 +147,16 @@ async def auth_login(request: Request, provider: str): pkce = ls.cookie_payload.get("hermes_session_pkce", "") if "provider=" not in pkce: pkce = f"provider={provider};{pkce}" if pkce else f"provider={provider}" + # Carry ``next=`` through the round trip in the PKCE cookie. Real + # IDPs only echo back ``code`` + ``state`` on the callback URL, so + # query-string transport would lose the value — the cookie is the + # only server-controlled channel that survives. Validate before we + # store it so an attacker who reaches /auth/login directly with + # ``next=//evil.example`` can't poison the cookie. + safe_next = _validate_post_login_target(next) + if safe_next: + from urllib.parse import quote + pkce = f"{pkce};next={quote(safe_next, safe='')}" set_pkce_cookie(resp, payload=pkce, use_https=detect_https(request)) return resp @@ -151,7 +168,6 @@ async def auth_callback( state: str = "", error: str = "", error_description: str = "", - next: str = "", ): pkce_raw = read_pkce_cookie(request) if not pkce_raw: @@ -165,13 +181,21 @@ async def auth_callback( detail="Missing PKCE state cookie", ) - # Parse ``provider=...;state=...;verifier=...`` + # Parse ``provider=...;state=...;verifier=...;next=...`` — the + # ``next`` segment is optional (only present when /auth/login was + # given a next= query). All keys live in the same flat namespace; + # ``next`` carries a URL-encoded path so it never contains ``;``. parts = dict( seg.split("=", 1) for seg in pkce_raw.split(";") if "=" in seg ) provider_name = parts.get("provider", "") expected_state = parts.get("state", "") verifier = parts.get("verifier", "") + # Read next= from the cookie ONLY. The IDP doesn't echo next= back + # on the callback URL (it only carries ``code`` + ``state``), so any + # next= query parameter on the callback URL is attacker-controlled + # and MUST be ignored. + next_from_cookie = parts.get("next", "") p = get_provider(provider_name) if p is None: @@ -242,11 +266,13 @@ async def auth_callback( ) expires_in = max(60, session.expires_at - int(time.time())) - # Honour the ``next=`` query param the gate's _unauth_response set in - # the redirect URL. Validated against the same same-origin rules as - # the gate's _safe_next_target — any absolute URL / protocol-relative - # path / loop back to /login is dropped in favour of ``/``. - landing = _validate_post_login_target(next) or "/" + # Honour the ``next=`` value the gate's _unauth_response set in the + # /login redirect URL and that /auth/login persisted into the PKCE + # cookie. We re-validate against the same-origin rules here — the + # cookie is server-set so this is defence in depth, but a regression + # that lets attacker-controlled bytes into the cookie would otherwise + # produce an open redirect. + landing = _validate_post_login_target(next_from_cookie) or "/" resp = RedirectResponse(url=landing, status_code=302) set_session_cookies( resp, diff --git a/tests/hermes_cli/test_dashboard_auth_401_reauth.py b/tests/hermes_cli/test_dashboard_auth_401_reauth.py index db31edf1b2e..7cdb57efa1a 100644 --- a/tests/hermes_cli/test_dashboard_auth_401_reauth.py +++ b/tests/hermes_cli/test_dashboard_auth_401_reauth.py @@ -255,46 +255,229 @@ class TestNextSameOriginValidation: class TestAuthCallbackNext: - def _drive_oauth(self, gated_app, *, next_path: str = ""): - next_qs = f"&next={quote(next_path, safe='')}" if next_path else "" - r1 = gated_app.get( - f"/auth/login?provider=stub{next_qs}", follow_redirects=False - ) - state = r1.headers["location"].split("state=")[1] - # next is preserved by the route (it's in the original URL — but the - # stub IDP returns to /auth/callback. We need to pass next as a - # separate query param on the callback URL to simulate what a real - # IDP would do via state-bound storage. For this test, the - # /auth/callback handler reads `next` directly from its own query - # string, so just append it. + """End-to-end next= propagation through a full OAuth round trip. + + These tests drive the real flow exactly as the gate produces it: + + 1. unauth GET /sessions → 302 /login?next=%2Fsessions + 2. GET /login?next=%2Fsessions → HTML with provider buttons that + carry next=%2Fsessions in their hrefs + 3. GET /auth/login?provider=stub&next=%2Fsessions → 302 to IDP + + PKCE cookie carrying provider/state/verifier/next + 4. IDP returns to /auth/callback?code=...&state=... (NO next on + the callback URL — real IDPs only echo back code+state) + 5. /auth/callback reads next from the PKCE cookie, validates it, + and redirects there. + + Discrimination: each test drives the flow without smuggling + ``next=`` onto the callback URL. Under the pre-fix code paths + (/login ignored next=, /auth/login dropped it, /auth/callback read + it from the wrong place), the callback always lands on ``/``. Only + PKCE-cookie carriage produces the correct landing. + """ + + def _drive_oauth_via_login( + self, gated_app, *, next_path: str = "", + expect_next_in_button: bool = True, + ): + """Walk /login → /auth/login → IDP-bounce → /auth/callback like + a real browser. ``next_path`` is the path the gate would have + encoded for the user; nothing about the callback URL is + smuggled. ``expect_next_in_button`` controls whether the + rendered /login page is expected to thread next= into the + provider button — False for cases where the same-origin + validator drops the value (e.g. //evil.com, /login).""" + login_path = "/login" + if next_path: + login_path = f"/login?next={quote(next_path, safe='')}" + r_login = gated_app.get(login_path, follow_redirects=False) + assert r_login.status_code == 200 + # Click the stub provider button. Real browsers parse the HTML; + # we extract the href the page emitted, so a regression that + # forgets to thread next= through the button will surface here. + body = r_login.text + # Each provider button is emitted as an line. + marker = 'href="' + i = body.find('class="provider-btn"') + assert i != -1, "no provider button in /login HTML" + h = body.find(marker, i) + len(marker) + j = body.find('"', h) + href = body[h:j] + # Critical: the href must carry next= when /login was given + # next= AND the validator accepted it. (This is the property the + # pre-fix render_login_html didn't satisfy.) For rejected + # next= values, the validator drops them at the /login boundary + # and the button href must NOT carry the rogue value. + if next_path and expect_next_in_button: + assert "next=" in href, ( + f"login button dropped next= (href={href!r})" + ) + if next_path and not expect_next_in_button: + assert "next=" not in href, ( + f"login button leaked rejected next= " + f"(next_path={next_path!r}, href={href!r})" + ) + + r_to_idp = gated_app.get(href, follow_redirects=False) + assert r_to_idp.status_code == 302 + # Stub IDP "returns" code+state on the callback URL — same shape + # as a real IDP. Critical: we do NOT append next= here. + state = r_to_idp.headers["location"].split("state=")[1] return gated_app.get( - f"/auth/callback?code=stub_code&state={state}{next_qs}", + f"/auth/callback?code=stub_code&state={state}", follow_redirects=False, ) def test_callback_without_next_lands_at_root(self, gated_app): - r = self._drive_oauth(gated_app) + r = self._drive_oauth_via_login(gated_app) assert r.status_code == 302 assert r.headers["location"] == "/" def test_callback_with_safe_next_lands_there(self, gated_app): - r = self._drive_oauth(gated_app, next_path="/sessions") + r = self._drive_oauth_via_login(gated_app, next_path="/sessions") assert r.status_code == 302 assert r.headers["location"] == "/sessions" def test_callback_with_query_string_in_next(self, gated_app): - r = self._drive_oauth(gated_app, next_path="/sessions?page=2") + r = self._drive_oauth_via_login( + gated_app, next_path="/sessions?page=2" + ) assert r.status_code == 302 assert r.headers["location"] == "/sessions?page=2" def test_callback_rejects_open_redirect(self, gated_app): - # Attacker provides ``next=//evil.com`` hoping for an open redirect - # after successful auth. Validator drops it; user lands at "/". - r = self._drive_oauth(gated_app, next_path="//evil.com/steal") + # Attacker tries to inject ``next=//evil.com`` at the /login + # boundary, hoping it survives to the callback redirect. The + # /login validator drops it before it reaches the button href + # (and therefore the cookie), so the callback never sees it and + # the user lands at "/". + r = self._drive_oauth_via_login( + gated_app, next_path="//evil.com/steal", + expect_next_in_button=False, + ) assert r.status_code == 302 assert r.headers["location"] == "/" def test_callback_rejects_login_loop(self, gated_app): - r = self._drive_oauth(gated_app, next_path="/login") + r = self._drive_oauth_via_login( + gated_app, next_path="/login", + expect_next_in_button=False, + ) assert r.status_code == 302 assert r.headers["location"] == "/" + + def test_attacker_callback_next_param_is_ignored(self, gated_app): + """Hardening: even if an attacker crafts a callback URL with a + rogue ``next=`` query parameter, the server reads from the PKCE + cookie (server-set) and ignores the URL value. This pins the + fix against a regression that re-introduces the URL read.""" + # Drive a clean login with no next=. + r_login = gated_app.get("/login", follow_redirects=False) + assert r_login.status_code == 200 + r_to_idp = gated_app.get( + "/auth/login?provider=stub", follow_redirects=False + ) + state = r_to_idp.headers["location"].split("state=")[1] + # Attacker appends next=/internal-admin to the callback URL. + r = gated_app.get( + f"/auth/callback?code=stub_code&state={state}" + f"&next={quote('/internal-admin', safe='')}", + follow_redirects=False, + ) + assert r.status_code == 302 + # No next= was in the PKCE cookie, so landing must be "/" — + # NOT /internal-admin. + assert r.headers["location"] == "/" + + +# --------------------------------------------------------------------------- +# Unit-level coverage: render_login_html threads next= into provider buttons +# --------------------------------------------------------------------------- + + +class TestRenderLoginHtmlNext: + """Cover ``render_login_html`` directly so a regression that drops + the ``next_path`` parameter is caught at the function boundary, not + only via the full integration walk.""" + + def setup_method(self): + clear_providers() + register_provider(StubAuthProvider()) + + def teardown_method(self): + clear_providers() + + def test_no_next_emits_plain_button(self): + from hermes_cli.dashboard_auth.login_page import render_login_html + html_out = render_login_html() + assert 'href="/auth/login?provider=stub"' in html_out + assert "next=" not in html_out + + def test_next_threaded_url_encoded(self): + from hermes_cli.dashboard_auth.login_page import render_login_html + html_out = render_login_html(next_path="/sessions?page=2") + # next= is URL-encoded — quote(safe='') turns "/" into "%2F", + # "?" into "%3F", "=" into "%3D". The encoded value never + # contains an "&" so the raw "&" separator in the href is + # unambiguous. + assert "next=%2Fsessions%3Fpage%3D2" in html_out + assert "provider=stub&next=" in html_out + + def test_next_with_html_metacharacters_is_escaped(self): + """Defence in depth: even though the caller validates next_path, + we still HTML-escape the rendered value so a regression in the + caller can't trivially produce an HTML-injection sink.""" + from hermes_cli.dashboard_auth.login_page import render_login_html + # `"` in a path is already URL-encoded by quote() to %22, so it + # never reaches the HTML escaper as a raw quote. This test pins + # both layers: quote() does its job AND escape() does its. + html_out = render_login_html(next_path='/x"injected') + assert '"injected' not in html_out + assert "%22injected" in html_out + + +# --------------------------------------------------------------------------- +# Unit-level coverage: /auth/login persists next= into the PKCE cookie +# --------------------------------------------------------------------------- + + +class TestAuthLoginPkceCookieNext: + """Cover the ``/auth/login`` route's PKCE cookie payload directly. + + The cookie is the round-trip carrier for ``next=``; if /auth/login + forgets to encode it, the callback has no path to honour even when + everything else is wired correctly. + """ + + def test_no_next_query_omits_next_segment(self, gated_app): + r = gated_app.get( + "/auth/login?provider=stub", follow_redirects=False + ) + assert r.status_code == 302 + cookies = r.headers.get_list("set-cookie") + pkce = next(c for c in cookies if c.startswith("hermes_session_pkce=")) + assert "next=" not in pkce + + def test_safe_next_query_encoded_into_cookie(self, gated_app): + r = gated_app.get( + f"/auth/login?provider=stub&next={quote('/sessions', safe='')}", + follow_redirects=False, + ) + cookies = r.headers.get_list("set-cookie") + pkce = next(c for c in cookies if c.startswith("hermes_session_pkce=")) + # ``next=`` segment present, URL-encoded. + assert "next=%2Fsessions" in pkce + + def test_unsafe_next_query_dropped_from_cookie(self, gated_app): + """The validator at /auth/login refuses //evil.com BEFORE + storing it. Defence in depth: even if a regression leaks next= + through /login's button rendering, /auth/login is the second + boundary.""" + r = gated_app.get( + f"/auth/login?provider=stub&next={quote('//evil.com/x', safe='')}", + follow_redirects=False, + ) + cookies = r.headers.get_list("set-cookie") + pkce = next(c for c in cookies if c.startswith("hermes_session_pkce=")) + assert "next=" not in pkce