feat(xai-oauth): add xAI Grok OAuth (SuperGrok Subscription) provider

Adds a new authentication provider that lets SuperGrok subscribers sign
in to Hermes with their xAI account via the standard OAuth 2.0 PKCE
loopback flow, instead of pasting a raw API key from console.x.ai.

Highlights
----------
* OAuth 2.0 PKCE loopback login against accounts.x.ai with discovery,
  state/nonce, and a strict CORS-origin allowlist on the callback.
* Authorize URL carries `plan=generic` (required for non-allowlisted
  loopback clients) and `referrer=hermes-agent` for best-effort
  attribution in xAI's OAuth server logs.
* Token storage in `auth.json` with file-locked atomic writes; JWT
  `exp`-based expiry detection with skew; refresh-token rotation
  synced both ways between the singleton store and the credential
  pool so multi-process / multi-profile setups don't tear each other's
  refresh tokens.
* Reactive 401 retry: on a 401 from the xAI Responses API, the agent
  refreshes the token, swaps it back into `self.api_key`, and retries
  the call once. Guarded against silent account swaps when the active
  key was sourced from a different (manual) pool entry.
* Auxiliary tasks (curator, vision, embeddings, etc.) route through a
  dedicated xAI Responses-mode auxiliary client instead of falling back
  to OpenRouter billing.
* Direct HTTP tools (`tools/xai_http.py`, transcription, TTS, image-gen
  plugin) resolve credentials through a unified runtime → singleton →
  env-var fallback chain so xai-oauth users get them for free.
* `hermes auth add xai-oauth` and `hermes auth remove xai-oauth N` are
  wired through the standard auth-commands surface; remove cleans up
  the singleton loopback_pkce entry so it doesn't silently reinstate.
* `hermes model` provider picker shows
  "xAI Grok OAuth (SuperGrok Subscription)" and the model-flow falls
  back to pool credentials when the singleton is missing.

Hardening
---------
* Discovery and refresh responses validate the returned
  `token_endpoint` host against the same `*.x.ai` allowlist as the
  authorization endpoint, blocking MITM persistence of a hostile
  endpoint.
* Discovery / refresh / token-exchange `response.json()` calls are
  wrapped to raise typed `AuthError` on malformed bodies (captive
  portals, proxy error pages) instead of leaking JSONDecodeError
  tracebacks.
* `prompt_cache_key` is routed through `extra_body` on the codex
  transport (sending it as a top-level kwarg trips xAI's SDK with a
  TypeError).
* Credential-pool sync-back preserves `active_provider` so refreshing
  an OAuth entry doesn't silently flip the active provider out from
  under the running agent.

Testing
-------
* New `tests/hermes_cli/test_auth_xai_oauth_provider.py` (~63 tests)
  covers JWT expiry, OAuth URL params (plan + referrer), CORS origins,
  redirect URI validation, singleton↔pool sync, concurrency races,
  refresh error paths, runtime resolution, and malformed-JSON guards.
* Extended `test_credential_pool.py`, `test_codex_transport.py`, and
  `test_run_agent_codex_responses.py` cover the pool sync-back,
  `extra_body` routing, and 401 reactive refresh paths.
* 165 tests passing on this branch via `scripts/run_tests.sh`.
This commit is contained in:
Jaaneek 2026-05-15 16:10:38 +01:00 committed by Teknium
parent 9fb40e6a3d
commit b62c997973
27 changed files with 3843 additions and 131 deletions

View file

