mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
Adds a new authentication provider that lets SuperGrok subscribers sign in to Hermes with their xAI account via the standard OAuth 2.0 PKCE loopback flow, instead of pasting a raw API key from console.x.ai. Highlights ---------- * OAuth 2.0 PKCE loopback login against accounts.x.ai with discovery, state/nonce, and a strict CORS-origin allowlist on the callback. * Authorize URL carries `plan=generic` (required for non-allowlisted loopback clients) and `referrer=hermes-agent` for best-effort attribution in xAI's OAuth server logs. * Token storage in `auth.json` with file-locked atomic writes; JWT `exp`-based expiry detection with skew; refresh-token rotation synced both ways between the singleton store and the credential pool so multi-process / multi-profile setups don't tear each other's refresh tokens. * Reactive 401 retry: on a 401 from the xAI Responses API, the agent refreshes the token, swaps it back into `self.api_key`, and retries the call once. Guarded against silent account swaps when the active key was sourced from a different (manual) pool entry. * Auxiliary tasks (curator, vision, embeddings, etc.) route through a dedicated xAI Responses-mode auxiliary client instead of falling back to OpenRouter billing. * Direct HTTP tools (`tools/xai_http.py`, transcription, TTS, image-gen plugin) resolve credentials through a unified runtime → singleton → env-var fallback chain so xai-oauth users get them for free. * `hermes auth add xai-oauth` and `hermes auth remove xai-oauth N` are wired through the standard auth-commands surface; remove cleans up the singleton loopback_pkce entry so it doesn't silently reinstate. * `hermes model` provider picker shows "xAI Grok OAuth (SuperGrok Subscription)" and the model-flow falls back to pool credentials when the singleton is missing. Hardening --------- * Discovery and refresh responses validate the returned `token_endpoint` host against the same `*.x.ai` allowlist as the authorization endpoint, blocking MITM persistence of a hostile endpoint. * Discovery / refresh / token-exchange `response.json()` calls are wrapped to raise typed `AuthError` on malformed bodies (captive portals, proxy error pages) instead of leaking JSONDecodeError tracebacks. * `prompt_cache_key` is routed through `extra_body` on the codex transport (sending it as a top-level kwarg trips xAI's SDK with a TypeError). * Credential-pool sync-back preserves `active_provider` so refreshing an OAuth entry doesn't silently flip the active provider out from under the running agent. Testing ------- * New `tests/hermes_cli/test_auth_xai_oauth_provider.py` (~63 tests) covers JWT expiry, OAuth URL params (plan + referrer), CORS origins, redirect URI validation, singleton↔pool sync, concurrency races, refresh error paths, runtime resolution, and malformed-JSON guards. * Extended `test_credential_pool.py`, `test_codex_transport.py`, and `test_run_agent_codex_responses.py` cover the pool sync-back, `extra_body` routing, and 401 reactive refresh paths. * 165 tests passing on this branch via `scripts/run_tests.sh`.
1605 lines
62 KiB
Python
1605 lines
62 KiB
Python
"""Tests for xAI Grok OAuth — tokens stored in Hermes auth store (~/.hermes/auth.json)."""
|
|
|
|
import base64
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from hermes_cli.auth import (
|
|
AuthError,
|
|
DEFAULT_XAI_OAUTH_BASE_URL,
|
|
PROVIDER_REGISTRY,
|
|
XAI_OAUTH_CLIENT_ID,
|
|
XAI_OAUTH_REDIRECT_HOST,
|
|
XAI_OAUTH_REDIRECT_PATH,
|
|
XAI_OAUTH_SCOPE,
|
|
_read_xai_oauth_tokens,
|
|
_save_xai_oauth_tokens,
|
|
_xai_access_token_is_expiring,
|
|
_xai_callback_cors_origin,
|
|
_xai_oauth_build_authorize_url,
|
|
_xai_validate_loopback_redirect_uri,
|
|
get_xai_oauth_auth_status,
|
|
refresh_xai_oauth_pure,
|
|
resolve_provider,
|
|
resolve_xai_oauth_runtime_credentials,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _setup_hermes_auth(
|
|
hermes_home: Path,
|
|
*,
|
|
access_token: str = "access",
|
|
refresh_token: str = "refresh",
|
|
discovery: dict | None = None,
|
|
):
|
|
"""Write xAI OAuth tokens into the Hermes auth store at the given root."""
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
state = {
|
|
"tokens": {
|
|
"access_token": access_token,
|
|
"refresh_token": refresh_token,
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
},
|
|
"last_refresh": "2026-05-14T00:00:00Z",
|
|
"auth_mode": "oauth_pkce",
|
|
}
|
|
if discovery is not None:
|
|
state["discovery"] = discovery
|
|
auth_store = {
|
|
"version": 1,
|
|
"active_provider": "xai-oauth",
|
|
"providers": {"xai-oauth": state},
|
|
}
|
|
auth_file = hermes_home / "auth.json"
|
|
auth_file.write_text(json.dumps(auth_store, indent=2))
|
|
return auth_file
|
|
|
|
|
|
def _jwt_with_exp(exp_epoch: int) -> str:
|
|
"""Build a minimal JWT-shaped string with the given exp claim."""
|
|
payload = {"exp": exp_epoch}
|
|
encoded = (
|
|
base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8"))
|
|
.rstrip(b"=")
|
|
.decode("utf-8")
|
|
)
|
|
return f"h.{encoded}.s"
|
|
|
|
|
|
class _StubHTTPResponse:
|
|
def __init__(self, status_code: int, payload):
|
|
self.status_code = status_code
|
|
self._payload = payload
|
|
self.text = json.dumps(payload) if isinstance(payload, (dict, list)) else str(payload)
|
|
|
|
def json(self):
|
|
if isinstance(self._payload, Exception):
|
|
raise self._payload
|
|
return self._payload
|
|
|
|
|
|
class _StubHTTPClient:
|
|
def __init__(self, response):
|
|
self._response = response
|
|
self.last_call = None
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
return False
|
|
|
|
def post(self, *args, **kwargs):
|
|
self.last_call = ("post", args, kwargs)
|
|
return self._response
|
|
|
|
|
|
def _patch_httpx_client(monkeypatch, response):
|
|
holder = {"client": None}
|
|
|
|
def _factory(*args, **kwargs):
|
|
client = _StubHTTPClient(response)
|
|
holder["client"] = client
|
|
return client
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.httpx.Client", _factory)
|
|
return holder
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants and registry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_xai_oauth_provider_registered():
|
|
assert "xai-oauth" in PROVIDER_REGISTRY
|
|
pconfig = PROVIDER_REGISTRY["xai-oauth"]
|
|
assert pconfig.id == "xai-oauth"
|
|
assert pconfig.auth_type == "oauth_external"
|
|
assert pconfig.inference_base_url == DEFAULT_XAI_OAUTH_BASE_URL
|
|
|
|
|
|
def test_resolve_provider_normalizes_xai_oauth_aliases():
|
|
assert resolve_provider("xai-oauth") == "xai-oauth"
|
|
assert resolve_provider("grok-oauth") == "xai-oauth"
|
|
assert resolve_provider("x-ai-oauth") == "xai-oauth"
|
|
assert resolve_provider("xai-grok-oauth") == "xai-oauth"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JWT expiry detection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_xai_access_token_is_expiring_returns_true_for_expired_jwt():
|
|
expired = _jwt_with_exp(int(time.time()) - 60)
|
|
assert _xai_access_token_is_expiring(expired, 0) is True
|
|
|
|
|
|
def test_xai_access_token_is_expiring_returns_false_for_fresh_jwt():
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
assert _xai_access_token_is_expiring(fresh, 0) is False
|
|
|
|
|
|
def test_xai_access_token_is_expiring_honors_skew_window():
|
|
near = _jwt_with_exp(int(time.time()) + 30)
|
|
assert _xai_access_token_is_expiring(near, 60) is True
|
|
assert _xai_access_token_is_expiring(near, 0) is False
|
|
|
|
|
|
def test_xai_access_token_is_expiring_returns_false_for_non_jwt():
|
|
assert _xai_access_token_is_expiring("not.a.jwt.but.has.dots", 0) is False
|
|
assert _xai_access_token_is_expiring("opaque-token-no-dots", 0) is False
|
|
assert _xai_access_token_is_expiring("", 0) is False
|
|
assert _xai_access_token_is_expiring(None, 0) is False # type: ignore[arg-type]
|
|
|
|
|
|
def test_xai_access_token_is_expiring_returns_false_for_jwt_without_exp():
|
|
payload = {"sub": "user"}
|
|
encoded = base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8")).rstrip(b"=").decode()
|
|
token = f"h.{encoded}.s"
|
|
assert _xai_access_token_is_expiring(token, 0) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Loopback redirect URI validation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_xai_validate_loopback_redirect_uri_accepts_localhost_with_port():
|
|
host, port, path = _xai_validate_loopback_redirect_uri(
|
|
"http://127.0.0.1:56121/callback"
|
|
)
|
|
assert host == XAI_OAUTH_REDIRECT_HOST
|
|
assert port == 56121
|
|
assert path == XAI_OAUTH_REDIRECT_PATH
|
|
|
|
|
|
def test_xai_validate_loopback_redirect_uri_rejects_https():
|
|
with pytest.raises(AuthError) as exc:
|
|
_xai_validate_loopback_redirect_uri("https://127.0.0.1:56121/callback")
|
|
assert exc.value.code == "xai_redirect_invalid"
|
|
|
|
|
|
def test_xai_validate_loopback_redirect_uri_rejects_non_loopback():
|
|
with pytest.raises(AuthError) as exc:
|
|
_xai_validate_loopback_redirect_uri("http://example.com:56121/callback")
|
|
assert exc.value.code == "xai_redirect_invalid"
|
|
|
|
|
|
def test_xai_validate_loopback_redirect_uri_rejects_missing_port():
|
|
with pytest.raises(AuthError) as exc:
|
|
_xai_validate_loopback_redirect_uri("http://127.0.0.1/callback")
|
|
assert exc.value.code == "xai_redirect_invalid"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Authorize URL construction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _parse_authorize_url(url: str) -> dict:
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
parsed = urlparse(url)
|
|
return {k: v[0] for k, v in parse_qs(parsed.query).items()}
|
|
|
|
|
|
def test_xai_oauth_authorize_url_includes_plan_generic():
|
|
"""Regression: accounts.x.ai requires `plan=generic` for loopback OAuth on
|
|
non-allowlisted clients. Must always be present on the authorize URL."""
|
|
url = _xai_oauth_build_authorize_url(
|
|
authorization_endpoint="https://auth.x.ai/oauth2/authorize",
|
|
redirect_uri="http://127.0.0.1:56121/callback",
|
|
code_challenge="challenge-xyz",
|
|
state="state-abc",
|
|
nonce="nonce-def",
|
|
)
|
|
params = _parse_authorize_url(url)
|
|
assert params["plan"] == "generic"
|
|
|
|
|
|
def test_xai_oauth_authorize_url_includes_referrer_hermes_agent():
|
|
"""Attribution: xAI's OAuth server can identify Hermes-originated logins
|
|
via the referrer query param. Must always be present on the authorize URL."""
|
|
url = _xai_oauth_build_authorize_url(
|
|
authorization_endpoint="https://auth.x.ai/oauth2/authorize",
|
|
redirect_uri="http://127.0.0.1:56121/callback",
|
|
code_challenge="challenge-xyz",
|
|
state="state-abc",
|
|
nonce="nonce-def",
|
|
)
|
|
params = _parse_authorize_url(url)
|
|
assert params["referrer"] == "hermes-agent"
|
|
|
|
|
|
def test_xai_oauth_authorize_url_includes_pkce_and_oidc_params():
|
|
url = _xai_oauth_build_authorize_url(
|
|
authorization_endpoint="https://auth.x.ai/oauth2/authorize",
|
|
redirect_uri="http://127.0.0.1:56121/callback",
|
|
code_challenge="challenge-xyz",
|
|
state="state-abc",
|
|
nonce="nonce-def",
|
|
)
|
|
params = _parse_authorize_url(url)
|
|
assert params["response_type"] == "code"
|
|
assert params["client_id"] == XAI_OAUTH_CLIENT_ID
|
|
assert params["redirect_uri"] == "http://127.0.0.1:56121/callback"
|
|
assert params["scope"] == XAI_OAUTH_SCOPE
|
|
assert params["code_challenge"] == "challenge-xyz"
|
|
assert params["code_challenge_method"] == "S256"
|
|
assert params["state"] == "state-abc"
|
|
assert params["nonce"] == "nonce-def"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CORS allowlist
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_xai_callback_cors_origin_allowlist():
|
|
assert _xai_callback_cors_origin("https://accounts.x.ai") == "https://accounts.x.ai"
|
|
assert _xai_callback_cors_origin("https://auth.x.ai") == "https://auth.x.ai"
|
|
|
|
|
|
def test_xai_callback_cors_origin_rejects_unknown_origin():
|
|
assert _xai_callback_cors_origin("https://attacker.example.com") == ""
|
|
assert _xai_callback_cors_origin(None) == ""
|
|
assert _xai_callback_cors_origin("") == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Token roundtrip + reads
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_save_and_read_xai_oauth_tokens_roundtrip(tmp_path, monkeypatch):
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
_save_xai_oauth_tokens(
|
|
{
|
|
"access_token": "at-1",
|
|
"refresh_token": "rt-1",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
},
|
|
discovery={"token_endpoint": "https://auth.x.ai/oauth2/token"},
|
|
redirect_uri="http://127.0.0.1:56121/callback",
|
|
)
|
|
data = _read_xai_oauth_tokens()
|
|
assert data["tokens"]["access_token"] == "at-1"
|
|
assert data["tokens"]["refresh_token"] == "rt-1"
|
|
assert data["redirect_uri"] == "http://127.0.0.1:56121/callback"
|
|
assert data["discovery"]["token_endpoint"] == "https://auth.x.ai/oauth2/token"
|
|
|
|
|
|
def test_read_xai_oauth_tokens_missing(tmp_path, monkeypatch):
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
with pytest.raises(AuthError) as exc:
|
|
_read_xai_oauth_tokens()
|
|
assert exc.value.code == "xai_auth_missing"
|
|
assert exc.value.relogin_required is True
|
|
|
|
|
|
def test_read_xai_oauth_tokens_missing_access_token(tmp_path, monkeypatch):
|
|
hermes_home = tmp_path / "hermes"
|
|
_setup_hermes_auth(hermes_home, access_token="")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
with pytest.raises(AuthError) as exc:
|
|
_read_xai_oauth_tokens()
|
|
assert exc.value.code == "xai_auth_missing_access_token"
|
|
assert exc.value.relogin_required is True
|
|
|
|
|
|
def test_read_xai_oauth_tokens_missing_refresh_token(tmp_path, monkeypatch):
|
|
hermes_home = tmp_path / "hermes"
|
|
_setup_hermes_auth(hermes_home, refresh_token="")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
with pytest.raises(AuthError) as exc:
|
|
_read_xai_oauth_tokens()
|
|
assert exc.value.code == "xai_auth_missing_refresh_token"
|
|
assert exc.value.relogin_required is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Runtime credential resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_resolve_xai_runtime_credentials_returns_singleton_state(tmp_path, monkeypatch):
|
|
hermes_home = tmp_path / "hermes"
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=fresh)
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
monkeypatch.delenv("HERMES_XAI_BASE_URL", raising=False)
|
|
monkeypatch.delenv("XAI_BASE_URL", raising=False)
|
|
|
|
creds = resolve_xai_oauth_runtime_credentials()
|
|
assert creds["provider"] == "xai-oauth"
|
|
assert creds["api_key"] == fresh
|
|
assert creds["base_url"] == DEFAULT_XAI_OAUTH_BASE_URL
|
|
assert creds["source"] == "hermes-auth-store"
|
|
assert creds["auth_mode"] == "oauth_pkce"
|
|
|
|
|
|
def test_resolve_xai_runtime_credentials_refreshes_expiring_token(tmp_path, monkeypatch):
|
|
hermes_home = tmp_path / "hermes"
|
|
expiring = _jwt_with_exp(int(time.time()) - 10)
|
|
_setup_hermes_auth(
|
|
hermes_home,
|
|
access_token=expiring,
|
|
refresh_token="rt-old",
|
|
discovery={"token_endpoint": "https://auth.x.ai/oauth2/token"},
|
|
)
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
new_access = _jwt_with_exp(int(time.time()) + 3600)
|
|
called = {"count": 0}
|
|
|
|
def _fake_refresh(tokens, **kwargs):
|
|
called["count"] += 1
|
|
updated = dict(tokens)
|
|
updated["access_token"] = new_access
|
|
updated["refresh_token"] = "rt-new"
|
|
return updated
|
|
|
|
monkeypatch.setattr("hermes_cli.auth._refresh_xai_oauth_tokens", _fake_refresh)
|
|
|
|
creds = resolve_xai_oauth_runtime_credentials()
|
|
assert called["count"] == 1
|
|
assert creds["api_key"] == new_access
|
|
|
|
|
|
def test_resolve_xai_runtime_credentials_force_refresh(tmp_path, monkeypatch):
|
|
hermes_home = tmp_path / "hermes"
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(
|
|
hermes_home,
|
|
access_token=fresh,
|
|
discovery={"token_endpoint": "https://auth.x.ai/oauth2/token"},
|
|
)
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
forced = _jwt_with_exp(int(time.time()) + 7200)
|
|
called = {"count": 0}
|
|
|
|
def _fake_refresh(tokens, **kwargs):
|
|
called["count"] += 1
|
|
updated = dict(tokens)
|
|
updated["access_token"] = forced
|
|
return updated
|
|
|
|
monkeypatch.setattr("hermes_cli.auth._refresh_xai_oauth_tokens", _fake_refresh)
|
|
|
|
creds = resolve_xai_oauth_runtime_credentials(force_refresh=True, refresh_if_expiring=False)
|
|
assert called["count"] == 1
|
|
assert creds["api_key"] == forced
|
|
|
|
|
|
def test_resolve_xai_runtime_credentials_honours_env_base_url(tmp_path, monkeypatch):
|
|
hermes_home = tmp_path / "hermes"
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=fresh)
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
monkeypatch.setenv("HERMES_XAI_BASE_URL", "https://custom.x.ai/v1/")
|
|
|
|
creds = resolve_xai_oauth_runtime_credentials()
|
|
assert creds["base_url"] == "https://custom.x.ai/v1"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth status surface
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_get_xai_oauth_auth_status_logged_in_via_singleton(tmp_path, monkeypatch):
|
|
hermes_home = tmp_path / "hermes"
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=fresh)
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
status = get_xai_oauth_auth_status()
|
|
assert status["logged_in"] is True
|
|
assert status["api_key"] == fresh
|
|
assert status["auth_mode"] == "oauth_pkce"
|
|
|
|
|
|
def test_get_xai_oauth_auth_status_logged_out(tmp_path, monkeypatch):
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
status = get_xai_oauth_auth_status()
|
|
assert status["logged_in"] is False
|
|
assert "error" in status
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# refresh_xai_oauth_pure error handling
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_requires_refresh_token():
|
|
with pytest.raises(AuthError) as exc:
|
|
refresh_xai_oauth_pure("at", "")
|
|
assert exc.value.code == "xai_auth_missing_refresh_token"
|
|
assert exc.value.relogin_required is True
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_relogin_on_400(monkeypatch):
|
|
response = _StubHTTPResponse(400, {"error": "invalid_grant"})
|
|
_patch_httpx_client(monkeypatch, response)
|
|
with pytest.raises(AuthError) as exc:
|
|
refresh_xai_oauth_pure(
|
|
"at", "rt", token_endpoint="https://auth.x.ai/oauth2/token"
|
|
)
|
|
assert exc.value.code == "xai_refresh_failed"
|
|
assert exc.value.relogin_required is True
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_no_relogin_on_500(monkeypatch):
|
|
response = _StubHTTPResponse(503, "service unavailable")
|
|
_patch_httpx_client(monkeypatch, response)
|
|
with pytest.raises(AuthError) as exc:
|
|
refresh_xai_oauth_pure(
|
|
"at", "rt", token_endpoint="https://auth.x.ai/oauth2/token"
|
|
)
|
|
assert exc.value.code == "xai_refresh_failed"
|
|
assert exc.value.relogin_required is False
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_returns_updated_tokens(monkeypatch):
|
|
new_access = _jwt_with_exp(int(time.time()) + 3600)
|
|
response = _StubHTTPResponse(
|
|
200,
|
|
{
|
|
"access_token": new_access,
|
|
"refresh_token": "rt-rotated",
|
|
"id_token": "id-1",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
},
|
|
)
|
|
holder = _patch_httpx_client(monkeypatch, response)
|
|
|
|
updated = refresh_xai_oauth_pure(
|
|
"at", "rt-old", token_endpoint="https://auth.x.ai/oauth2/token"
|
|
)
|
|
assert updated["access_token"] == new_access
|
|
assert updated["refresh_token"] == "rt-rotated"
|
|
assert updated["id_token"] == "id-1"
|
|
assert updated["token_type"] == "Bearer"
|
|
assert updated["last_refresh"].endswith("Z")
|
|
client = holder["client"]
|
|
assert client is not None
|
|
_method, _args, kwargs = client.last_call
|
|
assert kwargs["data"]["grant_type"] == "refresh_token"
|
|
assert kwargs["data"]["refresh_token"] == "rt-old"
|
|
assert kwargs["data"]["client_id"] == XAI_OAUTH_CLIENT_ID
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_keeps_refresh_token_when_response_omits_it(monkeypatch):
|
|
"""Some OAuth providers don't rotate refresh tokens — preserve the old one."""
|
|
new_access = _jwt_with_exp(int(time.time()) + 3600)
|
|
response = _StubHTTPResponse(
|
|
200,
|
|
{
|
|
"access_token": new_access,
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
},
|
|
)
|
|
_patch_httpx_client(monkeypatch, response)
|
|
|
|
updated = refresh_xai_oauth_pure(
|
|
"at", "rt-stable", token_endpoint="https://auth.x.ai/oauth2/token"
|
|
)
|
|
assert updated["access_token"] == new_access
|
|
assert updated["refresh_token"] == "rt-stable"
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_rejects_response_without_access_token(monkeypatch):
|
|
response = _StubHTTPResponse(
|
|
200,
|
|
{"refresh_token": "rt-new", "expires_in": 3600},
|
|
)
|
|
_patch_httpx_client(monkeypatch, response)
|
|
with pytest.raises(AuthError) as exc:
|
|
refresh_xai_oauth_pure(
|
|
"at", "rt", token_endpoint="https://auth.x.ai/oauth2/token"
|
|
)
|
|
assert exc.value.code == "xai_refresh_missing_access_token"
|
|
assert exc.value.relogin_required is True
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_raises_typed_error_on_malformed_json(monkeypatch):
|
|
"""xAI returning HTTP 200 with a non-JSON body (captive portal, proxy
|
|
error page, etc.) must surface a typed AuthError, not a raw
|
|
``json.JSONDecodeError`` traceback. Matches the qwen-oauth precedent
|
|
so the upstream UX layer (``format_auth_error``) can map the failure."""
|
|
response = _StubHTTPResponse(200, ValueError("not json"))
|
|
response.text = "<html>captive portal</html>"
|
|
_patch_httpx_client(monkeypatch, response)
|
|
with pytest.raises(AuthError) as exc:
|
|
refresh_xai_oauth_pure(
|
|
"at", "rt", token_endpoint="https://auth.x.ai/oauth2/token"
|
|
)
|
|
assert exc.value.code == "xai_refresh_invalid_json"
|
|
|
|
|
|
def test_xai_oauth_discovery_raises_typed_error_on_malformed_json(monkeypatch):
|
|
"""Discovery is a cold-start, one-time fetch. If the response is HTTP
|
|
200 with a non-JSON body (corporate proxy / captive portal returning
|
|
HTML), surface a typed AuthError rather than letting the
|
|
``json.JSONDecodeError`` escape — so the message reads as an auth
|
|
problem instead of an internal parsing crash."""
|
|
from hermes_cli.auth import _xai_oauth_discovery
|
|
|
|
class _BadJSON:
|
|
status_code = 200
|
|
|
|
def json(self):
|
|
raise ValueError("Expecting value: line 1 column 1 (char 0)")
|
|
|
|
monkeypatch.setattr(
|
|
"hermes_cli.auth.httpx.get",
|
|
lambda *a, **kw: _BadJSON(),
|
|
)
|
|
with pytest.raises(AuthError) as exc:
|
|
_xai_oauth_discovery()
|
|
assert exc.value.code == "xai_discovery_invalid_json"
|
|
|
|
|
|
def test_xai_oauth_discovery_raises_typed_error_on_non_object_payload(monkeypatch):
|
|
"""A discovery body that decodes as JSON but isn't an object (e.g. a
|
|
bare string or array) must not slip through and trigger an
|
|
``AttributeError`` on ``payload.get(...)`` later. Reject loudly
|
|
with the same incomplete-response code the missing-endpoint path uses."""
|
|
from hermes_cli.auth import _xai_oauth_discovery
|
|
|
|
class _StubResponse:
|
|
status_code = 200
|
|
|
|
def json(self):
|
|
return ["not", "an", "object"]
|
|
|
|
monkeypatch.setattr(
|
|
"hermes_cli.auth.httpx.get",
|
|
lambda *a, **kw: _StubResponse(),
|
|
)
|
|
with pytest.raises(AuthError) as exc:
|
|
_xai_oauth_discovery()
|
|
assert exc.value.code == "xai_discovery_incomplete"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# OIDC discovery endpoint origin/scheme validation (MITM hardening)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_rejects_non_https_token_endpoint(monkeypatch):
|
|
"""A poisoned auth.json (from MITM during initial discovery, or an older
|
|
Hermes that didn't validate) must not be silently honored on the refresh
|
|
hot path. A non-HTTPS ``token_endpoint`` would leak the refresh_token in
|
|
cleartext on every refresh; refuse before the POST."""
|
|
# No HTTP stub installed — refresh must fail at validation, not at POST.
|
|
with pytest.raises(AuthError) as exc:
|
|
refresh_xai_oauth_pure(
|
|
"at", "rt", token_endpoint="http://auth.x.ai/oauth2/token"
|
|
)
|
|
assert exc.value.code == "xai_discovery_invalid"
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_rejects_off_origin_token_endpoint(monkeypatch):
|
|
"""Pin the cached token_endpoint host to the xAI origin. A one-time MITM
|
|
during discovery could persist a token_endpoint on attacker-controlled
|
|
infrastructure — every subsequent refresh would silently leak the
|
|
refresh_token to that attacker. Refuse off-origin endpoints loudly so
|
|
the user can re-run discovery."""
|
|
with pytest.raises(AuthError) as exc:
|
|
refresh_xai_oauth_pure(
|
|
"at", "rt", token_endpoint="https://evil.example.com/token"
|
|
)
|
|
assert exc.value.code == "xai_discovery_invalid"
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_rejects_lookalike_suffix(monkeypatch):
|
|
"""Substring confusion: ``evil-x.ai`` ends in ``x.ai`` but is NOT a
|
|
``.x.ai`` subdomain. The validator must enforce the leading-dot suffix
|
|
so attacker-registered apex lookalikes can't slip through."""
|
|
with pytest.raises(AuthError) as exc:
|
|
refresh_xai_oauth_pure(
|
|
"at", "rt", token_endpoint="https://evilx.ai/token"
|
|
)
|
|
assert exc.value.code == "xai_discovery_invalid"
|
|
|
|
|
|
def test_refresh_xai_oauth_pure_accepts_apex_and_subdomain_endpoints(monkeypatch):
|
|
"""The validator must accept BOTH the bare xAI apex (``x.ai``) and any
|
|
``*.x.ai`` subdomain (e.g. ``auth.x.ai`` today, future migrations to
|
|
``accounts.x.ai`` etc.). Without subdomain support we'd lock the
|
|
integration to whatever xAI happens to use today."""
|
|
new_access = _jwt_with_exp(int(time.time()) + 3600)
|
|
response = _StubHTTPResponse(
|
|
200,
|
|
{"access_token": new_access, "expires_in": 3600, "token_type": "Bearer"},
|
|
)
|
|
_patch_httpx_client(monkeypatch, response)
|
|
# auth.x.ai (current production)
|
|
updated = refresh_xai_oauth_pure(
|
|
"at", "rt", token_endpoint="https://auth.x.ai/oauth2/token"
|
|
)
|
|
assert updated["access_token"] == new_access
|
|
# hypothetical migration to accounts.x.ai
|
|
_patch_httpx_client(monkeypatch, response)
|
|
updated2 = refresh_xai_oauth_pure(
|
|
"at", "rt", token_endpoint="https://accounts.x.ai/token"
|
|
)
|
|
assert updated2["access_token"] == new_access
|
|
|
|
|
|
def test_xai_oauth_discovery_validates_endpoints(monkeypatch):
|
|
"""The discovery response itself goes through endpoint validation, so a
|
|
one-time MITM during initial login cannot poison ``auth.json`` with an
|
|
attacker-controlled ``token_endpoint``. (The persistence is what makes
|
|
this attack worth defending against — one MITM = forever credential
|
|
leak.)"""
|
|
from hermes_cli.auth import _xai_oauth_discovery
|
|
|
|
class _StubGetResponse:
|
|
status_code = 200
|
|
|
|
def __init__(self, payload):
|
|
self._payload = payload
|
|
|
|
def json(self):
|
|
return self._payload
|
|
|
|
def _fake_get(url, headers=None, timeout=None):
|
|
return _StubGetResponse({
|
|
"authorization_endpoint": "https://auth.x.ai/oauth2/authorize",
|
|
"token_endpoint": "https://evil.example.com/token", # poisoned
|
|
})
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.httpx.get", _fake_get)
|
|
with pytest.raises(AuthError) as exc:
|
|
_xai_oauth_discovery()
|
|
assert exc.value.code == "xai_discovery_invalid"
|
|
|
|
|
|
def test_xai_oauth_discovery_validates_authorization_endpoint(monkeypatch):
|
|
"""A poisoned ``authorization_endpoint`` is just as dangerous as a
|
|
poisoned ``token_endpoint``: it sends the user's browser (with their
|
|
logged-in xAI session cookies) to attacker infrastructure that can
|
|
phish the consent screen and exchange a stolen authorization code.
|
|
|
|
Both endpoints must be validated independently. This test pins the
|
|
parity so nobody can later "optimise" by validating only the token
|
|
endpoint and silently lose authorization-endpoint defense."""
|
|
from hermes_cli.auth import _xai_oauth_discovery
|
|
|
|
class _StubGetResponse:
|
|
status_code = 200
|
|
|
|
def __init__(self, payload):
|
|
self._payload = payload
|
|
|
|
def json(self):
|
|
return self._payload
|
|
|
|
def _fake_get(url, headers=None, timeout=None):
|
|
return _StubGetResponse({
|
|
"authorization_endpoint": "https://evil.example.com/authorize", # poisoned
|
|
"token_endpoint": "https://auth.x.ai/oauth2/token",
|
|
})
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.httpx.get", _fake_get)
|
|
with pytest.raises(AuthError) as exc:
|
|
_xai_oauth_discovery()
|
|
assert exc.value.code == "xai_discovery_invalid"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pool seeding from singleton
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_credential_pool_seeds_xai_oauth_from_singleton(tmp_path, monkeypatch):
|
|
"""After `hermes model` -> xai-oauth, the singleton holds tokens. load_pool
|
|
must surface that as a pool entry so `hermes auth list` reflects truth and
|
|
refreshes route through the pool consistently with codex."""
|
|
from agent.credential_pool import load_pool
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=fresh, refresh_token="rt-1")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
pool = load_pool("xai-oauth")
|
|
assert pool.has_credentials()
|
|
entries = pool.entries()
|
|
assert len(entries) == 1
|
|
entry = entries[0]
|
|
assert entry.access_token == fresh
|
|
assert entry.refresh_token == "rt-1"
|
|
assert entry.source == "loopback_pkce"
|
|
assert entry.base_url == DEFAULT_XAI_OAUTH_BASE_URL
|
|
|
|
|
|
def test_credential_pool_does_not_seed_when_singleton_missing_access_token(tmp_path, monkeypatch):
|
|
from agent.credential_pool import load_pool
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
auth_store = {
|
|
"version": 1,
|
|
"providers": {
|
|
"xai-oauth": {
|
|
"tokens": {"access_token": "", "refresh_token": "rt"},
|
|
"auth_mode": "oauth_pkce",
|
|
}
|
|
},
|
|
}
|
|
(hermes_home / "auth.json").write_text(json.dumps(auth_store))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
pool = load_pool("xai-oauth")
|
|
assert not pool.has_credentials()
|
|
|
|
|
|
def test_credential_pool_seed_respects_suppression(tmp_path, monkeypatch):
|
|
"""`hermes auth remove xai-oauth <N>` for the seeded entry suppresses
|
|
further re-seeding so the removal is stable across load_pool calls."""
|
|
from agent.credential_pool import load_pool
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=fresh)
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
# Suppress the source — mimic `hermes auth remove`.
|
|
from hermes_cli.auth import suppress_credential_source
|
|
|
|
suppress_credential_source("xai-oauth", "loopback_pkce")
|
|
|
|
pool = load_pool("xai-oauth")
|
|
assert not pool.has_credentials()
|
|
|
|
|
|
def test_auth_remove_xai_oauth_clears_singleton_and_sticks(tmp_path, monkeypatch):
|
|
"""End-to-end regression: ``hermes auth remove xai-oauth 1`` for a
|
|
singleton-seeded entry must clear auth.json providers.xai-oauth AND
|
|
suppress further re-seeding — otherwise the next ``load_pool`` call
|
|
silently resurrects the entry from the still-present singleton, making
|
|
the user-facing removal a no-op (the entry reappears on the next
|
|
invocation with no warning).
|
|
|
|
The bug pre-fix: there was no RemovalStep registered for
|
|
(xai-oauth, loopback_pkce), so ``find_removal_step`` returned None
|
|
and ``auth_remove_command`` fell through to the "unregistered source —
|
|
nothing to clean up" branch. That branch is correct for ``manual``
|
|
entries (pool-only) but wrong for singleton-seeded loopback_pkce
|
|
entries (auth.json singleton survives the in-memory removal)."""
|
|
from agent.credential_pool import load_pool
|
|
from hermes_cli.auth_commands import auth_remove_command
|
|
from types import SimpleNamespace
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=fresh, refresh_token="rt-1")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
# Confirm pre-state: pool sees the seeded entry, auth.json has the singleton.
|
|
pool = load_pool("xai-oauth")
|
|
assert pool.has_credentials()
|
|
raw = json.loads((hermes_home / "auth.json").read_text())
|
|
assert "xai-oauth" in raw.get("providers", {})
|
|
|
|
# Act: the user runs `hermes auth remove xai-oauth 1`.
|
|
auth_remove_command(SimpleNamespace(provider="xai-oauth", target="1"))
|
|
|
|
# Post-state: auth.json singleton must be cleared so a re-seed has
|
|
# nothing to import.
|
|
raw_after = json.loads((hermes_home / "auth.json").read_text())
|
|
assert "xai-oauth" not in raw_after.get("providers", {}), (
|
|
"auth.json providers.xai-oauth must be cleared — otherwise the "
|
|
"next load_pool() reseeds the removed entry from the surviving "
|
|
"singleton, silently undoing the user's removal."
|
|
)
|
|
|
|
# And the next load must not reseed the entry from anywhere.
|
|
pool_after = load_pool("xai-oauth")
|
|
assert not pool_after.has_credentials(), (
|
|
"Removal must stick across load_pool() calls — without the "
|
|
"loopback_pkce RemovalStep, the seed function reads the singleton "
|
|
"and rebuilds the entry on every Hermes invocation."
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pool sync-back to singleton after refresh
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_pool_sync_back_writes_to_singleton(tmp_path, monkeypatch):
|
|
"""When the pool refreshes a singleton-seeded xAI entry, the new tokens
|
|
must be written back to providers["xai-oauth"] so that
|
|
resolve_xai_oauth_runtime_credentials() (which reads the singleton)
|
|
doesn't keep using the consumed refresh token."""
|
|
from agent.credential_pool import load_pool
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
expired = _jwt_with_exp(int(time.time()) - 10)
|
|
_setup_hermes_auth(hermes_home, access_token=expired, refresh_token="rt-old")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
new_access = _jwt_with_exp(int(time.time()) + 3600)
|
|
|
|
def _fake_refresh(access_token, refresh_token, **kwargs):
|
|
assert refresh_token == "rt-old"
|
|
return {
|
|
"access_token": new_access,
|
|
"refresh_token": "rt-new",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
"last_refresh": "2026-05-15T01:00:00Z",
|
|
}
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.refresh_xai_oauth_pure", _fake_refresh)
|
|
|
|
pool = load_pool("xai-oauth")
|
|
selected = pool.select()
|
|
assert selected is not None
|
|
assert selected.access_token == new_access
|
|
assert selected.refresh_token == "rt-new"
|
|
|
|
# Singleton must reflect refreshed tokens — otherwise the next process
|
|
# to load credentials would re-seed the consumed refresh token.
|
|
auth_path = hermes_home / "auth.json"
|
|
raw = json.loads(auth_path.read_text())
|
|
state = raw["providers"]["xai-oauth"]
|
|
assert state["tokens"]["access_token"] == new_access
|
|
assert state["tokens"]["refresh_token"] == "rt-new"
|
|
assert state["last_refresh"] == "2026-05-15T01:00:00Z"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Runtime provider routing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_runtime_provider_uses_pool_entry_for_xai_oauth(tmp_path, monkeypatch):
|
|
from hermes_cli.runtime_provider import resolve_runtime_provider
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=fresh)
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
monkeypatch.delenv("HERMES_XAI_BASE_URL", raising=False)
|
|
monkeypatch.delenv("XAI_BASE_URL", raising=False)
|
|
|
|
runtime = resolve_runtime_provider(requested="xai-oauth")
|
|
assert runtime["provider"] == "xai-oauth"
|
|
assert runtime["api_mode"] == "codex_responses"
|
|
assert runtime["api_key"] == fresh
|
|
assert runtime["base_url"] == DEFAULT_XAI_OAUTH_BASE_URL
|
|
|
|
|
|
def test_runtime_provider_default_base_url_when_pool_entry_missing_url(tmp_path, monkeypatch):
|
|
"""Edge case: a pool entry that somehow has an empty base_url should still
|
|
surface the default xAI inference base URL instead of an empty string."""
|
|
from agent.credential_pool import load_pool, AUTH_TYPE_OAUTH, PooledCredential
|
|
import uuid
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
monkeypatch.delenv("HERMES_XAI_BASE_URL", raising=False)
|
|
monkeypatch.delenv("XAI_BASE_URL", raising=False)
|
|
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
pool = load_pool("xai-oauth")
|
|
pool.add_entry(
|
|
PooledCredential(
|
|
provider="xai-oauth",
|
|
id=uuid.uuid4().hex[:6],
|
|
label="test",
|
|
auth_type=AUTH_TYPE_OAUTH,
|
|
priority=0,
|
|
source="manual:xai_pkce",
|
|
access_token=fresh,
|
|
refresh_token="rt",
|
|
base_url="",
|
|
)
|
|
)
|
|
|
|
from hermes_cli.runtime_provider import resolve_runtime_provider
|
|
|
|
runtime = resolve_runtime_provider(requested="xai-oauth")
|
|
assert runtime["provider"] == "xai-oauth"
|
|
assert runtime["api_mode"] == "codex_responses"
|
|
assert runtime["api_key"] == fresh
|
|
assert runtime["base_url"] == DEFAULT_XAI_OAUTH_BASE_URL
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Token-expiry behavior on the pool path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_pool_entry_needs_refresh_when_jwt_within_skew(tmp_path, monkeypatch):
|
|
"""The pool's proactive-refresh gate must trigger when the JWT exp claim
|
|
is within the XAI_ACCESS_TOKEN_REFRESH_SKEW_SECONDS window — otherwise a
|
|
near-expired token will hit the API and 401 unnecessarily. Mirrors the
|
|
Codex skew-window behavior."""
|
|
from agent.credential_pool import load_pool, AUTH_TYPE_OAUTH, PooledCredential
|
|
from hermes_cli.auth import XAI_ACCESS_TOKEN_REFRESH_SKEW_SECONDS
|
|
import uuid
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
# Token expires in 30s — well inside the 120s skew window.
|
|
near_expiry = _jwt_with_exp(int(time.time()) + 30)
|
|
pool = load_pool("xai-oauth")
|
|
entry = PooledCredential(
|
|
provider="xai-oauth",
|
|
id=uuid.uuid4().hex[:6],
|
|
label="test",
|
|
auth_type=AUTH_TYPE_OAUTH,
|
|
priority=0,
|
|
source="manual:xai_pkce",
|
|
access_token=near_expiry,
|
|
refresh_token="rt",
|
|
base_url=DEFAULT_XAI_OAUTH_BASE_URL,
|
|
)
|
|
pool.add_entry(entry)
|
|
assert XAI_ACCESS_TOKEN_REFRESH_SKEW_SECONDS > 30
|
|
assert pool._entry_needs_refresh(entry) is True
|
|
|
|
|
|
def test_pool_entry_no_refresh_for_fresh_jwt(tmp_path, monkeypatch):
|
|
"""A fresh JWT beyond the skew window must NOT trigger proactive refresh."""
|
|
from agent.credential_pool import load_pool, AUTH_TYPE_OAUTH, PooledCredential
|
|
import uuid
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
pool = load_pool("xai-oauth")
|
|
entry = PooledCredential(
|
|
provider="xai-oauth",
|
|
id=uuid.uuid4().hex[:6],
|
|
label="test",
|
|
auth_type=AUTH_TYPE_OAUTH,
|
|
priority=0,
|
|
source="manual:xai_pkce",
|
|
access_token=fresh,
|
|
refresh_token="rt",
|
|
base_url=DEFAULT_XAI_OAUTH_BASE_URL,
|
|
)
|
|
pool.add_entry(entry)
|
|
assert pool._entry_needs_refresh(entry) is False
|
|
|
|
|
|
def test_pool_select_proactively_refreshes_expiring_token(tmp_path, monkeypatch):
|
|
"""End-to-end: pool.select() with refresh=True on an expiring entry must
|
|
return the refreshed token. This is the proactive path that runs BEFORE
|
|
the API call — separate from the 401-reactive path."""
|
|
from agent.credential_pool import load_pool, AUTH_TYPE_OAUTH, PooledCredential
|
|
import uuid
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
near_expiry = _jwt_with_exp(int(time.time()) + 30)
|
|
new_access = _jwt_with_exp(int(time.time()) + 3600)
|
|
|
|
refresh_calls = {"count": 0}
|
|
|
|
def _fake_refresh(access_token, refresh_token, **kwargs):
|
|
refresh_calls["count"] += 1
|
|
assert refresh_token == "rt-old"
|
|
return {
|
|
"access_token": new_access,
|
|
"refresh_token": "rt-new",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
"last_refresh": "2026-05-15T01:00:00Z",
|
|
}
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.refresh_xai_oauth_pure", _fake_refresh)
|
|
|
|
pool = load_pool("xai-oauth")
|
|
pool.add_entry(
|
|
PooledCredential(
|
|
provider="xai-oauth",
|
|
id=uuid.uuid4().hex[:6],
|
|
label="test",
|
|
auth_type=AUTH_TYPE_OAUTH,
|
|
priority=0,
|
|
source="manual:xai_pkce",
|
|
access_token=near_expiry,
|
|
refresh_token="rt-old",
|
|
base_url=DEFAULT_XAI_OAUTH_BASE_URL,
|
|
)
|
|
)
|
|
|
|
selected = pool.select()
|
|
assert refresh_calls["count"] == 1
|
|
assert selected is not None
|
|
assert selected.access_token == new_access
|
|
assert selected.refresh_token == "rt-new"
|
|
|
|
|
|
def test_pool_try_refresh_current_handles_xai_oauth(tmp_path, monkeypatch):
|
|
"""The reactive 401-recovery path uses pool.try_refresh_current(). This
|
|
must work for xai-oauth alongside openai-codex — otherwise mid-call
|
|
expirations get propagated as hard failures instead of being retried with
|
|
fresh tokens."""
|
|
from agent.credential_pool import load_pool, AUTH_TYPE_OAUTH, PooledCredential
|
|
import uuid
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
# Even a "fresh-looking" token gets force-refreshed via try_refresh_current.
|
|
# We simulate the scenario where the server rejected the token (401)
|
|
# despite client-side expiry math saying it's still valid (e.g. clock
|
|
# skew, server-side revocation, token bound to a session that expired).
|
|
seemingly_fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
new_access = _jwt_with_exp(int(time.time()) + 7200)
|
|
|
|
def _fake_refresh(access_token, refresh_token, **kwargs):
|
|
return {
|
|
"access_token": new_access,
|
|
"refresh_token": "rt-rotated",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
"last_refresh": "2026-05-15T02:00:00Z",
|
|
}
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.refresh_xai_oauth_pure", _fake_refresh)
|
|
|
|
pool = load_pool("xai-oauth")
|
|
pool.add_entry(
|
|
PooledCredential(
|
|
provider="xai-oauth",
|
|
id=uuid.uuid4().hex[:6],
|
|
label="test",
|
|
auth_type=AUTH_TYPE_OAUTH,
|
|
priority=0,
|
|
source="manual:xai_pkce",
|
|
access_token=seemingly_fresh,
|
|
refresh_token="rt-old",
|
|
base_url=DEFAULT_XAI_OAUTH_BASE_URL,
|
|
)
|
|
)
|
|
pool.select()
|
|
refreshed = pool.try_refresh_current()
|
|
assert refreshed is not None
|
|
assert refreshed.access_token == new_access
|
|
assert refreshed.refresh_token == "rt-rotated"
|
|
|
|
|
|
def test_pool_refresh_marks_entry_exhausted_on_failure(tmp_path, monkeypatch):
|
|
"""When the xAI refresh endpoint rejects the refresh_token (e.g. consumed
|
|
by another process, revoked), the pool must surface the failure cleanly
|
|
rather than silently retaining stale tokens. This is critical for the
|
|
failover path — _recover_with_credential_pool rotates to the next entry
|
|
only if try_refresh_current returns None."""
|
|
from agent.credential_pool import load_pool, AUTH_TYPE_OAUTH, PooledCredential
|
|
from hermes_cli.auth import AuthError
|
|
import uuid
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
def _fake_refresh_fail(*args, **kwargs):
|
|
raise AuthError("refresh_token_reused", code="xai_refresh_failed", relogin_required=True)
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.refresh_xai_oauth_pure", _fake_refresh_fail)
|
|
|
|
pool = load_pool("xai-oauth")
|
|
seemingly_fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
pool.add_entry(
|
|
PooledCredential(
|
|
provider="xai-oauth",
|
|
id=uuid.uuid4().hex[:6],
|
|
label="test",
|
|
auth_type=AUTH_TYPE_OAUTH,
|
|
priority=0,
|
|
source="manual:xai_pkce",
|
|
access_token=seemingly_fresh,
|
|
refresh_token="rt-revoked",
|
|
base_url=DEFAULT_XAI_OAUTH_BASE_URL,
|
|
)
|
|
)
|
|
pool.select()
|
|
refreshed = pool.try_refresh_current()
|
|
# Refresh failure must return None so the caller falls through to
|
|
# credential rotation / friendly error display.
|
|
assert refreshed is None
|
|
|
|
|
|
def test_pool_seeded_entry_sync_back_after_refresh(tmp_path, monkeypatch):
|
|
"""When an entry seeded from the singleton (source='loopback_pkce')
|
|
is refreshed by the pool, the new tokens must be written back so a
|
|
fresh process load doesn't re-seed the now-consumed refresh token."""
|
|
from agent.credential_pool import load_pool
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
near_expiry = _jwt_with_exp(int(time.time()) + 30)
|
|
_setup_hermes_auth(hermes_home, access_token=near_expiry, refresh_token="rt-singleton")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
new_access = _jwt_with_exp(int(time.time()) + 3600)
|
|
|
|
def _fake_refresh(access_token, refresh_token, **kwargs):
|
|
assert refresh_token == "rt-singleton"
|
|
return {
|
|
"access_token": new_access,
|
|
"refresh_token": "rt-rotated",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
"last_refresh": "2026-05-15T03:00:00Z",
|
|
}
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.refresh_xai_oauth_pure", _fake_refresh)
|
|
|
|
pool = load_pool("xai-oauth")
|
|
selected = pool.select()
|
|
assert selected is not None
|
|
assert selected.access_token == new_access
|
|
|
|
raw = json.loads((hermes_home / "auth.json").read_text())
|
|
tokens = raw["providers"]["xai-oauth"]["tokens"]
|
|
assert tokens["access_token"] == new_access
|
|
assert tokens["refresh_token"] == "rt-rotated"
|
|
|
|
|
|
def test_pool_refresh_adopts_singleton_tokens_when_consumed_elsewhere(tmp_path, monkeypatch):
|
|
"""Multi-process race: another Hermes process refreshed the singleton
|
|
(rotating the refresh_token) while this process held a stale in-memory
|
|
pool entry. ``_refresh_entry`` must adopt the fresher singleton tokens
|
|
BEFORE spending its own (now-consumed) refresh_token, otherwise the
|
|
refresh POST would replay the consumed token and fail with
|
|
``refresh_token_reused``.
|
|
|
|
Mirrors the proactive sync codex/nous already perform for the same
|
|
reason, and is what makes the pool actually safe to share across
|
|
profiles + Hermes processes."""
|
|
from agent.credential_pool import load_pool
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
in_memory_at = _jwt_with_exp(int(time.time()) + 30) # near-expiry
|
|
_setup_hermes_auth(hermes_home, access_token=in_memory_at, refresh_token="rt-stale")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
# Load the pool once so the in-memory entry is seeded with rt-stale.
|
|
pool = load_pool("xai-oauth")
|
|
|
|
# Now simulate "another process refreshed the tokens" by overwriting
|
|
# the singleton on disk WITHOUT touching this process's pool object.
|
|
other_process_at = _jwt_with_exp(int(time.time()) + 3600)
|
|
raw = json.loads((hermes_home / "auth.json").read_text())
|
|
raw["providers"]["xai-oauth"]["tokens"] = {
|
|
"access_token": other_process_at,
|
|
"refresh_token": "rt-rotated-by-other-process",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
}
|
|
(hermes_home / "auth.json").write_text(json.dumps(raw))
|
|
|
|
refresh_calls = {"refresh_token_seen": None}
|
|
final_at = _jwt_with_exp(int(time.time()) + 7200)
|
|
|
|
def _fake_refresh(access_token, refresh_token, **kwargs):
|
|
# The pool MUST have adopted the rotated token from auth.json before
|
|
# POSTing the refresh — otherwise it would replay the stale one.
|
|
refresh_calls["refresh_token_seen"] = refresh_token
|
|
return {
|
|
"access_token": final_at,
|
|
"refresh_token": "rt-final",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
"last_refresh": "2026-05-15T05:00:00Z",
|
|
}
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.refresh_xai_oauth_pure", _fake_refresh)
|
|
|
|
selected = pool.select()
|
|
assert selected is not None
|
|
assert refresh_calls["refresh_token_seen"] == "rt-rotated-by-other-process"
|
|
assert selected.access_token == final_at
|
|
|
|
|
|
def test_pool_refresh_recovers_when_other_process_already_refreshed(tmp_path, monkeypatch):
|
|
"""Variant of the multi-process race where the other process refreshes
|
|
BETWEEN our proactive sync and the HTTP POST. Our refresh fails with a
|
|
consumed-token error; we must re-check auth.json, find the fresh pair
|
|
(written by the racing process), and adopt it instead of marking the
|
|
entry exhausted."""
|
|
from agent.credential_pool import load_pool
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
in_memory_at = _jwt_with_exp(int(time.time()) + 30)
|
|
_setup_hermes_auth(hermes_home, access_token=in_memory_at, refresh_token="rt-shared")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
pool = load_pool("xai-oauth")
|
|
|
|
other_process_at = _jwt_with_exp(int(time.time()) + 3600)
|
|
|
|
def _fake_refresh(access_token, refresh_token, **kwargs):
|
|
# Simulate the racing process winning at the auth server right
|
|
# before our POST: by the time we reach this call, auth.json
|
|
# already holds the fresher pair, but we POSTed with rt-shared.
|
|
raw = json.loads((hermes_home / "auth.json").read_text())
|
|
raw["providers"]["xai-oauth"]["tokens"] = {
|
|
"access_token": other_process_at,
|
|
"refresh_token": "rt-rotated",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
}
|
|
(hermes_home / "auth.json").write_text(json.dumps(raw))
|
|
raise AuthError(
|
|
"refresh_token_reused",
|
|
provider="xai-oauth",
|
|
code="xai_refresh_failed",
|
|
relogin_required=True,
|
|
)
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.refresh_xai_oauth_pure", _fake_refresh)
|
|
|
|
selected = pool.select()
|
|
# Even though refresh_xai_oauth_pure raised, the post-failure
|
|
# recovery path should adopt the fresher singleton tokens.
|
|
assert selected is not None
|
|
assert selected.access_token == other_process_at
|
|
assert selected.refresh_token == "rt-rotated"
|
|
|
|
|
|
def test_pool_exhausted_xai_entry_recovers_after_singleton_refresh(tmp_path, monkeypatch):
|
|
"""When a singleton-seeded entry is parked as STATUS_EXHAUSTED and the
|
|
user runs ``hermes model`` -> xAI Grok OAuth (or another process
|
|
refreshes), the next ``_available_entries`` pass must adopt the fresh
|
|
auth.json tokens instead of leaving the entry frozen until the
|
|
cooldown elapses. Mirrors the codex/nous self-heal pattern."""
|
|
from agent.credential_pool import load_pool, STATUS_EXHAUSTED
|
|
from dataclasses import replace
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
stale_at = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=stale_at, refresh_token="rt-stale")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
pool = load_pool("xai-oauth")
|
|
seeded = pool.entries()[0]
|
|
assert seeded.source == "loopback_pkce"
|
|
|
|
# Park the seeded entry as exhausted with a far-future cooldown so
|
|
# without resync it would never be selectable.
|
|
exhausted = replace(
|
|
seeded,
|
|
last_status=STATUS_EXHAUSTED,
|
|
last_status_at=time.time(),
|
|
last_error_code=401,
|
|
last_error_reset_at=time.time() + 3600, # 1h cooldown
|
|
)
|
|
pool._replace_entry(seeded, exhausted)
|
|
pool._persist()
|
|
assert pool.has_credentials()
|
|
assert not pool.has_available() # cooldown blocks everything
|
|
|
|
# Simulate the user re-running `hermes model` -> xAI Grok OAuth: the
|
|
# singleton now has fresh tokens.
|
|
fresh_at = _jwt_with_exp(int(time.time()) + 7200)
|
|
raw = json.loads((hermes_home / "auth.json").read_text())
|
|
raw["providers"]["xai-oauth"]["tokens"] = {
|
|
"access_token": fresh_at,
|
|
"refresh_token": "rt-fresh",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
}
|
|
(hermes_home / "auth.json").write_text(json.dumps(raw))
|
|
|
|
# _available_entries must sync from the singleton, lifting the
|
|
# exhausted state for the seeded entry.
|
|
available = pool._available_entries(clear_expired=True, refresh=False)
|
|
assert len(available) == 1
|
|
assert available[0].access_token == fresh_at
|
|
assert available[0].refresh_token == "rt-fresh"
|
|
assert available[0].last_status != STATUS_EXHAUSTED
|
|
|
|
|
|
def test_pool_manual_xai_entry_not_synced_from_singleton(tmp_path, monkeypatch):
|
|
"""Sync from the singleton must apply ONLY to the singleton-seeded
|
|
entry (source='loopback_pkce'). Manually added entries (e.g. via
|
|
``hermes auth add xai-oauth``) own their own refresh-token lifecycle
|
|
and must not be silently overwritten when the user logs in via
|
|
``hermes model``."""
|
|
from agent.credential_pool import load_pool, AUTH_TYPE_OAUTH, PooledCredential
|
|
import uuid
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
singleton_at = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=singleton_at, refresh_token="rt-singleton")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
pool = load_pool("xai-oauth")
|
|
|
|
manual_at_old = _jwt_with_exp(int(time.time()) + 30)
|
|
pool.add_entry(
|
|
PooledCredential(
|
|
provider="xai-oauth",
|
|
id=uuid.uuid4().hex[:6],
|
|
label="manual",
|
|
auth_type=AUTH_TYPE_OAUTH,
|
|
priority=1,
|
|
source="manual:xai_pkce",
|
|
access_token=manual_at_old,
|
|
refresh_token="rt-manual",
|
|
base_url=DEFAULT_XAI_OAUTH_BASE_URL,
|
|
)
|
|
)
|
|
manual_entry = next(e for e in pool.entries() if e.source == "manual:xai_pkce")
|
|
synced = pool._sync_xai_oauth_entry_from_auth_store(manual_entry)
|
|
# Same object — no sync happened.
|
|
assert synced is manual_entry
|
|
assert synced.access_token == manual_at_old
|
|
assert synced.refresh_token == "rt-manual"
|
|
|
|
|
|
def test_pool_manual_entry_does_not_sync_back_to_singleton(tmp_path, monkeypatch):
|
|
"""`hermes auth add xai-oauth` entries (source='manual:xai_pkce') are
|
|
independent credentials and must NOT write to the singleton. Sync-back
|
|
is restricted to entries seeded from the singleton. Otherwise adding a
|
|
second pool credential would silently overwrite the user's main login."""
|
|
from agent.credential_pool import load_pool, AUTH_TYPE_OAUTH, PooledCredential
|
|
import uuid
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
# Singleton has its own tokens (separate login).
|
|
singleton_at = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=singleton_at, refresh_token="rt-singleton")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
manual_at_old = _jwt_with_exp(int(time.time()) + 30)
|
|
manual_at_new = _jwt_with_exp(int(time.time()) + 7200)
|
|
|
|
def _fake_refresh(access_token, refresh_token, **kwargs):
|
|
assert refresh_token == "rt-manual"
|
|
return {
|
|
"access_token": manual_at_new,
|
|
"refresh_token": "rt-manual-new",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
"last_refresh": "2026-05-15T04:00:00Z",
|
|
}
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.refresh_xai_oauth_pure", _fake_refresh)
|
|
|
|
pool = load_pool("xai-oauth")
|
|
pool.add_entry(
|
|
PooledCredential(
|
|
provider="xai-oauth",
|
|
id=uuid.uuid4().hex[:6],
|
|
label="manual",
|
|
auth_type=AUTH_TYPE_OAUTH,
|
|
priority=0,
|
|
source="manual:xai_pkce",
|
|
access_token=manual_at_old,
|
|
refresh_token="rt-manual",
|
|
base_url=DEFAULT_XAI_OAUTH_BASE_URL,
|
|
)
|
|
)
|
|
# Refresh the manual entry — singleton must be left alone.
|
|
manual_entries = [e for e in pool.entries() if e.source == "manual:xai_pkce"]
|
|
assert len(manual_entries) == 1
|
|
pool._refresh_entry(manual_entries[0], force=True)
|
|
|
|
raw = json.loads((hermes_home / "auth.json").read_text())
|
|
tokens = raw["providers"]["xai-oauth"]["tokens"]
|
|
# Singleton must be untouched — manual refresh shouldn't leak across.
|
|
assert tokens["access_token"] == singleton_at
|
|
assert tokens["refresh_token"] == "rt-singleton"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auxiliary client routing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_auxiliary_client_routes_xai_oauth_through_responses_api(tmp_path, monkeypatch):
|
|
"""Without explicit xai-oauth handling in ``resolve_provider_client``, an
|
|
xai-oauth main provider falls through to the generic ``oauth_external``
|
|
arm and returns ``(None, None)`` — silently re-routing every auxiliary
|
|
task (compression, curator, web extract, session search, ...) to
|
|
whatever Step-2 fallback chain the user has configured (OpenRouter,
|
|
Nous, etc.). Users on xAI Grok OAuth would then see surprise charges
|
|
on those side providers for side tasks they thought were running on
|
|
their xAI subscription.
|
|
|
|
Pin the routing contract: ``resolve_provider_client("xai-oauth", model)``
|
|
must return a non-None client wrapping the xAI Responses API."""
|
|
from agent.auxiliary_client import (
|
|
CodexAuxiliaryClient,
|
|
resolve_provider_client,
|
|
)
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=fresh)
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
monkeypatch.delenv("HERMES_XAI_BASE_URL", raising=False)
|
|
monkeypatch.delenv("XAI_BASE_URL", raising=False)
|
|
|
|
client, model = resolve_provider_client("xai-oauth", model="grok-4")
|
|
assert client is not None, (
|
|
"xai-oauth must route to a Responses-API client; falling through to "
|
|
"the generic oauth_external branch silently swaps providers for "
|
|
"every auxiliary task."
|
|
)
|
|
assert isinstance(client, CodexAuxiliaryClient)
|
|
assert model == "grok-4"
|
|
# The wrapper preserves base_url + api_key so async wrappers and cache
|
|
# eviction can introspect them. Pin both to the live xAI runtime.
|
|
assert str(client.base_url).rstrip("/") == DEFAULT_XAI_OAUTH_BASE_URL
|
|
assert client.api_key == fresh
|
|
|
|
|
|
def test_auxiliary_client_xai_oauth_returns_none_when_unauthenticated(tmp_path, monkeypatch):
|
|
"""No xAI OAuth tokens in the auth store → ``resolve_provider_client``
|
|
must return ``(None, None)`` so ``_resolve_auto`` falls through to the
|
|
next provider in the chain instead of crashing or constructing a
|
|
misconfigured client."""
|
|
from agent.auxiliary_client import resolve_provider_client
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
|
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
client, model = resolve_provider_client("xai-oauth", model="grok-4")
|
|
assert client is None
|
|
assert model is None
|
|
|
|
|
|
def test_auxiliary_client_xai_oauth_requires_explicit_model(tmp_path, monkeypatch):
|
|
"""xAI's Responses API has no safe "cheap aux model" default —
|
|
pinning one would silently rot the same way Codex's did. Callers
|
|
must pass an explicit model (auxiliary.<task>.model in config.yaml)."""
|
|
from agent.auxiliary_client import resolve_provider_client
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
|
_setup_hermes_auth(hermes_home, access_token=fresh)
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
client, model = resolve_provider_client("xai-oauth", model=None)
|
|
assert client is None
|
|
assert model is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# active_provider preservation on pool sync-back
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_pool_sync_back_preserves_active_provider(tmp_path, monkeypatch):
|
|
"""A token-rotation sync-back is a side effect of refresh, not the user
|
|
picking a provider. ``_save_provider_state`` flips ``active_provider``;
|
|
using it on the sync-back path means every xAI/Codex/Nous refresh in a
|
|
multi-provider setup silently overrides the user's chosen active
|
|
provider (visible to ``hermes auth status``, ``hermes setup``, and the
|
|
``hermes`` no-arg dispatcher). Pin the ``set_active=False`` contract so
|
|
no future refactor regresses to the legacy semantic."""
|
|
from agent.credential_pool import load_pool
|
|
|
|
hermes_home = tmp_path / "hermes"
|
|
near_expiry = _jwt_with_exp(int(time.time()) + 30)
|
|
_setup_hermes_auth(hermes_home, access_token=near_expiry, refresh_token="rt-xai")
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
# Simulate a multi-provider user whose actual chosen provider is
|
|
# OpenRouter — xai-oauth tokens exist in the singleton but are NOT
|
|
# the active provider.
|
|
raw = json.loads((hermes_home / "auth.json").read_text())
|
|
raw["active_provider"] = "openrouter"
|
|
(hermes_home / "auth.json").write_text(json.dumps(raw))
|
|
|
|
new_access = _jwt_with_exp(int(time.time()) + 3600)
|
|
|
|
def _fake_refresh(access_token, refresh_token, **kwargs):
|
|
return {
|
|
"access_token": new_access,
|
|
"refresh_token": "rt-rotated",
|
|
"id_token": "",
|
|
"expires_in": 3600,
|
|
"token_type": "Bearer",
|
|
"last_refresh": "2026-05-15T10:00:00Z",
|
|
}
|
|
|
|
monkeypatch.setattr("hermes_cli.auth.refresh_xai_oauth_pure", _fake_refresh)
|
|
|
|
pool = load_pool("xai-oauth")
|
|
selected = pool.select()
|
|
assert selected is not None
|
|
assert selected.access_token == new_access
|
|
|
|
# The refresh wrote new tokens back into the singleton — the user's
|
|
# prior ``active_provider`` choice (openrouter) MUST survive.
|
|
raw_after = json.loads((hermes_home / "auth.json").read_text())
|
|
assert raw_after["active_provider"] == "openrouter", (
|
|
"pool sync-back must not flip active_provider; otherwise xAI/Codex/"
|
|
"Nous token rotations silently take over multi-provider users' "
|
|
"auth.json `active_provider` flag."
|
|
)
|
|
# Tokens were actually written so the next process won't replay the
|
|
# consumed refresh_token (preserves the original sync-back fix).
|
|
state = raw_after["providers"]["xai-oauth"]["tokens"]
|
|
assert state["access_token"] == new_access
|
|
assert state["refresh_token"] == "rt-rotated"
|