fix(auth): write rotated xAI OAuth tokens back to global root (#43589)

The salvaged read-side fix lets a profile resolve the xAI OAuth grant from
the global-root auth store when it has no own providers.xai-oauth block.
But _save_xai_oauth_tokens still wrote rotated tokens only to the active
profile store. Because xAI rotates the refresh_token on every refresh, a
profile that reads root's grant and refreshes it left root holding a now-
revoked refresh token — killing every other profile reading the stale root
grant with invalid_grant once its access token expired (#43589).

Detect the read-from-root case (profile lacks its own providers.xai-oauth
block) and, after the profile save, write the rotated chain back to the
global root too via a best-effort, TOCTOU-safe write-through that reuses
_save_auth_store with an explicit target path. A profile that genuinely
shadows root (has its own block) is left untouched, classic mode is a
no-op, and a failed root write never breaks the profile's own save.

Pairs with the read fallback in the preceding commit so the cross-profile
xAI grant stays coherent in both directions.
This commit is contained in:
kshitijk4poor 2026-06-15 17:08:19 +05:30
parent f1d6f04362
commit 497352bc4e
2 changed files with 239 additions and 2 deletions

View file

@ -1079,8 +1079,13 @@ def _load_auth_store(auth_file: Optional[Path] = None) -> Dict[str, Any]:
return {"version": AUTH_STORE_VERSION, "providers": {}}
def _save_auth_store(auth_store: Dict[str, Any]) -> Path:
auth_file = _auth_file_path()
def _save_auth_store(auth_store: Dict[str, Any], target_path: Optional[Path] = None) -> Path:
# target_path=None preserves the existing contract (write the active
# store at _auth_file_path()). An explicit path lets callers persist a
# specific store — e.g. the global-root write-through for rotating xAI
# OAuth grants (#43589) — reusing this function's atomic O_EXCL + 0o600
# write so the root auth.json gets the same TOCTOU-safe treatment.
auth_file = target_path if target_path is not None else _auth_file_path()
auth_file.parent.mkdir(parents=True, exist_ok=True)
# Tighten parent dir to 0o700 so siblings can't traverse to creds.
# No-op on Windows (POSIX mode bits not enforced); ignore failures.
@ -3983,6 +3988,62 @@ def _read_xai_oauth_tokens(*, _lock: bool = True) -> Dict[str, Any]:
}
def _profile_has_own_xai_oauth_state(auth_store: Dict[str, Any]) -> bool:
"""True when this store has its OWN ``providers.xai-oauth`` block.
Distinguishes a profile that genuinely shadows the root xAI grant from
one that only *reads* root via ``_load_provider_state``'s fallback. Only
the latter needs the refresh write-through below.
"""
providers = auth_store.get("providers")
return isinstance(providers, dict) and isinstance(providers.get("xai-oauth"), dict)
def _write_through_xai_oauth_to_global_root(state: Dict[str, Any]) -> None:
"""Persist a rotated xAI OAuth ``state`` into the global-root auth.json.
Best-effort write-through for the multi-profile rotation hazard (#43589):
xAI rotates the refresh_token on every refresh, so when a profile session
refreshes a grant it resolved from the root fallback, the rotated chain
must land back in root. Otherwise root keeps a now-revoked refresh token
and every other profile reading the stale root grant dies with
``invalid_grant`` once its access token expires.
Only updates ``providers.xai-oauth`` in the root store; never touches the
profile store (the caller already saved that). Swallows all errors a
failed write-through degrades to the pre-existing behavior (root stale),
it must never break the profile's own successful save.
"""
global_path = _global_auth_file_path()
if global_path is None:
# Classic mode (profile == root); the profile save already hit root.
return
# Seat belt: under pytest, refuse to write the real user's
# ~/.hermes/auth.json even when HERMES_HOME points at a profile path
# (mirrors the read-side guard in _load_global_auth_store). Uses the
# unmodified HOME env, not Path.home() which fixtures may monkeypatch.
if os.environ.get("PYTEST_CURRENT_TEST"):
real_home_env = os.environ.get("HOME", "")
if real_home_env:
real_root = Path(real_home_env) / ".hermes" / "auth.json"
try:
if global_path.resolve(strict=False) == real_root.resolve(strict=False):
return
except Exception:
return
try:
if global_path.exists():
global_store = _load_auth_store(global_path)
else:
global_store = {}
if not isinstance(global_store, dict):
return
_store_provider_state(global_store, "xai-oauth", dict(state), set_active=False)
_save_auth_store(global_store, global_path)
except Exception as exc: # pragma: no cover - best effort
logger.debug("xAI OAuth: write-through to global root failed: %s", exc)
def _save_xai_oauth_tokens(
tokens: Dict[str, Any],
*,
@ -3994,6 +4055,11 @@ def _save_xai_oauth_tokens(
last_refresh = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
with _auth_store_lock():
auth_store = _load_auth_store()
# A profile that lacks its own xai-oauth block is reading the root
# grant through _load_provider_state's fallback. When such a profile
# refreshes the (rotating) grant, we must write the rotated chain back
# to root too, or root is left holding a revoked refresh token (#43589).
write_through_to_root = not _profile_has_own_xai_oauth_state(auth_store)
state = _load_provider_state(auth_store, "xai-oauth") or {}
state["tokens"] = tokens
state["last_refresh"] = last_refresh
@ -4004,6 +4070,8 @@ def _save_xai_oauth_tokens(
state["redirect_uri"] = redirect_uri
_save_provider_state(auth_store, "xai-oauth", state)
_save_auth_store(auth_store)
if write_through_to_root:
_write_through_xai_oauth_to_global_root(state)
def _xai_access_token_is_expiring(access_token: str, skew_seconds: int = 0) -> bool:

View file

@ -0,0 +1,169 @@
"""Regression tests for xAI OAuth refresh write-through to the global root.
Companion to ``test_xai_oauth_profile_auth.py``. That file covers the READ
fallback (profile -> credential pool -> global root). These cover the WRITE
side: when a profile that has no own ``providers.xai-oauth`` block refreshes
the (rotating) grant it resolved from the root fallback, the rotated tokens
must be written back to the global root too. Otherwise root keeps a revoked
refresh token and every other profile reading root's stale grant dies with
``invalid_grant`` once its access token expires (issue #43589).
The tests drive the real ``_save_xai_oauth_tokens`` against real on-disk auth
stores (profile + root under ``tmp_path``) rather than mocking the save
boundary, so they exercise the actual atomic write path.
"""
import json
import pytest
from hermes_cli import auth
def _write_store(path, store):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(store), encoding="utf-8")
def _read_store(path):
return json.loads(path.read_text(encoding="utf-8"))
@pytest.fixture
def profile_and_root(tmp_path, monkeypatch):
"""Wire a profile auth store + a distinct global-root auth store on disk.
Returns (profile_path, root_path). The pytest seat belt in
``_write_through_xai_oauth_to_global_root`` only refuses the *real* user's
``$HOME/.hermes/auth.json``; a tmp_path root is allowed, so we point HOME
away from the tmp root to keep the guard from tripping on these fixtures.
"""
profile_path = tmp_path / "profiles" / "work" / "auth.json"
root_path = tmp_path / "root" / "auth.json"
monkeypatch.setattr(auth, "_auth_file_path", lambda: profile_path)
monkeypatch.setattr(auth, "_global_auth_file_path", lambda: root_path)
# Keep the pytest write seat belt from matching our tmp root.
monkeypatch.setenv("HOME", str(tmp_path / "not-the-root"))
return profile_path, root_path
def test_refresh_writes_through_to_root_when_profile_has_no_own_state(profile_and_root):
"""Profile reading root's grant must push rotated tokens back to root."""
profile_path, root_path = profile_and_root
# Profile has NO own xai-oauth block (reads root via fallback).
_write_store(profile_path, {"version": 1, "providers": {}})
_write_store(
root_path,
{
"version": 1,
"providers": {
"xai-oauth": {
"tokens": {
"access_token": "old-access",
"refresh_token": "old-refresh",
}
}
},
},
)
rotated = {
"access_token": "new-access",
"refresh_token": "new-refresh",
"token_type": "Bearer",
}
auth._save_xai_oauth_tokens(rotated)
# Profile got the rotated chain (existing behavior).
profile = _read_store(profile_path)
assert profile["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "new-refresh"
# AND the global root no longer holds the revoked refresh token (#43589).
root = _read_store(root_path)
assert root["providers"]["xai-oauth"]["tokens"]["access_token"] == "new-access"
assert root["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "new-refresh"
def test_refresh_does_not_touch_root_when_profile_has_own_state(profile_and_root):
"""A profile that genuinely shadows root must NOT clobber the root grant."""
profile_path, root_path = profile_and_root
# Profile has its OWN xai-oauth block: it shadows root legitimately.
_write_store(
profile_path,
{
"version": 1,
"providers": {
"xai-oauth": {
"tokens": {
"access_token": "profile-old",
"refresh_token": "profile-old-refresh",
}
}
},
},
)
_write_store(
root_path,
{
"version": 1,
"providers": {
"xai-oauth": {
"tokens": {
"access_token": "root-untouched",
"refresh_token": "root-untouched-refresh",
}
}
},
},
)
auth._save_xai_oauth_tokens(
{"access_token": "profile-new", "refresh_token": "profile-new-refresh"}
)
profile = _read_store(profile_path)
assert profile["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "profile-new-refresh"
# Root is a separate grant chain — must be left exactly as-is.
root = _read_store(root_path)
assert root["providers"]["xai-oauth"]["tokens"]["access_token"] == "root-untouched"
assert root["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "root-untouched-refresh"
def test_write_through_is_noop_in_classic_mode(tmp_path, monkeypatch):
"""Classic mode (profile == root) already saves to root; no double write."""
profile_path = tmp_path / "auth.json"
monkeypatch.setattr(auth, "_auth_file_path", lambda: profile_path)
# Classic mode: _global_auth_file_path returns None.
monkeypatch.setattr(auth, "_global_auth_file_path", lambda: None)
_write_store(profile_path, {"version": 1, "providers": {}})
# Should not raise and should persist to the single store.
auth._save_xai_oauth_tokens(
{"access_token": "a", "refresh_token": "r"}
)
store = _read_store(profile_path)
assert store["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "r"
def test_write_through_failure_does_not_break_profile_save(profile_and_root, monkeypatch):
"""A failed root write-through must not break the profile's own save."""
profile_path, root_path = profile_and_root
_write_store(profile_path, {"version": 1, "providers": {}})
_write_store(root_path, {"version": 1, "providers": {}})
# Make the root write blow up; the profile save must still succeed.
real_save = auth._save_auth_store
def _exploding_save(store, target_path=None):
if target_path is not None and target_path == root_path:
raise OSError("simulated root write failure")
return real_save(store, target_path)
monkeypatch.setattr(auth, "_save_auth_store", _exploding_save)
auth._save_xai_oauth_tokens({"access_token": "a", "refresh_token": "r"})
profile = _read_store(profile_path)
assert profile["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "r"