@ -72,6 +72,7 @@ DEFAULT_AGENT_KEY_MIN_TTL_SECONDS = 30 * 60 # 30 minutes
ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 # refresh 2 min before expiry
DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS = 1 # poll at most every 1s
DEFAULT_CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
DEFAULT_XAI_OAUTH_BASE_URL = "https://api.x.ai/v1"
MINIMAX_OAUTH_CLIENT_ID = "78257093-7e40-4613-99e0-527b14b39113"
MINIMAX_OAUTH_SCOPE = "group_id profile model.completion"
MINIMAX_OAUTH_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:user_code"
@ -89,6 +90,14 @@ STEPFUN_STEP_PLAN_CN_BASE_URL = "https://api.stepfun.com/step_plan/v1"
CODEX_OAUTH_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
CODEX_OAUTH_TOKEN_URL = "https://auth.openai.com/oauth/token"
CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
XAI_OAUTH_ISSUER = "https://auth.x.ai"
XAI_OAUTH_DISCOVERY_URL = f"{XAI_OAUTH_ISSUER}/.well-known/openid-configuration"
XAI_OAUTH_CLIENT_ID = "b1a00492-073a-47ea-816f-4c329264a828"
XAI_OAUTH_SCOPE = "openid profile email offline_access grok-cli:access api:access"
XAI_OAUTH_REDIRECT_HOST = "127.0.0.1"
XAI_OAUTH_REDIRECT_PORT = 56121
XAI_OAUTH_REDIRECT_PATH = "/callback"
XAI_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
QWEN_OAUTH_CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56"
QWEN_OAUTH_TOKEN_URL = "https://chat.qwen.ai/api/v1/oauth2/token"
QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
@ -162,6 +171,12 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
auth_type="oauth_external",
inference_base_url=DEFAULT_CODEX_BASE_URL,
),
"xai-oauth": ProviderConfig(
id="xai-oauth",
name="xAI Grok OAuth (SuperGrok Subscription)",
auth_type="oauth_external",
inference_base_url=DEFAULT_XAI_OAUTH_BASE_URL,
),
"qwen-oauth": ProviderConfig(
id="qwen-oauth",
name="Qwen OAuth",
@ -1364,6 +1379,8 @@ def resolve_provider(
"glm": "zai", "z-ai": "zai", "z.ai": "zai", "zhipu": "zai",
"google": "gemini", "google-gemini": "gemini", "google-ai-studio": "gemini",
"x-ai": "xai", "x.ai": "xai", "grok": "xai",
"xai-oauth": "xai-oauth", "x-ai-oauth": "xai-oauth",
"grok-oauth": "xai-oauth", "xai-grok-oauth": "xai-oauth",
"kimi": "kimi-coding", "kimi-for-coding": "kimi-coding", "moonshot": "kimi-coding",
"kimi-cn": "kimi-coding-cn", "moonshot-cn": "kimi-coding-cn",
"step": "stepfun", "stepfun-coding-plan": "stepfun",
@ -1907,6 +1924,16 @@ def _spotify_code_challenge(code_verifier: str) -> str:
return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
def _oauth_pkce_code_verifier(length: int = 64) -> str:
raw = base64.urlsafe_b64encode(os.urandom(length)).decode("ascii")
return raw.rstrip("=")[:128]
def _oauth_pkce_code_challenge(code_verifier: str) -> str:
digest = hashlib.sha256(code_verifier.encode("utf-8")).digest()
return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
def _spotify_build_authorize_url(
*,
client_id: str,
@ -2029,6 +2056,158 @@ def _spotify_wait_for_callback(
)
def _xai_validate_loopback_redirect_uri(redirect_uri: str) -> tuple[str, int, str]:
parsed = urlparse(redirect_uri)
if parsed.scheme != "http":
raise AuthError(
"xAI OAuth redirect_uri must use http://127.0.0.1.",
provider="xai-oauth",
code="xai_redirect_invalid",
)
host = parsed.hostname or ""
if host != XAI_OAUTH_REDIRECT_HOST:
raise AuthError(
"xAI OAuth redirect_uri must point to 127.0.0.1.",
provider="xai-oauth",
code="xai_redirect_invalid",
)
if not parsed.port:
raise AuthError(
"xAI OAuth redirect_uri must include an explicit localhost port.",
provider="xai-oauth",
code="xai_redirect_invalid",
)
return host, parsed.port, parsed.path or "/"
def _xai_callback_cors_origin(origin: Optional[str]) -> str:
allowed = {
"https://accounts.x.ai",
"https://auth.x.ai",
"https://accounts.mouseion.dev",
"http://localhost:20000",
"http://127.0.0.1:20000",
}
return origin if origin in allowed else ""
def _make_xai_callback_handler(expected_path: str) -> tuple[type[BaseHTTPRequestHandler], dict[str, Any]]:
result: dict[str, Any] = {
"code": None,
"state": None,
"error": None,
"error_description": None,
}
class _XAICallbackHandler(BaseHTTPRequestHandler):
def _maybe_write_cors_headers(self) -> None:
origin = self.headers.get("Origin")
allow_origin = _xai_callback_cors_origin(origin)
if allow_origin:
self.send_header("Access-Control-Allow-Origin", allow_origin)
self.send_header("Access-Control-Allow-Methods", "GET, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.send_header("Access-Control-Allow-Private-Network", "true")
self.send_header("Vary", "Origin")
def do_OPTIONS(self) -> None: # noqa: N802
self.send_response(204)
self._maybe_write_cors_headers()
self.end_headers()
def do_GET(self) -> None: # noqa: N802
parsed = urlparse(self.path)
if parsed.path != expected_path:
self.send_response(404)
self.end_headers()
self.wfile.write(b"Not found.")
return
params = parse_qs(parsed.query)
result["code"] = params.get("code", [None])[0]
result["state"] = params.get("state", [None])[0]
result["error"] = params.get("error", [None])[0]
result["error_description"] = params.get("error_description", [None])[0]
self.send_response(200)
self._maybe_write_cors_headers()
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
if result["error"]:
body = "<html><body><h1>xAI authorization failed.</h1>You can close this tab.</body></html>"
else:
body = "<html><body><h1>xAI authorization received.</h1>You can close this tab.</body></html>"
self.wfile.write(body.encode("utf-8"))
def log_message(self, format: str, *args: Any) -> None: # noqa: A003
return
return _XAICallbackHandler, result
def _xai_start_callback_server(
preferred_port: int = XAI_OAUTH_REDIRECT_PORT,
) -> tuple[HTTPServer, threading.Thread, dict[str, Any], str]:
host = XAI_OAUTH_REDIRECT_HOST
expected_path = XAI_OAUTH_REDIRECT_PATH
handler_cls, result = _make_xai_callback_handler(expected_path)
class _ReuseHTTPServer(HTTPServer):
allow_reuse_address = True
ports_to_try = [preferred_port]
if preferred_port != 0:
ports_to_try.append(0)
server = None
last_error: Optional[OSError] = None
for port in ports_to_try:
try:
server = _ReuseHTTPServer((host, port), handler_cls)
break
except OSError as exc:
last_error = exc
if server is None:
raise AuthError(
f"Could not bind xAI callback server on {host}:{preferred_port}: {last_error}",
provider="xai-oauth",
code="xai_callback_bind_failed",
) from last_error
actual_port = int(server.server_address[1])
redirect_uri = f"http://{host}:{actual_port}{expected_path}"
thread = threading.Thread(
target=server.serve_forever,
kwargs={"poll_interval": 0.1},
daemon=True,
)
thread.start()
return server, thread, result, redirect_uri
def _xai_wait_for_callback(
server: HTTPServer,
thread: threading.Thread,
result: dict[str, Any],
*,
timeout_seconds: float = 180.0,
) -> dict[str, Any]:
deadline = time.monotonic() + max(5.0, timeout_seconds)
try:
while time.monotonic() < deadline:
if result["code"] or result["error"]:
return result
time.sleep(0.1)
finally:
server.shutdown()
server.server_close()
thread.join(timeout=1.0)
raise AuthError(
"xAI authorization timed out waiting for the local callback.",
provider="xai-oauth",
code="xai_callback_timeout",
)
def _spotify_token_payload_to_state(
token_payload: Dict[str, Any],
*,
@ -2680,6 +2859,348 @@ def resolve_codex_runtime_credentials(
}
# =============================================================================
# xAI Grok OAuth — tokens stored in ~/.hermes/auth.json
# =============================================================================
def _read_xai_oauth_tokens(*, _lock: bool = True) -> Dict[str, Any]:
if _lock:
with _auth_store_lock():
auth_store = _load_auth_store()
else:
auth_store = _load_auth_store()
state = _load_provider_state(auth_store, "xai-oauth")
if not state:
raise AuthError(
"No xAI OAuth credentials stored. Select xAI Grok OAuth (SuperGrok Subscription) in `hermes model`.",
provider="xai-oauth",
code="xai_auth_missing",
relogin_required=True,
)
tokens = state.get("tokens")
if not isinstance(tokens, dict):
raise AuthError(
"xAI OAuth state is missing tokens. Re-authenticate with `hermes model`.",
provider="xai-oauth",
code="xai_auth_invalid_shape",
relogin_required=True,
)
access_token = str(tokens.get("access_token", "") or "").strip()
refresh_token = str(tokens.get("refresh_token", "") or "").strip()
if not access_token:
raise AuthError(
"xAI OAuth state is missing access_token. Re-authenticate with `hermes model`.",
provider="xai-oauth",
code="xai_auth_missing_access_token",
relogin_required=True,
)
if not refresh_token:
raise AuthError(
"xAI OAuth state is missing refresh_token. Re-authenticate with `hermes model`.",
provider="xai-oauth",
code="xai_auth_missing_refresh_token",
relogin_required=True,
)
return {
"tokens": tokens,
"last_refresh": state.get("last_refresh"),
"discovery": state.get("discovery") or {},
"redirect_uri": state.get("redirect_uri"),
}
def _save_xai_oauth_tokens(
tokens: Dict[str, Any],
*,
discovery: Optional[Dict[str, Any]] = None,
redirect_uri: str = "",
last_refresh: Optional[str] = None,
) -> None:
if last_refresh is None:
last_refresh = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
with _auth_store_lock():
auth_store = _load_auth_store()
state = _load_provider_state(auth_store, "xai-oauth") or {}
state["tokens"] = tokens
state["last_refresh"] = last_refresh
state["auth_mode"] = "oauth_pkce"
if discovery:
state["discovery"] = discovery
if redirect_uri:
state["redirect_uri"] = redirect_uri
_save_provider_state(auth_store, "xai-oauth", state)
_save_auth_store(auth_store)
def _xai_access_token_is_expiring(access_token: str, skew_seconds: int = 0) -> bool:
if not isinstance(access_token, str) or "." not in access_token:
return False
try:
parts = access_token.split(".")
if len(parts) < 2:
return False
payload_b64 = parts[1]
payload_b64 += "=" * (-len(payload_b64) % 4)
payload = json.loads(base64.urlsafe_b64decode(payload_b64.encode("ascii")).decode("utf-8"))
exp = payload.get("exp")
if not isinstance(exp, (int, float)):
return False
return float(exp) <= (time.time() + max(0, int(skew_seconds)))
except Exception:
return False
def _xai_validate_oauth_endpoint(url: str, *, field: str) -> str:
"""Refuse any OIDC discovery endpoint that isn't HTTPS on the xAI origin.
The OIDC discovery response is a long-lived, low-frequency request whose
output is cached in ``~/.hermes/auth.json``. A single MITM during initial
login could substitute a malicious ``token_endpoint``; that URL would
then receive the refresh_token on every subsequent refresh a permanent
credential leak from a one-time MITM. Validating scheme + host pins the
cached endpoint to the xAI auth origin (or a future ``*.x.ai`` subdomain
if xAI migrates) so the cache poisoning loses its persistence guarantee.
RFC 8414 §2 requires the issuer to be ``https://`` and SHOULD-keeps the
token_endpoint on the same origin; we enforce both. ``x.ai`` is the
bare apex, so we accept either exact host match or any ``.x.ai`` suffix.
"""
parsed = urlparse(url)
if parsed.scheme != "https":
raise AuthError(
f"xAI OIDC discovery returned a non-HTTPS {field}: {url!r}.",
provider="xai-oauth",
code="xai_discovery_invalid",
)
host = (parsed.hostname or "").lower()
if not host:
raise AuthError(
f"xAI OIDC discovery {field} is missing a hostname: {url!r}.",
provider="xai-oauth",
code="xai_discovery_invalid",
)
if host != "x.ai" and not host.endswith(".x.ai"):
raise AuthError(
f"xAI OIDC discovery {field} host {host!r} is not on the xAI origin "
f"(expected x.ai or a *.x.ai subdomain). Refusing to use a cached "
f"endpoint that may have been substituted by a MITM during initial "
f"discovery; re-authenticate with `hermes model` to re-fetch.",
provider="xai-oauth",
code="xai_discovery_invalid",
)
return url
def _xai_oauth_discovery(timeout_seconds: float = 15.0) -> Dict[str, str]:
try:
response = httpx.get(
XAI_OAUTH_DISCOVERY_URL,
headers={"Accept": "application/json"},
timeout=timeout_seconds,
)
except Exception as exc:
raise AuthError(
f"xAI OIDC discovery failed: {exc}",
provider="xai-oauth",
code="xai_discovery_failed",
) from exc
if response.status_code != 200:
raise AuthError(
f"xAI OIDC discovery returned status {response.status_code}.",
provider="xai-oauth",
code="xai_discovery_failed",
)
try:
payload = response.json()
except Exception as exc:
raise AuthError(
f"xAI OIDC discovery returned invalid JSON: {exc}",
provider="xai-oauth",
code="xai_discovery_invalid_json",
) from exc
if not isinstance(payload, dict):
raise AuthError(
"xAI OIDC discovery response was not a JSON object.",
provider="xai-oauth",
code="xai_discovery_incomplete",
)
authorization_endpoint = str(payload.get("authorization_endpoint", "") or "").strip()
token_endpoint = str(payload.get("token_endpoint", "") or "").strip()
if not authorization_endpoint or not token_endpoint:
raise AuthError(
"xAI OIDC discovery response was missing required endpoints.",
provider="xai-oauth",
code="xai_discovery_incomplete",
)
_xai_validate_oauth_endpoint(authorization_endpoint, field="authorization_endpoint")
_xai_validate_oauth_endpoint(token_endpoint, field="token_endpoint")
return {
"authorization_endpoint": authorization_endpoint,
"token_endpoint": token_endpoint,
}
def refresh_xai_oauth_pure(
access_token: str,
refresh_token: str,
*,
token_endpoint: str = "",
timeout_seconds: float = 20.0,
) -> Dict[str, Any]:
del access_token
if not isinstance(refresh_token, str) or not refresh_token.strip():
raise AuthError(
"xAI OAuth is missing refresh_token. Re-authenticate with `hermes model`.",
provider="xai-oauth",
code="xai_auth_missing_refresh_token",
relogin_required=True,
)
endpoint = token_endpoint.strip() or _xai_oauth_discovery(timeout_seconds)["token_endpoint"]
# Re-validate cached endpoints on the refresh hot path: an auth.json
# written by an older Hermes (or hand-edited) may carry a non-xAI
# token_endpoint that would receive every future refresh_token in
# plaintext if we trusted it blindly. Cheap suffix check; fast-fail
# with a clear error so the user can re-run `hermes model` to refetch.
_xai_validate_oauth_endpoint(endpoint, field="token_endpoint")
timeout = httpx.Timeout(max(5.0, float(timeout_seconds)))
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}) as client:
response = client.post(
endpoint,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"grant_type": "refresh_token",
"client_id": XAI_OAUTH_CLIENT_ID,
"refresh_token": refresh_token,
},
)
if response.status_code != 200:
detail = response.text.strip()
raise AuthError(
"xAI token refresh failed."
+ (f" Response: {detail}" if detail else ""),
provider="xai-oauth",
code="xai_refresh_failed",
relogin_required=(response.status_code in {400, 401, 403}),
)
try:
payload = response.json()
except Exception as exc:
raise AuthError(
f"xAI token refresh returned invalid JSON: {exc}",
provider="xai-oauth",
code="xai_refresh_invalid_json",
) from exc
if not isinstance(payload, dict):
raise AuthError(
"xAI token refresh response was not a JSON object.",
provider="xai-oauth",
code="xai_refresh_invalid_response",
relogin_required=True,
)
refreshed_access = str(payload.get("access_token", "") or "").strip()
if not refreshed_access:
raise AuthError(
"xAI token refresh response was missing access_token.",
provider="xai-oauth",
code="xai_refresh_missing_access_token",
relogin_required=True,
)
updated = {
"access_token": refreshed_access,
"refresh_token": str(payload.get("refresh_token") or refresh_token).strip(),
"id_token": str(payload.get("id_token") or "").strip(),
"expires_in": payload.get("expires_in"),
"token_type": str(payload.get("token_type") or "Bearer").strip() or "Bearer",
"last_refresh": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
}
return updated
def _refresh_xai_oauth_tokens(
tokens: Dict[str, Any],
*,
token_endpoint: str,
redirect_uri: str = "",
timeout_seconds: float,
) -> Dict[str, Any]:
refreshed = refresh_xai_oauth_pure(
str(tokens.get("access_token", "") or ""),
str(tokens.get("refresh_token", "") or ""),
token_endpoint=token_endpoint,
timeout_seconds=timeout_seconds,
)
updated_tokens = dict(tokens)
updated_tokens["access_token"] = refreshed["access_token"]
updated_tokens["refresh_token"] = refreshed["refresh_token"]
if refreshed.get("id_token"):
updated_tokens["id_token"] = refreshed["id_token"]
if refreshed.get("expires_in") is not None:
updated_tokens["expires_in"] = refreshed["expires_in"]
if refreshed.get("token_type"):
updated_tokens["token_type"] = refreshed["token_type"]
_save_xai_oauth_tokens(
updated_tokens,
discovery={"token_endpoint": token_endpoint},
redirect_uri=redirect_uri,
last_refresh=refreshed["last_refresh"],
)
return updated_tokens
def resolve_xai_oauth_runtime_credentials(
*,
force_refresh: bool = False,
refresh_if_expiring: bool = True,
refresh_skew_seconds: int = XAI_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
) -> Dict[str, Any]:
data = _read_xai_oauth_tokens()
tokens = dict(data["tokens"])
access_token = str(tokens.get("access_token", "") or "").strip()
refresh_timeout_seconds = float(os.getenv("HERMES_XAI_REFRESH_TIMEOUT_SECONDS", "20"))
discovery = dict(data.get("discovery") or {})
token_endpoint = str(discovery.get("token_endpoint", "") or "").strip()
redirect_uri = str(data.get("redirect_uri", "") or "").strip()
should_refresh = bool(force_refresh)
if (not should_refresh) and refresh_if_expiring:
should_refresh = _xai_access_token_is_expiring(access_token, refresh_skew_seconds)
if should_refresh:
with _auth_store_lock(timeout_seconds=max(float(AUTH_LOCK_TIMEOUT_SECONDS), refresh_timeout_seconds + 5.0)):
data = _read_xai_oauth_tokens(_lock=False)
tokens = dict(data["tokens"])
access_token = str(tokens.get("access_token", "") or "").strip()
discovery = dict(data.get("discovery") or {})
token_endpoint = str(discovery.get("token_endpoint", "") or "").strip()
redirect_uri = str(data.get("redirect_uri", "") or "").strip()
should_refresh = bool(force_refresh)
if (not should_refresh) and refresh_if_expiring:
should_refresh = _xai_access_token_is_expiring(access_token, refresh_skew_seconds)
if should_refresh:
if not token_endpoint:
token_endpoint = _xai_oauth_discovery(refresh_timeout_seconds)["token_endpoint"]
tokens = _refresh_xai_oauth_tokens(
tokens,
token_endpoint=token_endpoint,
redirect_uri=redirect_uri,
timeout_seconds=refresh_timeout_seconds,
)
access_token = str(tokens.get("access_token", "") or "").strip()
base_url = (
os.getenv("HERMES_XAI_BASE_URL", "").strip().rstrip("/")
or os.getenv("XAI_BASE_URL", "").strip().rstrip("/")
or DEFAULT_XAI_OAUTH_BASE_URL
)
return {
"provider": "xai-oauth",
"base_url": base_url,
"api_key": access_token,
"source": "hermes-auth-store",
"last_refresh": data.get("last_refresh"),
"auth_mode": "oauth_pkce",
}
# =============================================================================
# TLS verification helper
# =============================================================================
@ -4030,6 +4551,48 @@ def get_codex_auth_status() -> Dict[str, Any]:
}
def get_xai_oauth_auth_status() -> Dict[str, Any]:
try:
from agent.credential_pool import load_pool
pool = load_pool("xai-oauth")
if pool and pool.has_credentials():
entry = pool.select()
if entry is not None:
api_key = (
getattr(entry, "runtime_api_key", None)
or getattr(entry, "access_token", "")
)
if api_key and not _xai_access_token_is_expiring(api_key, 0):
return {
"logged_in": True,
"auth_store": str(_auth_file_path()),
"last_refresh": getattr(entry, "last_refresh", None),
"auth_mode": "oauth_pkce",
"source": f"pool:{getattr(entry, 'label', 'unknown')}",
"api_key": api_key,
}
except Exception:
pass
try:
creds = resolve_xai_oauth_runtime_credentials()
return {
"logged_in": True,
"auth_store": str(_auth_file_path()),
"last_refresh": creds.get("last_refresh"),
"auth_mode": creds.get("auth_mode"),
"source": creds.get("source"),
"api_key": creds.get("api_key"),
}
except AuthError as exc:
return {
"logged_in": False,
"auth_store": str(_auth_file_path()),
"error": str(exc),
}
def get_api_key_provider_status(provider_id: str) -> Dict[str, Any]:
"""Status snapshot for API-key providers (z.ai, Kimi, MiniMax)."""
pconfig = PROVIDER_REGISTRY.get(provider_id)
@ -4100,6 +4663,8 @@ def get_auth_status(provider_id: Optional[str] = None) -> Dict[str, Any]:
return get_nous_auth_status()
if target == "openai-codex":
return get_codex_auth_status()
if target == "xai-oauth":
return get_xai_oauth_auth_status()
if target == "qwen-oauth":
return get_qwen_auth_status()
if target == "google-gemini-cli":
@ -4320,7 +4885,7 @@ def _logout_default_provider_from_config() -> Optional[str]:
"No provider is currently logged in" and never reset model.provider.
"""
provider = _get_config_provider()
if provider in {"nous", "openai-codex"}:
if provider in {"nous", "openai-codex", "xai-oauth"}:
return provider
return None
@ -4619,6 +5184,245 @@ def _login_openai_codex(
print(f" Config updated: {config_path} (model.provider=openai-codex)")
def _login_xai_oauth(
args,
pconfig: ProviderConfig,
*,
force_new_login: bool = False,
) -> None:
del pconfig
if not force_new_login:
try:
existing = resolve_xai_oauth_runtime_credentials()
api_key = existing.get("api_key", "")
if isinstance(api_key, str) and api_key and not _xai_access_token_is_expiring(api_key, 60):
print("Existing xAI OAuth credentials found in Hermes auth store.")
try:
reuse = input("Use existing credentials? [Y/n]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
reuse = "y"
if reuse in ("", "y", "yes"):
config_path = _update_config_for_provider(
"xai-oauth",
existing.get("base_url", DEFAULT_XAI_OAUTH_BASE_URL),
)
print()
print("Login successful!")
print(f" Config updated: {config_path} (model.provider=xai-oauth)")
return
except AuthError:
pass
print()
print("Signing in to xAI Grok OAuth (SuperGrok Subscription)...")
print("(Hermes creates its own local OAuth session)")
print()
timeout_seconds = float(getattr(args, "timeout", None) or 20.0)
open_browser = not getattr(args, "no_browser", False)
if _is_remote_session():
open_browser = False
creds = _xai_oauth_loopback_login(timeout_seconds=timeout_seconds, open_browser=open_browser)
_save_xai_oauth_tokens(
creds["tokens"],
discovery=creds.get("discovery"),
redirect_uri=creds.get("redirect_uri", ""),
last_refresh=creds.get("last_refresh"),
)
config_path = _update_config_for_provider("xai-oauth", creds.get("base_url", DEFAULT_XAI_OAUTH_BASE_URL))
print()
print("Login successful!")
from hermes_constants import display_hermes_home as _dhh
print(f" Auth state: {_dhh()}/auth.json")
print(f" Config updated: {config_path} (model.provider=xai-oauth)")
def _xai_oauth_build_authorize_url(
*,
authorization_endpoint: str,
redirect_uri: str,
code_challenge: str,
state: str,
nonce: str,
) -> str:
# `plan=generic` opts the consent screen into xAI's generic OAuth plan
# tier instead of falling back to the per-account default. Without it,
# accounts.x.ai rejects loopback OAuth from non-allowlisted clients.
# `referrer=hermes-agent` lets xAI attribute Hermes-originated logins
# in their OAuth server logs (we still impersonate the upstream Grok-CLI
# client_id; this is best-effort attribution until xAI mints us our own).
authorize_params = {
"response_type": "code",
"client_id": XAI_OAUTH_CLIENT_ID,
"redirect_uri": redirect_uri,
"scope": XAI_OAUTH_SCOPE,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"state": state,
"nonce": nonce,
"plan": "generic",
"referrer": "hermes-agent",
}
return f"{authorization_endpoint}?{urlencode(authorize_params)}"
def _xai_oauth_loopback_login(
*,
timeout_seconds: float = 20.0,
open_browser: bool = True,
) -> Dict[str, Any]:
discovery = _xai_oauth_discovery(timeout_seconds)
authorization_endpoint = discovery["authorization_endpoint"]
token_endpoint = discovery["token_endpoint"]
server, thread, callback_result, redirect_uri = _xai_start_callback_server()
try:
_xai_validate_loopback_redirect_uri(redirect_uri)
code_verifier = _oauth_pkce_code_verifier()
code_challenge = _oauth_pkce_code_challenge(code_verifier)
state = uuid.uuid4().hex
nonce = uuid.uuid4().hex
authorize_url = _xai_oauth_build_authorize_url(
authorization_endpoint=authorization_endpoint,
redirect_uri=redirect_uri,
code_challenge=code_challenge,
state=state,
nonce=nonce,
)
print("Open this URL to authorize Hermes with xAI:")
print(authorize_url)
print()
print(f"Waiting for callback on {redirect_uri}")
if open_browser and not _is_remote_session():
try:
opened = webbrowser.open(authorize_url)
except Exception:
opened = False
if opened:
print("Browser opened for xAI authorization.")
else:
print("Could not open the browser automatically; use the URL above.")
callback = _xai_wait_for_callback(
server,
thread,
callback_result,
timeout_seconds=max(30.0, timeout_seconds * 9),
)
except Exception:
try:
server.shutdown()
server.server_close()
except Exception:
pass
try:
thread.join(timeout=1.0)
except Exception:
pass
raise
if callback.get("error"):
detail = callback.get("error_description") or callback["error"]
raise AuthError(
f"xAI authorization failed: {detail}",
provider="xai-oauth",
code="xai_authorization_failed",
)
if callback.get("state") != state:
raise AuthError(
"xAI authorization failed: state mismatch.",
provider="xai-oauth",
code="xai_state_mismatch",
)
code = str(callback.get("code") or "").strip()
if not code:
raise AuthError(
"xAI authorization failed: missing authorization code.",
provider="xai-oauth",
code="xai_code_missing",
)
try:
response = httpx.post(
token_endpoint,
headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": XAI_OAUTH_CLIENT_ID,
"code_verifier": code_verifier,
},
timeout=max(20.0, timeout_seconds),
)
except Exception as exc:
raise AuthError(
f"xAI token exchange failed: {exc}",
provider="xai-oauth",
code="xai_token_exchange_failed",
) from exc
if response.status_code != 200:
detail = response.text.strip()
raise AuthError(
"xAI token exchange failed."
+ (f" Response: {detail}" if detail else ""),
provider="xai-oauth",
code="xai_token_exchange_failed",
)
try:
payload = response.json()
except Exception as exc:
raise AuthError(
f"xAI token exchange returned invalid JSON: {exc}",
provider="xai-oauth",
code="xai_token_exchange_invalid",
) from exc
if not isinstance(payload, dict):
raise AuthError(
"xAI token exchange response was not a JSON object.",
provider="xai-oauth",
code="xai_token_exchange_invalid",
)
access_token = str(payload.get("access_token", "") or "").strip()
refresh_token = str(payload.get("refresh_token", "") or "").strip()
if not access_token:
raise AuthError(
"xAI token exchange did not return an access_token.",
provider="xai-oauth",
code="xai_token_exchange_invalid",
)
if not refresh_token:
raise AuthError(
"xAI token exchange did not return a refresh_token.",
provider="xai-oauth",
code="xai_token_exchange_invalid",
)
base_url = (
os.getenv("HERMES_XAI_BASE_URL", "").strip().rstrip("/")
or os.getenv("XAI_BASE_URL", "").strip().rstrip("/")
or DEFAULT_XAI_OAUTH_BASE_URL
)
return {
"tokens": {
"access_token": access_token,
"refresh_token": refresh_token,
"id_token": str(payload.get("id_token", "") or "").strip(),
"expires_in": payload.get("expires_in"),
"token_type": str(payload.get("token_type") or "Bearer").strip() or "Bearer",
},
"discovery": discovery,
"redirect_uri": redirect_uri,
"base_url": base_url,
"last_refresh": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"source": "oauth-loopback",
}
def _codex_device_code_login() -> Dict[str, Any]:
"""Run the OpenAI device code login flow and return credentials dict."""
import time as _time

View file

@ -33,7 +33,7 @@ from hermes_constants import OPENROUTER_BASE_URL
# Providers that support OAuth login in addition to API keys.
_OAUTH_CAPABLE_PROVIDERS = {"anthropic", "nous", "openai-codex", "qwen-oauth", "google-gemini-cli", "minimax-oauth"}
_OAUTH_CAPABLE_PROVIDERS = {"anthropic", "nous", "openai-codex", "xai-oauth", "qwen-oauth", "google-gemini-cli", "minimax-oauth"}
def _get_custom_provider_names() -> list:
@ -77,6 +77,8 @@ def _normalize_provider(provider: str) -> str:
normalized = (provider or "").strip().lower()
if normalized in {"or", "open-router"}:
return "openrouter"
if normalized in {"grok-oauth", "xai-oauth", "x-ai-oauth", "xai-grok-oauth"}:
return "xai-oauth"
# Check if it matches a custom provider name
custom_key = _resolve_custom_provider_input(normalized)
if custom_key:
@ -170,7 +172,7 @@ def auth_add_command(args) -> None:
if provider.startswith(CUSTOM_POOL_PREFIX):
requested_type = AUTH_TYPE_API_KEY
else:
requested_type = AUTH_TYPE_OAUTH if provider in {"anthropic", "nous", "openai-codex", "qwen-oauth", "google-gemini-cli", "minimax-oauth"} else AUTH_TYPE_API_KEY
requested_type = AUTH_TYPE_OAUTH if provider in _OAUTH_CAPABLE_PROVIDERS else AUTH_TYPE_API_KEY
pool = load_pool(provider)
@ -333,6 +335,31 @@ def auth_add_command(args) -> None:
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
if provider == "xai-oauth":
creds = auth_mod._xai_oauth_loopback_login(
timeout_seconds=getattr(args, "timeout", None) or 20.0,
open_browser=not getattr(args, "no_browser", False),
)
label = (getattr(args, "label", None) or "").strip() or label_from_token(
creds["tokens"]["access_token"],
_oauth_default_label(provider, len(pool.entries()) + 1),
)
entry = PooledCredential(
provider=provider,
id=uuid.uuid4().hex[:6],
label=label,
auth_type=AUTH_TYPE_OAUTH,
priority=0,
source=f"{SOURCE_MANUAL}:xai_pkce",
access_token=creds["tokens"]["access_token"],
refresh_token=creds["tokens"].get("refresh_token"),
base_url=creds.get("base_url"),
last_refresh=creds.get("last_refresh"),
)
pool.add_entry(entry)
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
if provider == "google-gemini-cli":
from agent.google_oauth import run_gemini_oauth_login_pure

View file

@ -1932,6 +1932,8 @@ def select_provider_and_model(args=None):
_model_flow_nous(config, current_model, args=args)
elif selected_provider == "openai-codex":
_model_flow_openai_codex(config, current_model)
elif selected_provider == "xai-oauth":
_model_flow_xai_oauth(config, current_model)
elif selected_provider == "qwen-oauth":
_model_flow_qwen_oauth(config, current_model)
elif selected_provider == "minimax-oauth":
@ -2813,6 +2815,87 @@ def _model_flow_openai_codex(config, current_model=""):
print("No change.")
def _model_flow_xai_oauth(_config, current_model=""):
"""xAI Grok OAuth (SuperGrok Subscription) provider: ensure logged in, then pick model."""
from hermes_cli.auth import (
get_xai_oauth_auth_status,
_prompt_model_selection,
_save_model_choice,
_update_config_for_provider,
resolve_xai_oauth_runtime_credentials,
_login_xai_oauth,
DEFAULT_XAI_OAUTH_BASE_URL,
PROVIDER_REGISTRY,
)
from hermes_cli.models import _PROVIDER_MODELS
status = get_xai_oauth_auth_status()
if status.get("logged_in"):
print(" xAI Grok OAuth (SuperGrok Subscription) credentials: ✓")
print()
print(" 1. Use existing credentials")
print(" 2. Reauthenticate (new OAuth login)")
print(" 3. Cancel")
print()
try:
choice = input(" Choice [1/2/3]: ").strip()
except (KeyboardInterrupt, EOFError):
choice = "1"
if choice == "2":
print("Starting a fresh xAI OAuth login...")
print()
try:
mock_args = argparse.Namespace()
_login_xai_oauth(
mock_args,
PROVIDER_REGISTRY["xai-oauth"],
force_new_login=True,
)
except SystemExit:
print("Login cancelled or failed.")
return
except Exception as exc:
print(f"Login failed: {exc}")
return
elif choice == "3":
return
else:
print("Not logged into xAI Grok OAuth (SuperGrok Subscription). Starting login...")
print()
try:
mock_args = argparse.Namespace()
_login_xai_oauth(mock_args, PROVIDER_REGISTRY["xai-oauth"])
except SystemExit:
print("Login cancelled or failed.")
return
except Exception as exc:
print(f"Login failed: {exc}")
return
# Resolve a usable base URL. ``resolve_xai_oauth_runtime_credentials``
# only reads from the auth.json singleton — but credentials may legitimately
# live only in the pool (e.g. after ``hermes auth add xai-oauth``). Fall
# back to the default base URL in that case so the model picker still
# completes successfully instead of bailing out with
# ``Could not resolve xAI OAuth credentials``.
base_url = DEFAULT_XAI_OAUTH_BASE_URL
try:
creds = resolve_xai_oauth_runtime_credentials()
base_url = (creds.get("base_url") or "").strip().rstrip("/") or base_url
except Exception:
pass
models = list(_PROVIDER_MODELS.get("xai-oauth") or _PROVIDER_MODELS.get("xai") or [])
selected = _prompt_model_selection(models, current_model=current_model or (models[0] if models else "grok-code-fast-1"))
if selected:
_save_model_choice(selected)
_update_config_for_provider("xai-oauth", base_url)
print(f"Default model set to: {selected} (via xAI Grok OAuth — SuperGrok Subscription)")
else:
print("No change.")
_DEFAULT_QWEN_PORTAL_MODELS = [
"qwen3-coder-plus",
"qwen3-coder",
@ -9400,7 +9483,7 @@ def _build_provider_choices() -> list[str]:
except Exception:
# Fallback: static list guarantees the CLI always works
return [
"auto", "openrouter", "nous", "openai-codex", "copilot-acp", "copilot",
"auto", "openrouter", "nous", "openai-codex", "xai-oauth", "copilot-acp", "copilot",
"anthropic", "gemini", "google-gemini-cli", "xai", "bedrock", "azure-foundry",
"ollama-cloud", "huggingface", "zai", "kimi-coding", "kimi-coding-cn",
"stepfun", "minimax", "minimax-cn", "kilocode", "novita", "xiaomi", "arcee",
@ -9931,7 +10014,7 @@ def main():
)
login_parser.add_argument(
"--provider",
choices=["nous", "openai-codex"],
choices=["nous", "openai-codex", "xai-oauth"],
default=None,
help="Provider to authenticate with (default: nous)",
)
@ -9977,7 +10060,7 @@ def main():
)
logout_parser.add_argument(
"--provider",
choices=["nous", "openai-codex", "spotify"],
choices=["nous", "openai-codex", "xai-oauth", "spotify"],
default=None,
help="Provider to log out from (default: active provider)",
)

View file

@ -116,13 +116,23 @@ def _codex_curated_models() -> list[str]:
# (grok-4, grok-4-0709, grok-4-fast{,-reasoning,-non-reasoning},
# grok-4-1-fast{,-reasoning,-non-reasoning}, grok-code-fast-1 → grok-4.3).
_XAI_STATIC_FALLBACK: list[str] = [
"grok-4.3",
"grok-4.20-0309-reasoning",
"grok-4.20-0309-non-reasoning",
"grok-4.20-multi-agent-0309",
"grok-4.3",
]
_XAI_TOP_MODEL = "grok-4.3"
def _xai_promote_top(ids: list[str]) -> list[str]:
"""Pin the headline xAI model to the top of the curated list."""
if _XAI_TOP_MODEL in ids:
return [_XAI_TOP_MODEL] + [m for m in ids if m != _XAI_TOP_MODEL]
return ids
def _xai_curated_models() -> list[str]:
"""Derive the xAI-direct curated list from models.dev disk cache.
@ -142,7 +152,7 @@ def _xai_curated_models() -> list[str]:
if isinstance(models, dict) and models:
ids = [mid for mid in models.keys() if isinstance(mid, str)]
if ids:
return sorted(ids)
return _xai_promote_top(sorted(ids))
except Exception:
# Any failure (missing file, malformed JSON, import error)
# falls through to the static list.
@ -190,6 +200,7 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
"gpt-4o-mini",
],
"openai-codex": _codex_curated_models(),
"xai-oauth": _xai_curated_models(),
"copilot-acp": [
"copilot-acp",
],
@ -918,6 +929,7 @@ CANONICAL_PROVIDERS: list[ProviderEntry] = [
ProviderEntry("anthropic", "Anthropic", "Anthropic (Claude models — API key or Claude Code)"),
ProviderEntry("openai-codex", "OpenAI Codex", "OpenAI Codex"),
ProviderEntry("alibaba", "Qwen Cloud", "Qwen Cloud / DashScope Coding (Qwen + multi-provider)"),
ProviderEntry("xai-oauth", "xAI Grok OAuth (SuperGrok Subscription)", "xAI Grok OAuth (SuperGrok Subscription)"),
ProviderEntry("xiaomi", "Xiaomi MiMo", "Xiaomi MiMo (MiMo-V2.5 and V2 models — pro, omni, flash)"),
ProviderEntry("tencent-tokenhub", "Tencent TokenHub", "Tencent TokenHub (Hy3 Preview — direct API via tokenhub.tencentmaas.com)"),
ProviderEntry("nvidia", "NVIDIA NIM", "NVIDIA NIM (Nemotron models — build.nvidia.com or local NIM)"),
@ -1036,6 +1048,10 @@ _PROVIDER_ALIASES = {
"amazon-bedrock": "bedrock",
"amazon": "bedrock",
"grok": "xai",
"grok-oauth": "xai-oauth",
"xai-oauth": "xai-oauth",
"x-ai-oauth": "xai-oauth",
"xai-grok-oauth": "xai-oauth",
"x-ai": "xai",
"x.ai": "xai",
"nim": "nvidia",
@ -2166,6 +2182,8 @@ def provider_model_ids(provider: Optional[str], *, force_refresh: bool = False)
except Exception:
access_token = None
return get_codex_model_ids(access_token=access_token)
if normalized == "xai-oauth":
return list(_PROVIDER_MODELS.get("xai-oauth", _PROVIDER_MODELS.get("xai", [])))
if normalized in {"copilot", "copilot-acp"}:
try:
live = _fetch_github_models(_resolve_copilot_catalog_api_key())
@ -3444,14 +3462,14 @@ def validate_requested_model(
"message": message,
}
# OpenAI Codex has its own catalog path; /v1/models probing is not the right validation path.
if normalized == "openai-codex":
# Providers with non-standard catalog validation — /v1/models probing is not the right path.
if normalized in {"openai-codex", "xai-oauth"}:
try:
codex_models = provider_model_ids("openai-codex")
catalog_models = provider_model_ids(normalized)
except Exception:
codex_models = []
if codex_models:
if requested_for_lookup in set(codex_models):
catalog_models = []
if catalog_models:
if requested_for_lookup in set(catalog_models):
return {
"accepted": True,
"persist": True,
@ -3459,7 +3477,7 @@ def validate_requested_model(
"message": None,
}
# Auto-correct if the top match is very similar (e.g. typo)
auto = get_close_matches(requested_for_lookup, codex_models, n=1, cutoff=0.9)
auto = get_close_matches(requested_for_lookup, catalog_models, n=1, cutoff=0.9)
if auto:
return {
"accepted": True,
@ -3468,17 +3486,18 @@ def validate_requested_model(
"corrected_model": auto[0],
"message": f"Auto-corrected `{requested}` → `{auto[0]}`",
}
suggestions = get_close_matches(requested_for_lookup, codex_models, n=3, cutoff=0.5)
suggestions = get_close_matches(requested_for_lookup, catalog_models, n=3, cutoff=0.5)
suggestion_text = ""
if suggestions:
suggestion_text = "\n Similar models: " + ", ".join(f"`{s}`" for s in suggestions)
provider_label = "OpenAI Codex" if normalized == "openai-codex" else "xAI Grok OAuth (SuperGrok Subscription)"
return {
"accepted": True,
"persist": True,
"recognized": False,
"message": (
f"Note: `{requested}` was not found in the OpenAI Codex model listing. "
"It may still work if your ChatGPT/Codex account has access to a newer or hidden model ID."
f"Note: `{requested}` was not found in the {provider_label} model listing. "
"It may still work if your account has access to a newer or hidden model ID."
f"{suggestion_text}"
),
}

View file

@ -60,6 +60,12 @@ HERMES_OVERLAYS: Dict[str, HermesOverlay] = {
auth_type="oauth_external",
base_url_override="https://chatgpt.com/backend-api/codex",
),
"xai-oauth": HermesOverlay(
transport="codex_responses",
auth_type="oauth_external",
base_url_override="https://api.x.ai/v1",
base_url_env_var="XAI_BASE_URL",
),
"qwen-oauth": HermesOverlay(
transport="openai_chat",
auth_type="oauth_external",
@ -244,6 +250,10 @@ ALIASES: Dict[str, str] = {
"x-ai": "xai",
"x.ai": "xai",
"grok": "xai",
"grok-oauth": "xai-oauth",
"xai-oauth": "xai-oauth",
"x-ai-oauth": "xai-oauth",
"xai-grok-oauth": "xai-oauth",
# nvidia
"nim": "nvidia",

View file

@ -15,12 +15,14 @@ from hermes_cli.auth import (
AuthError,
DEFAULT_CODEX_BASE_URL,
DEFAULT_QWEN_BASE_URL,
DEFAULT_XAI_OAUTH_BASE_URL,
PROVIDER_REGISTRY,
_agent_key_is_usable,
format_auth_error,
resolve_provider,
resolve_nous_runtime_credentials,
resolve_codex_runtime_credentials,
resolve_xai_oauth_runtime_credentials,
resolve_qwen_runtime_credentials,
resolve_gemini_oauth_runtime_credentials,
resolve_api_key_provider_credentials,
@ -238,6 +240,9 @@ def _resolve_runtime_from_pool_entry(
if provider == "openai-codex":
api_mode = "codex_responses"
base_url = base_url or DEFAULT_CODEX_BASE_URL
elif provider == "xai-oauth":
api_mode = "codex_responses"
base_url = base_url or DEFAULT_XAI_OAUTH_BASE_URL
elif provider == "qwen-oauth":
api_mode = "chat_completions"
base_url = base_url or DEFAULT_QWEN_BASE_URL
@ -1132,6 +1137,24 @@ def resolve_runtime_provider(
logger.info("Auto-detected Codex provider but credentials failed; "
"falling through to next provider.")
if provider == "xai-oauth":
try:
creds = resolve_xai_oauth_runtime_credentials()
return {
"provider": "xai-oauth",
"api_mode": "codex_responses",
"base_url": (creds.get("base_url") or "").rstrip("/") or DEFAULT_XAI_OAUTH_BASE_URL,
"api_key": creds.get("api_key", ""),
"source": creds.get("source", "hermes-auth-store"),
"last_refresh": creds.get("last_refresh"),
"requested_provider": requested_provider,
}
except AuthError:
if requested_provider != "auto":
raise
logger.info("Auto-detected xAI OAuth provider but credentials failed; "
"falling through to next provider.")
if provider == "qwen-oauth":
try:
creds = resolve_qwen_runtime_credentials()

View file

@ -1091,6 +1091,58 @@ def _install_kittentts_deps() -> bool:
return False
def _xai_oauth_logged_in_for_setup() -> bool:
"""True iff xAI Grok OAuth credentials are already stored locally.
Lets TTS / STT setup skip the API-key prompt for users who logged in
through ``hermes model`` -> xAI Grok OAuth (SuperGrok Subscription).
"""
try:
from hermes_cli.auth import get_xai_oauth_auth_status
return bool(get_xai_oauth_auth_status().get("logged_in"))
except Exception:
return False
def _run_xai_oauth_login_from_setup() -> bool:
"""Run the xAI Grok OAuth loopback login from inside the setup wizard.
Returns True on success, False on any failure (the caller falls back
to whatever the user picked next, e.g. Edge TTS).
"""
try:
from hermes_cli.auth import (
DEFAULT_XAI_OAUTH_BASE_URL,
_is_remote_session,
_save_xai_oauth_tokens,
_update_config_for_provider,
_xai_oauth_loopback_login,
)
except Exception as exc:
print_warning(f"xAI Grok OAuth helpers unavailable: {exc}")
return False
open_browser = not _is_remote_session()
print()
print_info("Signing in to xAI Grok OAuth (SuperGrok Subscription)...")
try:
creds = _xai_oauth_loopback_login(open_browser=open_browser)
_save_xai_oauth_tokens(
creds["tokens"],
discovery=creds.get("discovery"),
redirect_uri=creds.get("redirect_uri", ""),
last_refresh=creds.get("last_refresh"),
)
_update_config_for_provider(
"xai-oauth", creds.get("base_url", DEFAULT_XAI_OAUTH_BASE_URL)
)
return True
except Exception as exc:
print_warning(f"xAI Grok OAuth login failed: {exc}")
return False
def _setup_tts_provider(config: dict):
"""Interactive TTS provider selection with install flow for NeuTTS."""
tts_config = config.get("tts", {})
@ -1125,7 +1177,7 @@ def _setup_tts_provider(config: dict):
"Edge TTS (free, cloud-based, no setup needed)",
"ElevenLabs (premium quality, needs API key)",
"OpenAI TTS (good quality, needs API key)",
"xAI TTS (Grok voices, needs API key)",
"xAI TTS (Grok voices — OAuth login or API key)",
"MiniMax TTS (high quality with voice cloning, needs API key)",
"Mistral Voxtral TTS (multilingual, native Opus, needs API key)",
"Google Gemini TTS (30 prebuilt voices, prompt-controllable, needs API key)",
@ -1199,21 +1251,59 @@ def _setup_tts_provider(config: dict):
selected = "edge"
elif selected == "xai":
existing = get_env_value("XAI_API_KEY")
if not existing:
# Resolution order: existing OAuth tokens (free for SuperGrok subscribers
# via the Hermes auth store) > existing XAI_API_KEY > prompt the user.
# When neither is configured, offer both options instead of forcing the
# API-key path — xAI TTS works fine with OAuth bearer tokens too.
oauth_logged_in = _xai_oauth_logged_in_for_setup()
existing_api_key = get_env_value("XAI_API_KEY")
if oauth_logged_in:
print_success(
"xAI TTS will use your xAI Grok OAuth (SuperGrok Subscription) "
"credentials"
)
elif existing_api_key:
print_success("xAI TTS will use your existing XAI_API_KEY")
else:
print()
api_key = prompt("xAI API key for TTS", password=True)
if api_key:
save_env_value("XAI_API_KEY", api_key)
print_success("xAI TTS API key saved")
choice_idx = prompt_choice(
"How do you want xAI TTS to authenticate?",
choices=[
"Sign in with xAI Grok OAuth (SuperGrok Subscription) — browser login",
"Paste an xAI API key (console.x.ai)",
"Skip → fallback to Edge TTS",
],
default=0,
)
if choice_idx == 0:
if _run_xai_oauth_login_from_setup():
print_success(
"Logged in — xAI TTS will use these OAuth credentials"
)
else:
print_warning(
"xAI Grok OAuth login did not complete. "
"Falling back to Edge TTS."
)
selected = "edge"
elif choice_idx == 1:
api_key = prompt("xAI API key for TTS", password=True)
if api_key:
save_env_value("XAI_API_KEY", api_key)
print_success("xAI TTS API key saved")
else:
from hermes_constants import display_hermes_home as _dhh
print_warning(
"No xAI API key provided for TTS. Configure XAI_API_KEY "
f"via hermes setup model or {_dhh()}/.env to use xAI TTS. "
"Falling back to Edge TTS."
)
selected = "edge"
else:
from hermes_constants import display_hermes_home as _dhh
print_warning(
"No xAI API key provided for TTS. Configure XAI_API_KEY via "
f"hermes setup model or {_dhh()}/.env to use xAI TTS. "
"Falling back to Edge TTS."
)
print_warning("xAI TTS skipped. Falling back to Edge TTS.")
selected = "edge"
if selected == "xai":
print()
voice_id = prompt("xAI voice_id (Enter for 'eve', or paste a custom voice ID)")

View file

@ -194,11 +194,10 @@ TOOL_CATEGORIES = {
},
{
"name": "xAI TTS",
"tag": "Grok voices - requires xAI API key",
"env_vars": [
{"key": "XAI_API_KEY", "prompt": "xAI API key", "url": "https://console.x.ai/"},
],
"tag": "Grok voices — uses xAI Grok OAuth or XAI_API_KEY",
"env_vars": [],
"tts_provider": "xai",
"post_setup": "xai_grok",
},
{
"name": "ElevenLabs",
@ -925,6 +924,73 @@ def _run_post_setup(post_setup_key: str):
_print_info(" Restart Hermes for tracing to take effect.")
_print_info(" Verify: hermes plugins list")
elif post_setup_key == "xai_grok":
# Shared credential bootstrap for any picker entry that talks to xAI
# (TTS, Video Gen, future Image Gen, etc.). Accepts either a
# SuperGrok-tier OAuth bearer token (preferred — billed against the
# user's existing subscription) or a raw XAI_API_KEY from
# console.x.ai. The picker entries declare empty env_vars so we
# drive the full auth UX here.
try:
from hermes_cli.auth import get_xai_oauth_auth_status
oauth_logged_in = bool(get_xai_oauth_auth_status().get("logged_in"))
except Exception:
oauth_logged_in = False
existing_api_key = get_env_value("XAI_API_KEY")
if oauth_logged_in:
_print_success(
" xAI will use your xAI Grok OAuth (SuperGrok Subscription) credentials"
)
return
if existing_api_key:
_print_success(" xAI will use your existing XAI_API_KEY")
return
_print_info(" xAI needs credentials. Choose one:")
try:
from hermes_cli.setup import (
_run_xai_oauth_login_from_setup,
prompt_choice,
prompt as _setup_prompt,
)
from hermes_cli.config import save_env_value
except Exception as exc:
_print_warning(f" Could not load setup helpers: {exc}")
_print_info(" Run later: hermes auth add xai-oauth (or set XAI_API_KEY)")
return
idx = prompt_choice(
" How do you want xAI to authenticate?",
choices=[
"Sign in with xAI Grok OAuth (SuperGrok Subscription) — browser login",
"Paste an xAI API key (console.x.ai)",
"Skip — configure later via `hermes auth add xai-oauth`",
],
default=0,
)
if idx == 0:
if _run_xai_oauth_login_from_setup():
_print_success(
" Logged in — xAI will use these OAuth credentials"
)
else:
_print_warning(
" xAI Grok OAuth login did not complete. "
"Run later: hermes auth add xai-oauth"
)
elif idx == 1:
api_key = _setup_prompt(" xAI API key", password=True)
if api_key:
save_env_value("XAI_API_KEY", api_key)
_print_success(" XAI_API_KEY saved")
else:
_print_warning(
" No API key provided. Run later: hermes auth add xai-oauth"
)
else:
_print_info(" xAI will remain inactive until credentials are configured.")
# ─── Platform / Toolset Helpers ───────────────────────────────────────────────