fix(xai-oauth): echo code_challenge in token POST so PKCE exchange succeeds

xAI's OAuth implementation at ``auth.x.ai`` validates the PKCE
``code_challenge`` at the **token** endpoint, not just at the
authorize step.  When Hermes sends the standards-compliant token
POST with ``code_verifier`` alone — exactly what RFC 7636 §4.5
prescribes — xAI rejects the exchange with ``code_challenge is
required`` and the user is stuck with no working OAuth login.

The fix:

* Extract the token POST into ``_xai_oauth_exchange_code_for_tokens``
  so the wire format is unit-testable in isolation.
* Send the original ``code_challenge`` and ``code_challenge_method``
  in the form body alongside ``code_verifier``.  Strict RFC-compliant
  servers ignore the extras at the token endpoint, and xAI's
  permissive implementation accepts the exchange.  This is the
  standard "defensive echo" workaround used by every OAuth client
  that targets a server with this quirk.
* Refuse to fire the POST when ``code_verifier`` is empty — leaking
  the authorization code to a server that can't redeem it is worse
  than failing locally with an actionable error.  The new error
  code is ``xai_pkce_verifier_missing`` and the message points at
  this issue for context.
* Surface the HTTP status code prominently in the 4xx error message
  (``xAI token exchange failed (HTTP 400). Response: …``) so users
  and maintainers can tell a 400 (bad request / PKCE problem) from
  a 403 (tier denied, see #26847) at a glance instead of parsing
  the JSON body by eye.

Closes #26990
This commit is contained in:
xxxigm 2026-05-16 23:11:21 +07:00 committed by Teknium
parent bc7c608d54
commit cb53c40e45

View file

@ -5312,6 +5312,107 @@ def _xai_oauth_build_authorize_url(
return f"{authorization_endpoint}?{urlencode(authorize_params)}"
def _xai_oauth_exchange_code_for_tokens(
*,
token_endpoint: str,
code: str,
redirect_uri: str,
code_verifier: str,
code_challenge: str,
timeout_seconds: float = 20.0,
) -> Dict[str, Any]:
"""POST the authorization code to xAI's token endpoint and return
the parsed JSON payload.
Sends ``code_verifier`` as required by RFC 7636 §4.5. Also echoes
``code_challenge`` + ``code_challenge_method`` in the request body
as a defense-in-depth measure for OAuth servers (xAI's among them,
per #26990) that re-validate the challenge at the token step
instead of relying solely on server-side session state captured
during the authorize step. Echoing the challenge is harmless for
strict RFC-compliant servers RFC 7636 doesn't forbid additional
parameters at the token endpoint and decisively fixes the
``code_challenge is required`` failure mode users hit on the
loopback flow.
Raises :class:`AuthError` on any non-2xx response or transport
failure; the error message embeds the HTTP status code and the
full response body so users can disambiguate cause at a glance.
"""
# Paranoia: if upstream call sites ever drop ``code_verifier`` we
# want to surface a precise, local error rather than send a
# missing-PKCE request to xAI and receive their generic "code
# challenge required" message back.
if not code_verifier:
raise AuthError(
"xAI token exchange refused locally: PKCE code_verifier is empty. "
"This is a bug in Hermes — please report at "
"https://github.com/NousResearch/hermes-agent/issues/26990.",
provider="xai-oauth",
code="xai_pkce_verifier_missing",
)
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": XAI_OAUTH_CLIENT_ID,
"code_verifier": code_verifier,
}
# Defense-in-depth: include the original ``code_challenge`` and
# ``code_challenge_method``. Some OAuth servers (including xAI's
# auth.x.ai implementation, per the symptom reported in #26990)
# validate these at the token endpoint instead of relying purely on
# state captured during the authorize step — without them, xAI
# rejects the exchange with ``code_challenge is required`` even
# though we sent a valid ``code_verifier``.
if code_challenge:
data["code_challenge"] = code_challenge
data["code_challenge_method"] = "S256"
try:
response = httpx.post(
token_endpoint,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
data=data,
timeout=max(20.0, timeout_seconds),
)
except Exception as exc:
raise AuthError(
f"xAI token exchange failed: {exc}",
provider="xai-oauth",
code="xai_token_exchange_failed",
) from exc
if response.status_code != 200:
body = response.text.strip()
raise AuthError(
f"xAI token exchange failed (HTTP {response.status_code})."
+ (f" Response: {body}" if body else ""),
provider="xai-oauth",
code="xai_token_exchange_failed",
)
try:
payload = response.json()
except Exception as exc:
raise AuthError(
f"xAI token exchange returned invalid JSON: {exc}",
provider="xai-oauth",
code="xai_token_exchange_invalid",
) from exc
if not isinstance(payload, dict):
raise AuthError(
"xAI token exchange response was not a JSON object.",
provider="xai-oauth",
code="xai_token_exchange_invalid",
)
return payload
def _xai_oauth_loopback_login(
*,
timeout_seconds: float = 20.0,
@ -5392,47 +5493,14 @@ def _xai_oauth_loopback_login(
code="xai_code_missing",
)
try:
response = httpx.post(
token_endpoint,
headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": XAI_OAUTH_CLIENT_ID,
"code_verifier": code_verifier,
},
timeout=max(20.0, timeout_seconds),
)
except Exception as exc:
raise AuthError(
f"xAI token exchange failed: {exc}",
provider="xai-oauth",
code="xai_token_exchange_failed",
) from exc
if response.status_code != 200:
detail = response.text.strip()
raise AuthError(
"xAI token exchange failed."
+ (f" Response: {detail}" if detail else ""),
provider="xai-oauth",
code="xai_token_exchange_failed",
)
try:
payload = response.json()
except Exception as exc:
raise AuthError(
f"xAI token exchange returned invalid JSON: {exc}",
provider="xai-oauth",
code="xai_token_exchange_invalid",
) from exc
if not isinstance(payload, dict):
raise AuthError(
"xAI token exchange response was not a JSON object.",
provider="xai-oauth",
code="xai_token_exchange_invalid",
)
payload = _xai_oauth_exchange_code_for_tokens(
token_endpoint=token_endpoint,
code=code,
redirect_uri=redirect_uri,
code_verifier=code_verifier,
code_challenge=code_challenge,
timeout_seconds=timeout_seconds,
)
access_token = str(payload.get("access_token", "") or "").strip()
refresh_token = str(payload.get("refresh_token", "") or "").strip()
if not access_token: