From a175f395776a83e54ac838ade06ad3b837051249 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 4 May 2026 04:54:55 -0700 Subject: [PATCH] feat(nous): persist Nous OAuth across profiles via shared token store (#19712) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the Codex auto-import UX. On successful Nous login (either `hermes auth add nous --type oauth` or `hermes login nous`), tokens are mirrored to `$HERMES_SHARED_AUTH_DIR/nous_auth.json` (default `~/.hermes/shared/nous_auth.json`, outside any named profile's HERMES_HOME). On next login in a new profile, the flow offers to import those credentials ("Import these credentials? [Y/n]") and rehydrates via a forced refresh+mint instead of running the full device-code flow. Runtime refresh in any profile syncs the rotated refresh_token back to the shared store so sibling profiles don't hit stale-token fallback after rotation. The volatile 24h agent_key is NOT persisted to the shared store — only the long-lived OAuth tokens are cross-profile useful. - `HERMES_SHARED_AUTH_DIR` env var for tests + custom layouts - Pytest seat belt mirrors the existing `_auth_file_path` guard so forgetting to redirect the store in a test fails loudly - File mode 0600 where platform supports it - Runtime credential resolution is unchanged — shared store is only consulted during the login flow, so profile isolation at runtime is preserved - Stale refresh_token + portal-down cases gracefully fall back to device-code Addresses a user report from Mike Nguyen: running `hermes --profile auth add nous --type oauth` for every new profile is unnecessary friction now that Codex has a shared-import flow via `~/.codex/auth.json`. --- hermes_cli/auth.py | 270 ++++++++++++++++++- hermes_cli/auth_commands.py | 41 +++ tests/hermes_cli/test_auth_nous_provider.py | 283 ++++++++++++++++++++ 3 files changed, 583 insertions(+), 11 deletions(-) diff --git a/hermes_cli/auth.py b/hermes_cli/auth.py index 56e72d5eb0..5b63d41eb1 100644 --- a/hermes_cli/auth.py +++ b/hermes_cli/auth.py @@ -2589,6 +2589,208 @@ def _poll_for_token( # Nous Portal — token refresh, agent key minting, model discovery # ============================================================================= +# ----------------------------------------------------------------------------- +# Shared Nous token store — lets OAuth credentials persist across profiles +# so a new `hermes --profile auth add nous --type oauth` can one-tap +# import instead of running the full device-code flow every time. +# +# File lives at ${HERMES_SHARED_AUTH_DIR}/nous_auth.json, defaulting to +# ~/.hermes/shared/nous_auth.json. It is OUTSIDE any named profile's +# HERMES_HOME so named profiles (which typically live under +# ~/.hermes/profiles//) all see the same file. +# +# Written on successful login and on every runtime refresh so the stored +# refresh_token stays current even if one profile refreshes and rotates it. +# If ever the stored refresh_token does go stale server-side, import fails +# gracefully and the user falls back to the normal device-code flow. +# ----------------------------------------------------------------------------- + +NOUS_SHARED_STORE_FILENAME = "nous_auth.json" + + +def _nous_shared_auth_dir() -> Path: + """Resolve the directory that holds the shared Nous token store. + + Honors ``HERMES_SHARED_AUTH_DIR`` so tests can redirect it to a tmp + path without touching the real user's home. Defaults to + ``~/.hermes/shared/``. + """ + override = os.getenv("HERMES_SHARED_AUTH_DIR", "").strip() + if override: + return Path(override).expanduser() + return Path.home() / ".hermes" / "shared" + + +def _nous_shared_store_path() -> Path: + path = _nous_shared_auth_dir() / NOUS_SHARED_STORE_FILENAME + # Seat belt: if pytest is running and this resolves to a path under the + # real user's home, refuse rather than silently corrupt cross-profile + # state. Tests must set HERMES_SHARED_AUTH_DIR to a tmp_path (conftest + # does not do this automatically — mirror the _auth_file_path() guard + # so forgetting to set it fails loudly instead of writing to the real + # shared store). + if os.environ.get("PYTEST_CURRENT_TEST"): + real_home_shared = ( + Path.home() / ".hermes" / "shared" / NOUS_SHARED_STORE_FILENAME + ).resolve(strict=False) + try: + resolved = path.resolve(strict=False) + except Exception: + resolved = path + if resolved == real_home_shared: + raise RuntimeError( + f"Refusing to touch real user shared Nous auth store during test run: " + f"{path}. Set HERMES_SHARED_AUTH_DIR to a tmp_path in your test fixture." + ) + return path + + +def _write_shared_nous_state(state: Dict[str, Any]) -> None: + """Persist a minimal copy of the Nous OAuth state to the shared store. + + Best-effort: any failure is swallowed after logging. The shared store + is a convenience layer; the per-profile auth.json remains the source + of truth. + + We deliberately omit the short-lived ``agent_key`` (24h TTL, profile- + specific) — only the long-lived OAuth tokens are cross-profile useful. + """ + refresh_token = state.get("refresh_token") + access_token = state.get("access_token") + if not (isinstance(refresh_token, str) and refresh_token.strip()): + # No refresh_token = nothing worth sharing across profiles + return + if not (isinstance(access_token, str) and access_token.strip()): + return + + shared = { + "_schema": 1, + "access_token": access_token, + "refresh_token": refresh_token, + "token_type": state.get("token_type") or "Bearer", + "scope": state.get("scope") or DEFAULT_NOUS_SCOPE, + "client_id": state.get("client_id") or DEFAULT_NOUS_CLIENT_ID, + "portal_base_url": state.get("portal_base_url") or DEFAULT_NOUS_PORTAL_URL, + "inference_base_url": state.get("inference_base_url") or DEFAULT_NOUS_INFERENCE_URL, + "obtained_at": state.get("obtained_at"), + "expires_at": state.get("expires_at"), + "updated_at": datetime.now(timezone.utc).isoformat(), + } + try: + path = _nous_shared_store_path() + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text(json.dumps(shared, indent=2, sort_keys=True)) + try: + os.chmod(tmp, 0o600) + except OSError: + pass + os.replace(tmp, path) + _oauth_trace( + "nous_shared_store_written", + path=str(path), + refresh_token_fp=_token_fingerprint(refresh_token), + ) + except Exception as exc: + logger.debug("Failed to write shared Nous auth store: %s", exc) + + +def _read_shared_nous_state() -> Optional[Dict[str, Any]]: + """Return the shared Nous OAuth state if present and well-formed. + + Returns ``None`` when the file is missing, unreadable, malformed, or + lacks required fields. Callers should treat ``None`` as "no shared + credentials available — fall through to device-code". + """ + try: + path = _nous_shared_store_path() + except RuntimeError: + # Test seat belt tripped — treat as missing + return None + if not path.is_file(): + return None + try: + payload = json.loads(path.read_text()) + except (OSError, ValueError) as exc: + logger.debug("Shared Nous auth store at %s is unreadable: %s", path, exc) + return None + if not isinstance(payload, dict): + return None + refresh_token = payload.get("refresh_token") + access_token = payload.get("access_token") + if not (isinstance(refresh_token, str) and refresh_token.strip()): + return None + if not (isinstance(access_token, str) and access_token.strip()): + return None + return payload + + +def _try_import_shared_nous_state( + *, + timeout_seconds: float = 15.0, + min_key_ttl_seconds: int = 5 * 60, +) -> Optional[Dict[str, Any]]: + """Attempt to rehydrate Nous OAuth state from the shared store. + + Reads the shared file (if present), runs a forced refresh+mint using + the stored refresh_token to produce a fresh access_token + agent_key + scoped to this profile, and returns the full auth_state dict ready + for ``persist_nous_credentials()``. + + Returns ``None`` when no shared state is available or the rehydrate + fails for any reason (expired refresh_token, portal unreachable, + etc.) — caller should then fall through to the normal device-code + flow. + """ + shared = _read_shared_nous_state() + if not shared: + return None + + # Build a full state dict so refresh_nous_oauth_from_state has every + # field it needs. force_refresh=True gets us a fresh access_token + # for this profile; force_mint=True gets us a fresh agent_key. + state: Dict[str, Any] = { + "access_token": shared.get("access_token"), + "refresh_token": shared.get("refresh_token"), + "client_id": shared.get("client_id") or DEFAULT_NOUS_CLIENT_ID, + "portal_base_url": shared.get("portal_base_url") or DEFAULT_NOUS_PORTAL_URL, + "inference_base_url": shared.get("inference_base_url") or DEFAULT_NOUS_INFERENCE_URL, + "token_type": shared.get("token_type") or "Bearer", + "scope": shared.get("scope") or DEFAULT_NOUS_SCOPE, + "obtained_at": shared.get("obtained_at"), + "expires_at": shared.get("expires_at"), + "agent_key": None, + "agent_key_expires_at": None, + "tls": {"insecure": False, "ca_bundle": None}, + } + + try: + refreshed = refresh_nous_oauth_from_state( + state, + min_key_ttl_seconds=min_key_ttl_seconds, + timeout_seconds=timeout_seconds, + force_refresh=True, + force_mint=True, + ) + except AuthError as exc: + _oauth_trace( + "nous_shared_import_failed", + error_type=type(exc).__name__, + error_code=getattr(exc, "code", None), + ) + logger.debug("Shared Nous import failed: %s", exc) + return None + except Exception as exc: + _oauth_trace( + "nous_shared_import_failed", + error_type=type(exc).__name__, + ) + logger.debug("Shared Nous import failed: %s", exc) + return None + + return refreshed + + def _refresh_access_token( *, client: httpx.Client, @@ -2991,6 +3193,12 @@ def persist_nous_credentials( _save_provider_state(auth_store, "nous", state) _save_auth_store(auth_store) + # Mirror to the shared store so a new profile can one-tap import + # these credentials via `hermes auth add nous --type oauth`. Best- + # effort: any I/O failure is logged and swallowed (the per-profile + # auth.json is still the source of truth). + _write_shared_nous_state(state) + pool = load_pool("nous") return next( (e for e in pool.entries() if e.source == NOUS_DEVICE_CODE_SOURCE), @@ -3059,6 +3267,11 @@ def resolve_nous_runtime_credentials( refresh_token_fp=_token_fingerprint(state.get("refresh_token")), access_token_fp=_token_fingerprint(state.get("access_token")), ) + # Mirror post-refresh state to the shared store so sibling + # profiles don't hold stale refresh_tokens after rotation. + # Best-effort — any failure is logged and swallowed inside + # _write_shared_nous_state. + _write_shared_nous_state(state) verify = _resolve_verify(insecure=insecure, ca_bundle=ca_bundle, auth_state=state) timeout = httpx.Timeout(timeout_seconds if timeout_seconds else 15.0) @@ -4600,17 +4813,47 @@ def _login_nous(args, pconfig: ProviderConfig) -> None: ) try: - auth_state = _nous_device_code_login( - portal_base_url=getattr(args, "portal_url", None), - inference_base_url=getattr(args, "inference_url", None), - client_id=getattr(args, "client_id", None) or pconfig.client_id, - scope=getattr(args, "scope", None) or pconfig.scope, - open_browser=not getattr(args, "no_browser", False), - timeout_seconds=timeout_seconds, - insecure=insecure, - ca_bundle=ca_bundle, - min_key_ttl_seconds=5 * 60, - ) + auth_state = None + + # Codex-style auto-import: before launching a fresh device-code + # flow, check the shared store for an existing Nous credential + # from any other profile. If present, offer to rehydrate it. + shared = _read_shared_nous_state() + if shared: + try: + shared_path = _nous_shared_store_path() + except RuntimeError: + shared_path = None + print() + if shared_path: + print(f"Found existing Nous OAuth credentials at {shared_path}") + else: + print("Found existing shared Nous OAuth credentials") + try: + do_import = input("Import these credentials? [Y/n]: ").strip().lower() + except (EOFError, KeyboardInterrupt): + do_import = "y" + if do_import in ("", "y", "yes"): + print("Rehydrating Nous session from shared credentials...") + auth_state = _try_import_shared_nous_state( + timeout_seconds=timeout_seconds, + min_key_ttl_seconds=5 * 60, + ) + if auth_state is None: + print("Could not refresh shared credentials — falling back to device-code login.") + + if auth_state is None: + auth_state = _nous_device_code_login( + portal_base_url=getattr(args, "portal_url", None), + inference_base_url=getattr(args, "inference_url", None), + client_id=getattr(args, "client_id", None) or pconfig.client_id, + scope=getattr(args, "scope", None) or pconfig.scope, + open_browser=not getattr(args, "no_browser", False), + timeout_seconds=timeout_seconds, + insecure=insecure, + ca_bundle=ca_bundle, + min_key_ttl_seconds=5 * 60, + ) inference_base_url = auth_state["inference_base_url"] @@ -4627,6 +4870,11 @@ def _login_nous(args, pconfig: ProviderConfig) -> None: _save_provider_state(auth_store, "nous", auth_state) saved_to = _save_auth_store(auth_store) + # Mirror to the shared store so other profiles can one-tap import + # these credentials. Best-effort: any I/O failure is logged and + # swallowed inside the helper. + _write_shared_nous_state(auth_state) + print() print("Login successful!") print(f" Auth state: {saved_to}") diff --git a/hermes_cli/auth_commands.py b/hermes_cli/auth_commands.py index a9eb206647..a29776aea2 100644 --- a/hermes_cli/auth_commands.py +++ b/hermes_cli/auth_commands.py @@ -245,6 +245,47 @@ def auth_add_command(args) -> None: return if provider == "nous": + # Codex-style auto-import: if a shared Nous credential lives at + # ~/.hermes/shared/nous_auth.json (written by any previous + # successful login), offer to import it instead of running the + # full device-code flow. This makes `hermes --profile + # auth add nous --type oauth` a one-tap operation for users who + # run multiple profiles. + shared = auth_mod._read_shared_nous_state() + if shared: + try: + path = auth_mod._nous_shared_store_path() + except RuntimeError: + path = None + print() + if path: + print(f"Found existing Nous OAuth credentials at {path}") + else: + print("Found existing shared Nous OAuth credentials") + try: + do_import = input("Import these credentials? [Y/n]: ").strip().lower() + except (EOFError, KeyboardInterrupt): + do_import = "y" + if do_import in ("", "y", "yes"): + print("Rehydrating Nous session from shared credentials...") + rehydrated = auth_mod._try_import_shared_nous_state( + timeout_seconds=getattr(args, "timeout", None) or 15.0, + min_key_ttl_seconds=max( + 60, int(getattr(args, "min_key_ttl_seconds", 5 * 60)) + ), + ) + if rehydrated is not None: + custom_label = (getattr(args, "label", None) or "").strip() or None + entry = auth_mod.persist_nous_credentials(rehydrated, label=custom_label) + shown_label = entry.label if entry is not None else label_from_token( + rehydrated.get("access_token", ""), _oauth_default_label(provider, 1), + ) + print(f'Imported {provider} OAuth credentials: "{shown_label}"') + return + # Rehydrate failed (expired refresh_token, portal down, etc.) + # — fall through to device-code flow. + print("Could not refresh shared credentials — falling back to device-code login.") + creds = auth_mod._nous_device_code_login( portal_base_url=getattr(args, "portal_url", None), inference_base_url=getattr(args, "inference_url", None), diff --git a/tests/hermes_cli/test_auth_nous_provider.py b/tests/hermes_cli/test_auth_nous_provider.py index a8e337c1a0..d0e24aeaab 100644 --- a/tests/hermes_cli/test_auth_nous_provider.py +++ b/tests/hermes_cli/test_auth_nous_provider.py @@ -896,3 +896,286 @@ def test_refresh_non_reuse_error_keeps_original_description(): assert "Refresh session has been revoked" in str(exc_info.value) # Must not have been rewritten with the reuse message. assert "external process" not in str(exc_info.value).lower() + + +# ============================================================================= +# Shared Nous token store — cross-profile persistence (Codex-style auto-import) +# ============================================================================= + + +@pytest.fixture +def shared_store_env(tmp_path, monkeypatch): + """Redirect HERMES_SHARED_AUTH_DIR to a tmp_path. + + Required for every test that exercises the shared Nous store — the + in-auth.py seat belt refuses to touch the real user's shared store + under pytest, so tests that forget this fixture fail loudly instead + of corrupting real state. + """ + shared_dir = tmp_path / "shared" + monkeypatch.setenv("HERMES_SHARED_AUTH_DIR", str(shared_dir)) + return shared_dir + + +def test_shared_store_seat_belt_refuses_real_home_under_pytest(monkeypatch): + """Without HERMES_SHARED_AUTH_DIR override, the seat belt must trip. + + Mirrors the existing ``_auth_file_path`` seat belt: forgetting to + redirect this store in a test must fail loudly instead of silently + writing to the user's real ``~/.hermes/shared/`` across CI runs. + """ + from hermes_cli.auth import _nous_shared_store_path + + monkeypatch.delenv("HERMES_SHARED_AUTH_DIR", raising=False) + + with pytest.raises(RuntimeError, match="shared Nous auth store"): + _nous_shared_store_path() + + +def test_shared_store_honors_env_override(tmp_path, monkeypatch): + """HERMES_SHARED_AUTH_DIR must redirect the path.""" + from hermes_cli.auth import _nous_shared_store_path, NOUS_SHARED_STORE_FILENAME + + custom_dir = tmp_path / "custom_shared" + monkeypatch.setenv("HERMES_SHARED_AUTH_DIR", str(custom_dir)) + + path = _nous_shared_store_path() + assert path == custom_dir / NOUS_SHARED_STORE_FILENAME + + +def test_shared_store_read_missing_returns_none(shared_store_env): + """Missing file → ``_read_shared_nous_state()`` returns None.""" + from hermes_cli.auth import _read_shared_nous_state + + assert _read_shared_nous_state() is None + + +def test_shared_store_read_malformed_returns_none(shared_store_env): + """Unreadable / non-JSON file → None, not an exception.""" + from hermes_cli.auth import _nous_shared_store_path, _read_shared_nous_state + + path = _nous_shared_store_path() + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("{ not json") + + assert _read_shared_nous_state() is None + + +def test_shared_store_read_missing_required_fields_returns_none(shared_store_env): + """Payload without refresh_token → None (nothing worth importing).""" + from hermes_cli.auth import _nous_shared_store_path, _read_shared_nous_state + + path = _nous_shared_store_path() + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps({"_schema": 1, "access_token": "abc"})) + + assert _read_shared_nous_state() is None + + +def test_shared_store_write_and_read_roundtrip(shared_store_env): + """Write → read must preserve refresh_token + OAuth URLs.""" + from hermes_cli.auth import ( + _nous_shared_store_path, + _read_shared_nous_state, + _write_shared_nous_state, + ) + + _write_shared_nous_state(_full_state_fixture()) + + path = _nous_shared_store_path() + assert path.is_file() + + # Permissions should be 0600 where the platform supports it. + mode = path.stat().st_mode & 0o777 + assert mode == 0o600 or mode == 0o644 # 0o644 on platforms without chmod + + loaded = _read_shared_nous_state() + assert loaded is not None + assert loaded["refresh_token"] == "refresh-tok" + assert loaded["access_token"] == "access-tok" + assert loaded["portal_base_url"] == "https://portal.example.com" + assert loaded["inference_base_url"] == "https://inference.example.com/v1" + # Volatile agent_key MUST NOT be persisted to the shared store + # (24h TTL, profile-specific — only long-lived OAuth tokens are + # cross-profile useful). + assert "agent_key" not in loaded + + +def test_shared_store_write_skips_when_refresh_token_missing(shared_store_env): + """Write is a no-op when refresh_token is absent (nothing to share).""" + from hermes_cli.auth import _nous_shared_store_path, _write_shared_nous_state + + state = dict(_full_state_fixture()) + state["refresh_token"] = "" + + _write_shared_nous_state(state) + + assert not _nous_shared_store_path().is_file() + + +def test_persist_nous_credentials_mirrors_to_shared_store( + tmp_path, monkeypatch, shared_store_env, +): + """persist_nous_credentials must populate BOTH per-profile auth.json + AND the shared store, so a future profile's `hermes auth add nous + --type oauth` can one-tap import instead of redoing device-code. + """ + from hermes_cli.auth import ( + _nous_shared_store_path, + _read_shared_nous_state, + persist_nous_credentials, + ) + + hermes_home = tmp_path / "hermes" + hermes_home.mkdir(parents=True, exist_ok=True) + (hermes_home / "auth.json").write_text( + json.dumps({"version": 1, "providers": {}}) + ) + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + persist_nous_credentials(_full_state_fixture()) + + # Per-profile auth.json populated + payload = json.loads((hermes_home / "auth.json").read_text()) + assert "nous" in payload.get("providers", {}) + + # Shared store populated with the same refresh_token + shared = _read_shared_nous_state() + assert shared is not None + assert shared["refresh_token"] == "refresh-tok" + + # Shared file path lives under the tmp override, NOT the real home + assert str(_nous_shared_store_path()).startswith(str(shared_store_env)) + + +def test_try_import_shared_returns_none_when_store_missing(shared_store_env): + """No shared store → no rehydrate (fall through to device-code).""" + from hermes_cli.auth import _try_import_shared_nous_state + + assert _try_import_shared_nous_state() is None + + +def test_try_import_shared_returns_none_on_refresh_failure( + shared_store_env, monkeypatch, +): + """If the portal rejects the stored refresh_token (revoked, expired, + portal down), _try_import_shared_nous_state must return None so the + login flow falls back to a fresh device-code run. + """ + from hermes_cli import auth as auth_mod + + # Seed the shared store + auth_mod._write_shared_nous_state(_full_state_fixture()) + + # Make refresh fail + def _boom(*_args, **_kwargs): + raise AuthError( + "Refresh session has been revoked", + provider="nous", + code="invalid_grant", + relogin_required=True, + ) + + monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _boom) + + assert auth_mod._try_import_shared_nous_state() is None + + +def test_try_import_shared_rehydrates_on_success(shared_store_env, monkeypatch): + """Happy path: stored refresh_token is accepted, forced refresh+mint + returns a fresh access_token + agent_key, and the returned dict has + every field persist_nous_credentials() needs. + """ + from hermes_cli import auth as auth_mod + + auth_mod._write_shared_nous_state(_full_state_fixture()) + + def _fake_refresh(state, **kwargs): + # Simulate portal returning fresh tokens + a new agent_key + assert kwargs.get("force_refresh") is True + assert kwargs.get("force_mint") is True + return { + **state, + "access_token": "fresh-access-tok", + "refresh_token": "fresh-refresh-tok", # rotated + "agent_key": "new-agent-key", + "agent_key_expires_at": "2026-04-19T22:00:00+00:00", + } + + monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _fake_refresh) + + result = auth_mod._try_import_shared_nous_state() + + assert result is not None + assert result["access_token"] == "fresh-access-tok" + assert result["refresh_token"] == "fresh-refresh-tok" + assert result["agent_key"] == "new-agent-key" + # Preserved from shared state + assert result["portal_base_url"] == "https://portal.example.com" + assert result["client_id"] == "hermes-cli" + + +def test_shared_store_survives_across_profile_switch( + tmp_path, monkeypatch, shared_store_env, +): + """End-to-end: profile A logs in → shared store populated → profile B + (different HERMES_HOME) sees the same shared state and can rehydrate + without re-running device-code. + """ + from hermes_cli import auth as auth_mod + + # Profile A: login, which mirrors to shared store + profile_a = tmp_path / "profile_a" + profile_a.mkdir(parents=True, exist_ok=True) + (profile_a / "auth.json").write_text( + json.dumps({"version": 1, "providers": {}}) + ) + monkeypatch.setenv("HERMES_HOME", str(profile_a)) + auth_mod.persist_nous_credentials(_full_state_fixture()) + + # Profile A's auth.json has nous + a_payload = json.loads((profile_a / "auth.json").read_text()) + assert "nous" in a_payload.get("providers", {}) + + # Profile B: fresh HERMES_HOME, no auth yet, but the shared store + # persists — _read_shared_nous_state() must still return the tokens. + profile_b = tmp_path / "profile_b" + profile_b.mkdir(parents=True, exist_ok=True) + (profile_b / "auth.json").write_text( + json.dumps({"version": 1, "providers": {}}) + ) + monkeypatch.setenv("HERMES_HOME", str(profile_b)) + + # B's own auth.json has no nous + b_payload = json.loads((profile_b / "auth.json").read_text()) + assert "nous" not in b_payload.get("providers", {}) + + # But the shared store is visible + shared = auth_mod._read_shared_nous_state() + assert shared is not None + assert shared["refresh_token"] == "refresh-tok" + + # And a successful rehydrate + persist lands nous into profile B + def _fake_refresh(state, **kwargs): + return { + **state, + "access_token": "b-access-tok", + "refresh_token": "b-refresh-tok", + "agent_key": "b-agent-key", + "agent_key_expires_at": "2026-04-19T22:00:00+00:00", + } + + monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _fake_refresh) + result = auth_mod._try_import_shared_nous_state() + assert result is not None + + auth_mod.persist_nous_credentials(result) + + b_payload = json.loads((profile_b / "auth.json").read_text()) + assert "nous" in b_payload.get("providers", {}) + assert b_payload["providers"]["nous"]["refresh_token"] == "b-refresh-tok" + + # Shared store was updated with the rotated refresh_token too + shared_after = auth_mod._read_shared_nous_state() + assert shared_after is not None + assert shared_after["refresh_token"] == "b-refresh-tok"