diff --git a/hermes_cli/auth.py b/hermes_cli/auth.py index 6752b65829f..8b154db7468 100644 --- a/hermes_cli/auth.py +++ b/hermes_cli/auth.py @@ -5312,6 +5312,107 @@ def _xai_oauth_build_authorize_url( return f"{authorization_endpoint}?{urlencode(authorize_params)}" +def _xai_oauth_exchange_code_for_tokens( + *, + token_endpoint: str, + code: str, + redirect_uri: str, + code_verifier: str, + code_challenge: str, + timeout_seconds: float = 20.0, +) -> Dict[str, Any]: + """POST the authorization code to xAI's token endpoint and return + the parsed JSON payload. + + Sends ``code_verifier`` as required by RFC 7636 §4.5. Also echoes + ``code_challenge`` + ``code_challenge_method`` in the request body + as a defense-in-depth measure for OAuth servers (xAI's among them, + per #26990) that re-validate the challenge at the token step + instead of relying solely on server-side session state captured + during the authorize step. Echoing the challenge is harmless for + strict RFC-compliant servers — RFC 7636 doesn't forbid additional + parameters at the token endpoint — and decisively fixes the + ``code_challenge is required`` failure mode users hit on the + loopback flow. + + Raises :class:`AuthError` on any non-2xx response or transport + failure; the error message embeds the HTTP status code and the + full response body so users can disambiguate cause at a glance. + """ + # Paranoia: if upstream call sites ever drop ``code_verifier`` we + # want to surface a precise, local error rather than send a + # missing-PKCE request to xAI and receive their generic "code + # challenge required" message back. + if not code_verifier: + raise AuthError( + "xAI token exchange refused locally: PKCE code_verifier is empty. " + "This is a bug in Hermes — please report at " + "https://github.com/NousResearch/hermes-agent/issues/26990.", + provider="xai-oauth", + code="xai_pkce_verifier_missing", + ) + + data = { + "grant_type": "authorization_code", + "code": code, + "redirect_uri": redirect_uri, + "client_id": XAI_OAUTH_CLIENT_ID, + "code_verifier": code_verifier, + } + # Defense-in-depth: include the original ``code_challenge`` and + # ``code_challenge_method``. Some OAuth servers (including xAI's + # auth.x.ai implementation, per the symptom reported in #26990) + # validate these at the token endpoint instead of relying purely on + # state captured during the authorize step — without them, xAI + # rejects the exchange with ``code_challenge is required`` even + # though we sent a valid ``code_verifier``. + if code_challenge: + data["code_challenge"] = code_challenge + data["code_challenge_method"] = "S256" + + try: + response = httpx.post( + token_endpoint, + headers={ + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", + }, + data=data, + timeout=max(20.0, timeout_seconds), + ) + except Exception as exc: + raise AuthError( + f"xAI token exchange failed: {exc}", + provider="xai-oauth", + code="xai_token_exchange_failed", + ) from exc + + if response.status_code != 200: + body = response.text.strip() + raise AuthError( + f"xAI token exchange failed (HTTP {response.status_code})." + + (f" Response: {body}" if body else ""), + provider="xai-oauth", + code="xai_token_exchange_failed", + ) + + try: + payload = response.json() + except Exception as exc: + raise AuthError( + f"xAI token exchange returned invalid JSON: {exc}", + provider="xai-oauth", + code="xai_token_exchange_invalid", + ) from exc + if not isinstance(payload, dict): + raise AuthError( + "xAI token exchange response was not a JSON object.", + provider="xai-oauth", + code="xai_token_exchange_invalid", + ) + return payload + + def _xai_oauth_loopback_login( *, timeout_seconds: float = 20.0, @@ -5392,47 +5493,14 @@ def _xai_oauth_loopback_login( code="xai_code_missing", ) - try: - response = httpx.post( - token_endpoint, - headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"}, - data={ - "grant_type": "authorization_code", - "code": code, - "redirect_uri": redirect_uri, - "client_id": XAI_OAUTH_CLIENT_ID, - "code_verifier": code_verifier, - }, - timeout=max(20.0, timeout_seconds), - ) - except Exception as exc: - raise AuthError( - f"xAI token exchange failed: {exc}", - provider="xai-oauth", - code="xai_token_exchange_failed", - ) from exc - if response.status_code != 200: - detail = response.text.strip() - raise AuthError( - "xAI token exchange failed." - + (f" Response: {detail}" if detail else ""), - provider="xai-oauth", - code="xai_token_exchange_failed", - ) - try: - payload = response.json() - except Exception as exc: - raise AuthError( - f"xAI token exchange returned invalid JSON: {exc}", - provider="xai-oauth", - code="xai_token_exchange_invalid", - ) from exc - if not isinstance(payload, dict): - raise AuthError( - "xAI token exchange response was not a JSON object.", - provider="xai-oauth", - code="xai_token_exchange_invalid", - ) + payload = _xai_oauth_exchange_code_for_tokens( + token_endpoint=token_endpoint, + code=code, + redirect_uri=redirect_uri, + code_verifier=code_verifier, + code_challenge=code_challenge, + timeout_seconds=timeout_seconds, + ) access_token = str(payload.get("access_token", "") or "").strip() refresh_token = str(payload.get("refresh_token", "") or "").strip() if not access_token: