fix(auth): resolve xAI OAuth credentials across profiles

This commit is contained in:
Andrew Walker 2026-06-03 14:27:50 -05:00
parent a72bb03757
commit 8d8b9f50e4
2 changed files with 131 additions and 1 deletions

View file

@ -3824,13 +3824,64 @@ def _pool_codex_access_token() -> str:
# xAI Grok OAuth — tokens stored in ~/.hermes/auth.json
# =============================================================================
def _xai_oauth_state_from_store(auth_store: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Return usable xAI OAuth state from provider state or credential pool."""
state = _load_provider_state(auth_store, "xai-oauth")
tokens = state.get("tokens") if isinstance(state, dict) else None
if isinstance(tokens, dict):
access_token = str(tokens.get("access_token", "") or "").strip()
refresh_token = str(tokens.get("refresh_token", "") or "").strip()
if access_token and refresh_token:
return state
credential_pool = auth_store.get("credential_pool")
entries = (
credential_pool.get("xai-oauth")
if isinstance(credential_pool, dict)
else None
)
if isinstance(entries, list):
for entry in entries:
if not isinstance(entry, dict):
continue
access_token = str(entry.get("access_token", "") or "").strip()
refresh_token = str(entry.get("refresh_token", "") or "").strip()
if not access_token or not refresh_token:
continue
merged = dict(state or {})
merged["tokens"] = {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": str(entry.get("token_type") or "Bearer"),
}
if entry.get("last_refresh"):
merged["last_refresh"] = entry.get("last_refresh")
merged.setdefault("auth_mode", "oauth_pkce")
return merged
return state if isinstance(state, dict) else None
def _xai_oauth_state_has_usable_tokens(state: Optional[Dict[str, Any]]) -> bool:
tokens = state.get("tokens") if isinstance(state, dict) else None
return (
isinstance(tokens, dict)
and bool(str(tokens.get("access_token", "") or "").strip())
and bool(str(tokens.get("refresh_token", "") or "").strip())
)
def _read_xai_oauth_tokens(*, _lock: bool = True) -> Dict[str, Any]:
if _lock:
with _auth_store_lock():
auth_store = _load_auth_store()
else:
auth_store = _load_auth_store()
state = _load_provider_state(auth_store, "xai-oauth")
state = _xai_oauth_state_from_store(auth_store)
if not _xai_oauth_state_has_usable_tokens(state):
global_state = _xai_oauth_state_from_store(_load_global_auth_store())
if _xai_oauth_state_has_usable_tokens(global_state):
state = global_state
if not state:
raise AuthError(
"No xAI OAuth credentials stored. Select xAI Grok OAuth (SuperGrok / Premium+) in `hermes model`.",

View file

@ -0,0 +1,79 @@
"""Regression tests for xAI OAuth auth resolution in profile/cron contexts."""
import pytest
from hermes_cli import auth
from hermes_cli.auth import AuthError
def test_read_xai_oauth_tokens_uses_credential_pool_when_provider_tokens_empty(monkeypatch):
"""Profile auth can have fresh pool tokens while singleton provider state is empty.
This mirrors profiled cron after re-auth/credential-pool sync: the xAI
OAuth credential is usable, but `providers.xai-oauth.tokens` may be empty
or stale. Treating that as missing auth makes cron keep failing after the
user has successfully re-authenticated.
"""
store = {
"providers": {"xai-oauth": {"tokens": {}, "last_auth_error": {}}},
"credential_pool": {
"xai-oauth": [
{
"access_token": "pool-access",
"refresh_token": "pool-refresh",
"token_type": "Bearer",
"last_refresh": "2026-06-03T19:00:00Z",
}
]
},
}
monkeypatch.setattr(auth, "_load_auth_store", lambda: store)
monkeypatch.setattr(auth, "_load_global_auth_store", lambda: {})
resolved = auth._read_xai_oauth_tokens(_lock=False)
assert resolved["tokens"]["access_token"] == "pool-access"
assert resolved["tokens"]["refresh_token"] == "pool-refresh"
assert resolved["tokens"]["token_type"] == "Bearer"
assert resolved["last_refresh"] == "2026-06-03T19:00:00Z"
def test_read_xai_oauth_tokens_uses_global_store_when_profile_state_empty(monkeypatch):
"""A profile/cron process should see root xAI auth after user re-auths there."""
profile_store = {"providers": {"xai-oauth": {"tokens": {}}}}
global_store = {
"providers": {
"xai-oauth": {
"tokens": {
"access_token": "global-access",
"refresh_token": "global-refresh",
"token_type": "Bearer",
},
"last_refresh": "2026-06-03T19:05:00Z",
}
}
}
monkeypatch.setattr(auth, "_load_auth_store", lambda: profile_store)
monkeypatch.setattr(auth, "_load_global_auth_store", lambda: global_store)
resolved = auth._read_xai_oauth_tokens(_lock=False)
assert resolved["tokens"]["access_token"] == "global-access"
assert resolved["tokens"]["refresh_token"] == "global-refresh"
assert resolved["last_refresh"] == "2026-06-03T19:05:00Z"
def test_read_xai_oauth_tokens_still_requires_usable_tokens(monkeypatch):
"""Fallback should not hide genuinely broken xAI auth state."""
store = {
"providers": {"xai-oauth": {"tokens": {}}},
"credential_pool": {"xai-oauth": [{"access_token": "", "refresh_token": ""}]},
}
monkeypatch.setattr(auth, "_load_auth_store", lambda: store)
monkeypatch.setattr(auth, "_load_global_auth_store", lambda: {})
with pytest.raises(AuthError) as exc:
auth._read_xai_oauth_tokens(_lock=False)
assert exc.value.code == "xai_auth_missing_access_token"
assert exc.value.relogin_required is True