From 497352bc4e53f824900ae76219d1e5c315b1a15f Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:08:19 +0530 Subject: [PATCH] fix(auth): write rotated xAI OAuth tokens back to global root (#43589) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The salvaged read-side fix lets a profile resolve the xAI OAuth grant from the global-root auth store when it has no own providers.xai-oauth block. But _save_xai_oauth_tokens still wrote rotated tokens only to the active profile store. Because xAI rotates the refresh_token on every refresh, a profile that reads root's grant and refreshes it left root holding a now- revoked refresh token — killing every other profile reading the stale root grant with invalid_grant once its access token expired (#43589). Detect the read-from-root case (profile lacks its own providers.xai-oauth block) and, after the profile save, write the rotated chain back to the global root too via a best-effort, TOCTOU-safe write-through that reuses _save_auth_store with an explicit target path. A profile that genuinely shadows root (has its own block) is left untouched, classic mode is a no-op, and a failed root write never breaks the profile's own save. Pairs with the read fallback in the preceding commit so the cross-profile xAI grant stays coherent in both directions. --- hermes_cli/auth.py | 72 +++++++- .../hermes_cli/test_xai_oauth_writethrough.py | 169 ++++++++++++++++++ 2 files changed, 239 insertions(+), 2 deletions(-) create mode 100644 tests/hermes_cli/test_xai_oauth_writethrough.py diff --git a/hermes_cli/auth.py b/hermes_cli/auth.py index e3be09eee0d..0950d225f72 100644 --- a/hermes_cli/auth.py +++ b/hermes_cli/auth.py @@ -1079,8 +1079,13 @@ def _load_auth_store(auth_file: Optional[Path] = None) -> Dict[str, Any]: return {"version": AUTH_STORE_VERSION, "providers": {}} -def _save_auth_store(auth_store: Dict[str, Any]) -> Path: - auth_file = _auth_file_path() +def _save_auth_store(auth_store: Dict[str, Any], target_path: Optional[Path] = None) -> Path: + # target_path=None preserves the existing contract (write the active + # store at _auth_file_path()). An explicit path lets callers persist a + # specific store — e.g. the global-root write-through for rotating xAI + # OAuth grants (#43589) — reusing this function's atomic O_EXCL + 0o600 + # write so the root auth.json gets the same TOCTOU-safe treatment. + auth_file = target_path if target_path is not None else _auth_file_path() auth_file.parent.mkdir(parents=True, exist_ok=True) # Tighten parent dir to 0o700 so siblings can't traverse to creds. # No-op on Windows (POSIX mode bits not enforced); ignore failures. @@ -3983,6 +3988,62 @@ def _read_xai_oauth_tokens(*, _lock: bool = True) -> Dict[str, Any]: } +def _profile_has_own_xai_oauth_state(auth_store: Dict[str, Any]) -> bool: + """True when this store has its OWN ``providers.xai-oauth`` block. + + Distinguishes a profile that genuinely shadows the root xAI grant from + one that only *reads* root via ``_load_provider_state``'s fallback. Only + the latter needs the refresh write-through below. + """ + providers = auth_store.get("providers") + return isinstance(providers, dict) and isinstance(providers.get("xai-oauth"), dict) + + +def _write_through_xai_oauth_to_global_root(state: Dict[str, Any]) -> None: + """Persist a rotated xAI OAuth ``state`` into the global-root auth.json. + + Best-effort write-through for the multi-profile rotation hazard (#43589): + xAI rotates the refresh_token on every refresh, so when a profile session + refreshes a grant it resolved from the root fallback, the rotated chain + must land back in root. Otherwise root keeps a now-revoked refresh token + and every other profile reading the stale root grant dies with + ``invalid_grant`` once its access token expires. + + Only updates ``providers.xai-oauth`` in the root store; never touches the + profile store (the caller already saved that). Swallows all errors — a + failed write-through degrades to the pre-existing behavior (root stale), + it must never break the profile's own successful save. + """ + global_path = _global_auth_file_path() + if global_path is None: + # Classic mode (profile == root); the profile save already hit root. + return + # Seat belt: under pytest, refuse to write the real user's + # ~/.hermes/auth.json even when HERMES_HOME points at a profile path + # (mirrors the read-side guard in _load_global_auth_store). Uses the + # unmodified HOME env, not Path.home() which fixtures may monkeypatch. + if os.environ.get("PYTEST_CURRENT_TEST"): + real_home_env = os.environ.get("HOME", "") + if real_home_env: + real_root = Path(real_home_env) / ".hermes" / "auth.json" + try: + if global_path.resolve(strict=False) == real_root.resolve(strict=False): + return + except Exception: + return + try: + if global_path.exists(): + global_store = _load_auth_store(global_path) + else: + global_store = {} + if not isinstance(global_store, dict): + return + _store_provider_state(global_store, "xai-oauth", dict(state), set_active=False) + _save_auth_store(global_store, global_path) + except Exception as exc: # pragma: no cover - best effort + logger.debug("xAI OAuth: write-through to global root failed: %s", exc) + + def _save_xai_oauth_tokens( tokens: Dict[str, Any], *, @@ -3994,6 +4055,11 @@ def _save_xai_oauth_tokens( last_refresh = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") with _auth_store_lock(): auth_store = _load_auth_store() + # A profile that lacks its own xai-oauth block is reading the root + # grant through _load_provider_state's fallback. When such a profile + # refreshes the (rotating) grant, we must write the rotated chain back + # to root too, or root is left holding a revoked refresh token (#43589). + write_through_to_root = not _profile_has_own_xai_oauth_state(auth_store) state = _load_provider_state(auth_store, "xai-oauth") or {} state["tokens"] = tokens state["last_refresh"] = last_refresh @@ -4004,6 +4070,8 @@ def _save_xai_oauth_tokens( state["redirect_uri"] = redirect_uri _save_provider_state(auth_store, "xai-oauth", state) _save_auth_store(auth_store) + if write_through_to_root: + _write_through_xai_oauth_to_global_root(state) def _xai_access_token_is_expiring(access_token: str, skew_seconds: int = 0) -> bool: diff --git a/tests/hermes_cli/test_xai_oauth_writethrough.py b/tests/hermes_cli/test_xai_oauth_writethrough.py new file mode 100644 index 00000000000..d706c76d099 --- /dev/null +++ b/tests/hermes_cli/test_xai_oauth_writethrough.py @@ -0,0 +1,169 @@ +"""Regression tests for xAI OAuth refresh write-through to the global root. + +Companion to ``test_xai_oauth_profile_auth.py``. That file covers the READ +fallback (profile -> credential pool -> global root). These cover the WRITE +side: when a profile that has no own ``providers.xai-oauth`` block refreshes +the (rotating) grant it resolved from the root fallback, the rotated tokens +must be written back to the global root too. Otherwise root keeps a revoked +refresh token and every other profile reading root's stale grant dies with +``invalid_grant`` once its access token expires (issue #43589). + +The tests drive the real ``_save_xai_oauth_tokens`` against real on-disk auth +stores (profile + root under ``tmp_path``) rather than mocking the save +boundary, so they exercise the actual atomic write path. +""" + +import json + +import pytest + +from hermes_cli import auth + + +def _write_store(path, store): + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(store), encoding="utf-8") + + +def _read_store(path): + return json.loads(path.read_text(encoding="utf-8")) + + +@pytest.fixture +def profile_and_root(tmp_path, monkeypatch): + """Wire a profile auth store + a distinct global-root auth store on disk. + + Returns (profile_path, root_path). The pytest seat belt in + ``_write_through_xai_oauth_to_global_root`` only refuses the *real* user's + ``$HOME/.hermes/auth.json``; a tmp_path root is allowed, so we point HOME + away from the tmp root to keep the guard from tripping on these fixtures. + """ + profile_path = tmp_path / "profiles" / "work" / "auth.json" + root_path = tmp_path / "root" / "auth.json" + + monkeypatch.setattr(auth, "_auth_file_path", lambda: profile_path) + monkeypatch.setattr(auth, "_global_auth_file_path", lambda: root_path) + # Keep the pytest write seat belt from matching our tmp root. + monkeypatch.setenv("HOME", str(tmp_path / "not-the-root")) + return profile_path, root_path + + +def test_refresh_writes_through_to_root_when_profile_has_no_own_state(profile_and_root): + """Profile reading root's grant must push rotated tokens back to root.""" + profile_path, root_path = profile_and_root + # Profile has NO own xai-oauth block (reads root via fallback). + _write_store(profile_path, {"version": 1, "providers": {}}) + _write_store( + root_path, + { + "version": 1, + "providers": { + "xai-oauth": { + "tokens": { + "access_token": "old-access", + "refresh_token": "old-refresh", + } + } + }, + }, + ) + + rotated = { + "access_token": "new-access", + "refresh_token": "new-refresh", + "token_type": "Bearer", + } + auth._save_xai_oauth_tokens(rotated) + + # Profile got the rotated chain (existing behavior). + profile = _read_store(profile_path) + assert profile["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "new-refresh" + + # AND the global root no longer holds the revoked refresh token (#43589). + root = _read_store(root_path) + assert root["providers"]["xai-oauth"]["tokens"]["access_token"] == "new-access" + assert root["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "new-refresh" + + +def test_refresh_does_not_touch_root_when_profile_has_own_state(profile_and_root): + """A profile that genuinely shadows root must NOT clobber the root grant.""" + profile_path, root_path = profile_and_root + # Profile has its OWN xai-oauth block: it shadows root legitimately. + _write_store( + profile_path, + { + "version": 1, + "providers": { + "xai-oauth": { + "tokens": { + "access_token": "profile-old", + "refresh_token": "profile-old-refresh", + } + } + }, + }, + ) + _write_store( + root_path, + { + "version": 1, + "providers": { + "xai-oauth": { + "tokens": { + "access_token": "root-untouched", + "refresh_token": "root-untouched-refresh", + } + } + }, + }, + ) + + auth._save_xai_oauth_tokens( + {"access_token": "profile-new", "refresh_token": "profile-new-refresh"} + ) + + profile = _read_store(profile_path) + assert profile["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "profile-new-refresh" + + # Root is a separate grant chain — must be left exactly as-is. + root = _read_store(root_path) + assert root["providers"]["xai-oauth"]["tokens"]["access_token"] == "root-untouched" + assert root["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "root-untouched-refresh" + + +def test_write_through_is_noop_in_classic_mode(tmp_path, monkeypatch): + """Classic mode (profile == root) already saves to root; no double write.""" + profile_path = tmp_path / "auth.json" + monkeypatch.setattr(auth, "_auth_file_path", lambda: profile_path) + # Classic mode: _global_auth_file_path returns None. + monkeypatch.setattr(auth, "_global_auth_file_path", lambda: None) + _write_store(profile_path, {"version": 1, "providers": {}}) + + # Should not raise and should persist to the single store. + auth._save_xai_oauth_tokens( + {"access_token": "a", "refresh_token": "r"} + ) + store = _read_store(profile_path) + assert store["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "r" + + +def test_write_through_failure_does_not_break_profile_save(profile_and_root, monkeypatch): + """A failed root write-through must not break the profile's own save.""" + profile_path, root_path = profile_and_root + _write_store(profile_path, {"version": 1, "providers": {}}) + _write_store(root_path, {"version": 1, "providers": {}}) + + # Make the root write blow up; the profile save must still succeed. + real_save = auth._save_auth_store + + def _exploding_save(store, target_path=None): + if target_path is not None and target_path == root_path: + raise OSError("simulated root write failure") + return real_save(store, target_path) + + monkeypatch.setattr(auth, "_save_auth_store", _exploding_save) + + auth._save_xai_oauth_tokens({"access_token": "a", "refresh_token": "r"}) + + profile = _read_store(profile_path) + assert profile["providers"]["xai-oauth"]["tokens"]["refresh_token"] == "r"