refactor(auth): Disable Nous legacy session key fallback

This commit is contained in:
Robin Fernandes 2026-05-28 18:30:37 +10:00 committed by kshitij
parent a87f0a82a5
commit 41ff6e5937
17 changed files with 484 additions and 860 deletions

View file

@ -1243,8 +1243,30 @@ def _read_nous_auth() -> Optional[dict]:
def _nous_api_key(provider: dict) -> str:
"""Extract the Nous runtime credential from the compatibility field."""
return provider.get("agent_key") or provider.get("access_token", "")
"""Extract a usable Nous inference JWT from stored auth state."""
try:
from hermes_cli.auth import _nous_invoke_jwt_is_usable
except Exception:
_nous_invoke_jwt_is_usable = None
for token_key, expiry_key in (
("agent_key", "agent_key_expires_at"),
("access_token", "expires_at"),
):
token = provider.get(token_key)
if not isinstance(token, str) or not token.strip():
continue
if _nous_invoke_jwt_is_usable is None:
if token.count(".") == 2:
return token
continue
if _nous_invoke_jwt_is_usable(
token,
scope=provider.get("scope"),
expires_at=provider.get(expiry_key),
):
return token
return ""
def _nous_base_url() -> str:
@ -1256,25 +1278,21 @@ def _resolve_nous_runtime_api(*, force_refresh: bool = False) -> Optional[tuple[
"""Return fresh Nous runtime credentials when available.
This mirrors the main agent's 401 recovery path and keeps auxiliary
clients aligned with the singleton auth store + JWT/mint flow instead of
clients aligned with the singleton auth store + JWT refresh flow instead of
relying only on whatever raw tokens happen to be sitting in auth.json
or the credential pool.
"""
try:
from hermes_cli.auth import (
NOUS_INFERENCE_AUTH_MODE_AUTO,
NOUS_INFERENCE_AUTH_MODE_LEGACY,
resolve_nous_runtime_credentials,
)
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
inference_auth_mode=(
NOUS_INFERENCE_AUTH_MODE_LEGACY
if force_refresh
else NOUS_INFERENCE_AUTH_MODE_AUTO
),
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_AUTO,
force_refresh=force_refresh,
)
except Exception as exc:
logger.debug("Auxiliary Nous runtime credential resolution failed: %s", exc)
@ -1558,13 +1576,9 @@ def _try_nous(vision: bool = False) -> Tuple[Optional[OpenAI], Optional[str]]:
_mark_provider_unhealthy("nous", ttl=60)
return None, None
if runtime is None and nous:
# Runtime credential mint failed but stored Nous auth is still present.
# Falls back to the raw stored token below; surface a debug line so
# operators investigating expired/invalid sessions have a breadcrumb,
# without blocking the fallback path the rest of this function relies on.
logger.debug(
"Auxiliary Nous: runtime credential mint failed; falling back to "
"stored auth.json token."
"Auxiliary Nous: runtime JWT refresh failed; checking stored "
"auth.json token."
)
global auxiliary_is_nous
auxiliary_is_nous = True
@ -1602,6 +1616,13 @@ def _try_nous(vision: bool = False) -> Tuple[Optional[OpenAI], Optional[str]]:
api_key, base_url = runtime
else:
api_key = _nous_api_key(nous or {})
if not api_key:
logger.warning(
"Auxiliary Nous client unavailable: no usable inference JWT found "
"(run: hermes auth add nous)."
)
_mark_provider_unhealthy("nous", ttl=60)
return None, None
base_url = str((nous or {}).get("inference_base_url") or _nous_base_url()).rstrip("/")
return (
OpenAI(
@ -2725,15 +2746,12 @@ def _refresh_provider_credentials(provider: str) -> bool:
_evict_cached_clients(normalized)
return True
if normalized == "nous":
from hermes_cli.auth import (
NOUS_INFERENCE_AUTH_MODE_LEGACY,
resolve_nous_runtime_credentials,
)
from hermes_cli.auth import resolve_nous_runtime_credentials
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_LEGACY,
force_refresh=True,
)
if not str(creds.get("api_key", "") or "").strip():
return False

View file

@ -203,15 +203,13 @@ def _print_billing_or_entitlement_guidance(
def _try_refresh_nous_paid_entitlement_credentials(agent) -> bool:
"""Refresh Nous runtime credentials after a fresh paid-entitlement check."""
try:
from hermes_cli.auth import NOUS_INFERENCE_AUTH_MODE_LEGACY
from hermes_cli.nous_account import get_nous_portal_account_info
account_info = get_nous_portal_account_info(force_fresh=True)
if account_info.paid_service_access is not True:
return False
return agent._try_refresh_nous_client_credentials(
force=False,
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_LEGACY,
force=True,
)
except Exception:
return False

View file

@ -203,7 +203,7 @@ class PooledCredential:
def runtime_api_key(self) -> str:
if self.provider == "nous":
# Nous stores the runtime inference credential in agent_key for
# compatibility. It may be a NAS invoke JWT or legacy opaque key.
# compatibility. It must be a NAS invoke JWT.
return str(self.agent_key or self.access_token or "")
return str(self.access_token or "")
@ -919,11 +919,8 @@ class CredentialPool:
entry = synced
auth_mod.resolve_nous_runtime_credentials(
min_key_ttl_seconds=DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
inference_auth_mode=(
auth_mod.NOUS_INFERENCE_AUTH_MODE_LEGACY
if force
else auth_mod.NOUS_INFERENCE_AUTH_MODE_AUTO
),
inference_auth_mode=auth_mod.NOUS_INFERENCE_AUTH_MODE_AUTO,
force_refresh=force,
)
updated = self._sync_nous_entry_from_auth_store(entry)
else:
@ -1205,7 +1202,7 @@ class CredentialPool:
auth_mod.XAI_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
)
if self.provider == "nous":
# Nous refresh/mint can require network access and should happen when
# Nous refresh can require network access and should happen when
# runtime credentials are actually resolved, not merely when the pool
# is enumerated for listing, migration, or selection.
return False
@ -1748,9 +1745,9 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
"inference_base_url": state.get("inference_base_url"),
"agent_key": state.get("agent_key"),
"agent_key_expires_at": state.get("agent_key_expires_at"),
# Carry the mint/refresh timestamps into the pool so
# Carry the refresh timestamps into the pool so
# freshness-sensitive consumers (self-heal hooks, pool
# pruning by age) can distinguish just-minted credentials
# pruning by age) can distinguish just-refreshed credentials
# from stale ones. Without these, fresh device_code
# entries get obtained_at=None and look older than they
# are (#15099).

View file

@ -9,14 +9,11 @@ Architecture:
- ProviderConfig registry defines known OAuth providers
- Auth store (auth.json) holds per-provider credential state
- resolve_provider() picks the active provider via priority chain
- resolve_*_runtime_credentials() handles token refresh and key minting
- resolve_*_runtime_credentials() handles token refresh and runtime keys
- logout_command() is the CLI entry point for clearing auth
Nous authentication paths:
- Invoke JWT (preferred): use a scoped access_token directly for inference.
- Legacy session key (fallback): mint an opaque 24h key when JWT auth is
unavailable, or when HERMES_AGENT_USE_LEGACY_SESSION_KEYS is set for
debugging or rollback.
"""
from __future__ import annotations
@ -73,22 +70,16 @@ AUTH_LOCK_TIMEOUT_SECONDS = 15.0
DEFAULT_NOUS_PORTAL_URL = "https://portal.nousresearch.com"
DEFAULT_NOUS_INFERENCE_URL = "https://inference-api.nousresearch.com/v1"
DEFAULT_NOUS_CLIENT_ID = "hermes-cli"
NOUS_LEGACY_AGENT_KEY_SCOPE = "inference:mint_agent_key"
NOUS_INFERENCE_INVOKE_SCOPE = "inference:invoke"
DEFAULT_NOUS_SCOPE = f"{NOUS_INFERENCE_INVOKE_SCOPE} {NOUS_LEGACY_AGENT_KEY_SCOPE}"
NOUS_LEGACY_SESSION_KEYS_ENV = "HERMES_AGENT_USE_LEGACY_SESSION_KEYS"
DEFAULT_NOUS_SCOPE = NOUS_INFERENCE_INVOKE_SCOPE
NOUS_DEVICE_CODE_SOURCE = "device_code"
NOUS_INFERENCE_AUTH_MODE_AUTO = "auto"
NOUS_INFERENCE_AUTH_MODE_FRESH = "fresh"
NOUS_INFERENCE_AUTH_MODE_LEGACY = "legacy"
NOUS_INFERENCE_AUTH_MODES = frozenset({
NOUS_INFERENCE_AUTH_MODE_AUTO,
NOUS_INFERENCE_AUTH_MODE_FRESH,
NOUS_INFERENCE_AUTH_MODE_LEGACY,
})
NOUS_AUTH_PATH_INVOKE_JWT = "invoke_jwt"
NOUS_AUTH_PATH_LEGACY_SESSION_KEY_CACHE = "legacy_session_key_cache"
NOUS_AUTH_PATH_LEGACY_SESSION_KEY_MINT = "legacy_session_key_mint"
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS = 30 * 60 # 30 minutes
ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 # refresh 2 min before expiry
NOUS_INVOKE_JWT_MIN_TTL_SECONDS = ACCESS_TOKEN_REFRESH_SKEW_SECONDS
@ -1653,12 +1644,11 @@ def _optional_base_url(value: Any) -> Optional[str]:
return cleaned if cleaned else None
# Allowlist of hosts the Nous Portal proxy is willing to forward minted
# bearer tokens to. The bearer is a long-lived agent_key minted by
# portal.nousresearch.com — sending it anywhere else would leak it.
# Allowlist of hosts the Nous Portal proxy is willing to forward inference
# JWTs to. Sending a bearer anywhere else would leak it.
#
# This is consulted only for URLs coming from the NETWORK side (Portal
# refresh / agent-key-mint responses). User-controlled env-var overrides
# refresh responses). User-controlled env-var overrides
# (NOUS_INFERENCE_BASE_URL) bypass validation — that's the documented
# dev/staging escape hatch and the env source is already trusted (the
# user set it themselves).
@ -1676,10 +1666,10 @@ def _validate_nous_inference_url_from_network(url: Optional[str]) -> Optional[st
unexpected host letting the caller fall back to the configured
default rather than persist or forward a poisoned value.
Defense-in-depth: a compromised refresh / mint response from the
Portal API (MITM, malicious response injection) could otherwise
redirect every subsequent proxy request bearing the user's
legitimately-minted agent_key to an attacker-controlled endpoint.
Defense-in-depth: a compromised refresh response from the Portal API
(MITM, malicious response injection) could otherwise redirect every
subsequent proxy request bearing the user's inference JWT — to an
attacker-controlled endpoint.
Validating scheme + host at the source closes that loop before the
poisoned URL ever lands in ``auth.json``.
@ -1743,14 +1733,6 @@ def _scope_values(raw_scope: Any) -> set[str]:
return scopes
def _nous_legacy_session_keys_forced() -> bool:
return is_truthy_value(os.getenv(NOUS_LEGACY_SESSION_KEYS_ENV), default=False)
def _nous_scope_has_invoke(raw_scope: Any) -> bool:
return NOUS_INFERENCE_INVOKE_SCOPE in _scope_values(raw_scope)
def _normalize_nous_inference_auth_mode(inference_auth_mode: Optional[str]) -> str:
mode = str(inference_auth_mode or NOUS_INFERENCE_AUTH_MODE_AUTO).strip().lower()
if mode not in NOUS_INFERENCE_AUTH_MODES:
@ -1809,23 +1791,6 @@ def _nous_invoke_jwt_is_usable(
)
def _nous_legacy_session_key_reason(
token: Any,
*,
scope: Any = None,
expires_at: Any = None,
inference_auth_mode: str = NOUS_INFERENCE_AUTH_MODE_AUTO,
) -> str:
if inference_auth_mode == NOUS_INFERENCE_AUTH_MODE_LEGACY:
return "forced_legacy_session_key"
if _nous_legacy_session_keys_forced():
return "forced_legacy_session_keys"
return (
_nous_invoke_jwt_status(token, scope=scope, expires_at=expires_at)
or "invoke_jwt_unavailable"
)
def _choose_nous_inference_auth_path(
state: Dict[str, Any],
*,
@ -1833,34 +1798,29 @@ def _choose_nous_inference_auth_path(
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
inference_auth_mode: str = NOUS_INFERENCE_AUTH_MODE_AUTO,
) -> Tuple[str, Optional[str]]:
inference_auth_mode = _normalize_nous_inference_auth_mode(inference_auth_mode)
del min_key_ttl_seconds
_normalize_nous_inference_auth_mode(inference_auth_mode)
token = state.get("access_token") if access_token is None else access_token
if (
not _nous_legacy_session_keys_forced()
and inference_auth_mode != NOUS_INFERENCE_AUTH_MODE_LEGACY
and _nous_invoke_jwt_is_usable(
token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
if _nous_invoke_jwt_is_usable(
token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
):
return NOUS_AUTH_PATH_INVOKE_JWT, None
if (
inference_auth_mode == NOUS_INFERENCE_AUTH_MODE_AUTO
and _agent_key_is_usable(
state,
max(60, int(min_key_ttl_seconds)),
)
):
return NOUS_AUTH_PATH_LEGACY_SESSION_KEY_CACHE, None
return (
NOUS_AUTH_PATH_LEGACY_SESSION_KEY_MINT,
_nous_legacy_session_key_reason(
reason = (
_nous_invoke_jwt_status(
token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
inference_auth_mode=inference_auth_mode,
),
)
or "invoke_jwt_unavailable"
)
raise AuthError(
"Nous Portal access token is not a usable inference JWT "
f"({reason}). Re-authenticate with: hermes auth add nous",
provider="nous",
code=reason,
relogin_required=True,
)
@ -1877,24 +1837,6 @@ def _log_nous_invoke_jwt_selected(
)
def _log_nous_legacy_session_key_selected(
reason: str,
*,
access_token: Any,
sequence_id: Optional[str] = None,
) -> None:
logger.info(
"Nous inference auth: using legacy session key path (%s)",
reason,
)
_oauth_trace(
"nous_legacy_session_key_selected",
sequence_id=sequence_id,
reason=reason,
access_token_fp=_token_fingerprint(access_token),
)
def _nous_jwt_expires_at(token: Any, fallback_expires_at: Any = None) -> Optional[str]:
claims = _decode_jwt_claims(token)
exp = claims.get("exp")
@ -4304,85 +4246,6 @@ def _request_device_code(
return data
def _is_nous_invoke_scope_refusal(exc: Exception) -> bool:
if not isinstance(exc, httpx.HTTPStatusError):
return False
response = exc.response
if response.status_code not in {400, 401, 403}:
return False
try:
payload = response.json()
except Exception:
payload = {}
text = " ".join(
str(value)
for value in (
payload.get("error") if isinstance(payload, dict) else None,
payload.get("error_description") if isinstance(payload, dict) else None,
response.text,
)
if value
).lower()
if not text:
return False
return (
"invalid_scope" in text
or "unsupported_scope" in text
or "scope" in text and NOUS_INFERENCE_INVOKE_SCOPE in text
)
def _nous_device_scope_with_env_override(
requested_scope: Optional[str],
*,
default_scope: str = DEFAULT_NOUS_SCOPE,
) -> Tuple[str, bool]:
explicit_scope = requested_scope is not None
scope = requested_scope or default_scope
if _nous_legacy_session_keys_forced():
scope = NOUS_LEGACY_AGENT_KEY_SCOPE
return scope, explicit_scope
def _request_nous_device_code_with_scope_fallback(
*,
client: httpx.Client,
portal_base_url: str,
client_id: str,
scope: str,
allow_legacy_fallback: bool,
) -> Tuple[Dict[str, Any], str]:
try:
return (
_request_device_code(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
scope=scope,
),
scope,
)
except Exception as exc:
if (
allow_legacy_fallback
and _nous_scope_has_invoke(scope)
and _is_nous_invoke_scope_refusal(exc)
):
logger.info("Nous inference auth: NAS refused invoke scope, retrying legacy scope")
_oauth_trace("nous_device_code_invoke_scope_refused")
retry_scope = NOUS_LEGACY_AGENT_KEY_SCOPE
return (
_request_device_code(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
scope=retry_scope,
),
retry_scope,
)
raise
def _poll_for_token(
client: httpx.Client,
portal_base_url: str,
@ -4433,7 +4296,7 @@ def _poll_for_token(
# =============================================================================
# Nous Portal — token refresh, agent key minting, model discovery
# Nous Portal — token refresh and model discovery
# =============================================================================
# -----------------------------------------------------------------------------
@ -4512,9 +4375,9 @@ def _nous_shared_store_lock(timeout_seconds: float = AUTH_LOCK_TIMEOUT_SECONDS):
to be held, acquire ``_auth_store_lock`` FIRST. All runtime refresh
paths follow this order. The one exception is
``_try_import_shared_nous_state``, which holds this lock alone for
the entire refresh+mint cycle so concurrent imports on sibling
profiles can't race on the single-use shared refresh token; that
helper must NOT be called with ``_auth_store_lock`` already held.
the entire refresh cycle so concurrent imports on sibling profiles
can't race on the single-use shared refresh token; that helper must
NOT be called with ``_auth_store_lock`` already held.
"""
try:
lock_path = _nous_shared_store_path().with_suffix(".lock")
@ -4574,9 +4437,8 @@ def _write_shared_nous_state(state: Dict[str, Any]) -> None:
is a convenience layer; the per-profile auth.json remains the source
of truth.
We deliberately omit the runtime ``agent_key`` compatibility field
(either an invoke JWT or legacy opaque session key) only OAuth tokens
are cross-profile useful.
We deliberately omit the runtime ``agent_key`` compatibility field;
the OAuth tokens are the cross-profile source of truth.
"""
refresh_token = state.get("refresh_token")
access_token = state.get("access_token")
@ -4802,9 +4664,9 @@ def _try_import_shared_nous_state(
) -> Optional[Dict[str, Any]]:
"""Attempt to rehydrate Nous OAuth state from the shared store.
Reads the shared file (if present), runs a forced refresh+mint using
the stored refresh_token to produce a fresh access_token + agent_key
scoped to this profile, and returns the full auth_state dict ready
Reads the shared file (if present), runs a forced refresh using the
stored refresh_token to produce a fresh inference JWT scoped to this
profile, and returns the full auth_state dict ready
for ``persist_nous_credentials()``.
Returns ``None`` when no shared state is available or the rehydrate
@ -4820,7 +4682,7 @@ def _try_import_shared_nous_state(
# Build a full state dict so refresh_nous_oauth_from_state has every
# field it needs. force_refresh=True gets us a fresh access_token
# for this profile; fresh auth mode avoids stale cached legacy keys.
# for this profile.
state: Dict[str, Any] = {
"access_token": shared.get("access_token"),
"refresh_token": shared.get("refresh_token"),
@ -4927,39 +4789,6 @@ def _refresh_access_token(
raise AuthError(description, provider="nous", code=code, relogin_required=relogin)
def _mint_agent_key(
*,
client: httpx.Client,
portal_base_url: str,
access_token: str,
min_ttl_seconds: int,
) -> Dict[str, Any]:
"""Mint (or reuse) a short-lived inference API key."""
response = client.post(
f"{portal_base_url}/api/oauth/agent-key",
headers={"Authorization": f"Bearer {access_token}"},
json={"min_ttl_seconds": max(60, int(min_ttl_seconds))},
)
if response.status_code == 200:
payload = response.json()
if "api_key" not in payload:
raise AuthError("Mint response missing api_key",
provider="nous", code="server_error")
return payload
try:
error_payload = response.json()
except Exception as exc:
raise AuthError("Agent key mint request failed",
provider="nous", code="server_error") from exc
code = str(error_payload.get("error", "server_error"))
description = str(error_payload.get("error_description") or "Agent key mint request failed")
relogin = code in {"invalid_token", "invalid_grant"}
raise AuthError(description, provider="nous", code=code, relogin_required=relogin)
def fetch_nous_models(
*,
inference_base_url: str,
@ -5021,15 +4850,12 @@ def _agent_key_is_usable(state: Dict[str, Any], min_ttl_seconds: int) -> bool:
key = state.get("agent_key")
if not isinstance(key, str) or not key.strip():
return False
if _decode_jwt_claims(key):
if _nous_legacy_session_keys_forced():
return False
return _nous_invoke_jwt_is_usable(
key,
scope=state.get("scope"),
expires_at=state.get("agent_key_expires_at"),
)
return not _is_expiring(state.get("agent_key_expires_at"), min_ttl_seconds)
return _nous_invoke_jwt_is_usable(
key,
scope=state.get("scope"),
expires_at=state.get("agent_key_expires_at"),
min_ttl_seconds=max(0, int(min_ttl_seconds)),
)
def resolve_nous_access_token(
@ -5160,11 +4986,11 @@ def refresh_nous_oauth_pure(
) -> Dict[str, Any]:
"""Refresh Nous OAuth state without mutating auth.json directly.
``on_state_update`` is called after a successful access-token refresh and
before any subsequent agent-key mint. Callers that own persistent state can
use it to save the newly rotated refresh token before later work can fail.
``on_state_update`` is called after a successful access-token refresh.
Callers that own persistent state can use it to save the newly rotated
refresh token before later validation can fail.
"""
inference_auth_mode = _normalize_nous_inference_auth_mode(inference_auth_mode)
_normalize_nous_inference_auth_mode(inference_auth_mode)
state: Dict[str, Any] = {
"access_token": access_token,
"refresh_token": refresh_token,
@ -5186,33 +5012,39 @@ def refresh_nous_oauth_pure(
timeout = httpx.Timeout(timeout_seconds if timeout_seconds else 15.0)
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
min_agent_key_ttl = max(60, int(min_key_ttl_seconds))
legacy_session_keys = _nous_legacy_session_keys_forced()
current_invoke_jwt_usable = (
not legacy_session_keys
and _nous_invoke_jwt_is_usable(
state.get("access_token"),
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
del min_key_ttl_seconds
current_invoke_jwt_status = _nous_invoke_jwt_status(
state.get("access_token"),
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
if (
force_refresh
or (
_is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS)
and not current_invoke_jwt_usable
)
):
if force_refresh or current_invoke_jwt_status is not None:
refresh_token_value = state.get("refresh_token")
if not isinstance(refresh_token_value, str) or not refresh_token_value:
if current_invoke_jwt_status is not None:
raise AuthError(
"Nous Portal access token is not a usable inference JWT "
f"({current_invoke_jwt_status}) and no refresh token is available. "
"Re-authenticate with: hermes auth add nous",
provider="nous",
code=current_invoke_jwt_status,
relogin_required=True,
)
raise AuthError(
"No refresh token is available for Nous Portal.",
provider="nous",
relogin_required=True,
)
refreshed = _refresh_access_token(
client=client,
portal_base_url=state["portal_base_url"],
client_id=state["client_id"],
refresh_token=state["refresh_token"],
refresh_token=refresh_token_value,
)
now = datetime.now(timezone.utc)
access_ttl = _coerce_ttl_seconds(refreshed.get("expires_in"))
state["access_token"] = refreshed["access_token"]
state["refresh_token"] = refreshed.get("refresh_token") or state["refresh_token"]
state["refresh_token"] = refreshed.get("refresh_token") or refresh_token_value
state["token_type"] = refreshed.get("token_type") or state.get("token_type") or "Bearer"
state["scope"] = refreshed.get("scope") or state.get("scope")
refreshed_url = _validate_nous_inference_url_from_network(refreshed.get("inference_base_url"))
@ -5226,34 +5058,12 @@ def refresh_nous_oauth_pure(
if on_state_update is not None:
on_state_update(dict(state), "post_refresh_access_token")
selected_auth_path, fallback_reason = _choose_nous_inference_auth_path(
selected_auth_path, _ = _choose_nous_inference_auth_path(
state,
min_key_ttl_seconds=min_agent_key_ttl,
inference_auth_mode=inference_auth_mode,
)
if selected_auth_path == NOUS_AUTH_PATH_INVOKE_JWT:
_select_nous_invoke_jwt(state)
elif selected_auth_path == NOUS_AUTH_PATH_LEGACY_SESSION_KEY_MINT:
_log_nous_legacy_session_key_selected(
fallback_reason or "legacy_session_key_required",
access_token=state.get("access_token"),
)
mint_payload = _mint_agent_key(
client=client,
portal_base_url=state["portal_base_url"],
access_token=state["access_token"],
min_ttl_seconds=min_key_ttl_seconds,
)
now = datetime.now(timezone.utc)
state["agent_key"] = mint_payload.get("api_key")
state["agent_key_id"] = mint_payload.get("key_id")
state["agent_key_expires_at"] = mint_payload.get("expires_at")
state["agent_key_expires_in"] = mint_payload.get("expires_in")
state["agent_key_reused"] = bool(mint_payload.get("reused", False))
state["agent_key_obtained_at"] = now.isoformat()
minted_url = _validate_nous_inference_url_from_network(mint_payload.get("inference_base_url"))
if minted_url:
state["inference_base_url"] = minted_url
return state
@ -5296,7 +5106,7 @@ def persist_nous_credentials(
*,
label: Optional[str] = None,
):
"""Persist minted Nous OAuth credentials as the singleton provider state
"""Persist Nous OAuth credentials as the singleton provider state
and ensure the credential pool is in sync.
Nous credentials are read at runtime from two independent locations:
@ -5307,7 +5117,7 @@ def persist_nous_credentials(
- ``credential_pool.nous``: used by the runtime ``pool.select()`` path.
Historically ``hermes auth add nous`` wrote a ``manual:device_code`` pool
entry only, skipping ``providers.nous``. When the 24h agent_key TTL
entry only, skipping ``providers.nous``. When the runtime credential
expired, the recovery path read the empty singleton state and raised
``AuthError`` silently (``logger.debug`` at INFO level).
@ -5367,16 +5177,16 @@ def resolve_nous_runtime_credentials(
insecure: Optional[bool] = None,
ca_bundle: Optional[str] = None,
inference_auth_mode: str = NOUS_INFERENCE_AUTH_MODE_AUTO,
force_refresh: bool = False,
) -> Dict[str, Any]:
"""
Resolve Nous inference credentials for runtime use.
Ensures access_token is valid (refreshes if needed) and a short-lived
inference key is present with minimum TTL (mints/reuses as needed).
Concurrent processes coordinate through the auth store file lock.
Ensures access_token is a valid inference-scoped JWT, refreshing it when
needed. Concurrent processes coordinate through the auth store file lock.
Returns dict with: provider, base_url, api_key, key_id, expires_at,
expires_in, source ("invoke_jwt", "cache", or "portal"), and auth_path.
expires_in, source ("invoke_jwt"), and auth_path.
"""
inference_auth_mode = _normalize_nous_inference_auth_mode(inference_auth_mode)
min_key_ttl_seconds = max(60, int(min_key_ttl_seconds))
@ -5456,6 +5266,7 @@ def resolve_nous_runtime_credentials(
refresh_token_fp=_token_fingerprint(state.get("refresh_token")),
)
selected_auth_path = NOUS_AUTH_PATH_INVOKE_JWT
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
access_token = state.get("access_token")
refresh_token = state.get("refresh_token")
@ -5464,43 +5275,40 @@ def resolve_nous_runtime_credentials(
raise AuthError("No access token found for Nous Portal login.",
provider="nous", relogin_required=True)
# Step 1: refresh access token if expiring. If the access token
# is already a valid invoke JWT, trust its own exp claim even when
# older auth.json metadata has a stale/missing expires_at.
current_invoke_jwt_usable = (
not _nous_legacy_session_keys_forced()
and _nous_invoke_jwt_is_usable(
access_token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
invoke_jwt_status = _nous_invoke_jwt_status(
access_token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
if (
_is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS)
and not current_invoke_jwt_usable
):
if force_refresh or invoke_jwt_status is not None:
with _nous_shared_store_lock(timeout_seconds=max(timeout_seconds + 5.0, AUTH_LOCK_TIMEOUT_SECONDS)):
if _merge_shared_nous_oauth_state(state):
access_token = state.get("access_token")
refresh_token = state.get("refresh_token")
_persist_state("post_shared_merge_access_expiring")
if (
_is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS)
and not _nous_invoke_jwt_is_usable(
invoke_jwt_status = _nous_invoke_jwt_status(
access_token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
):
if not isinstance(refresh_token, str) or not refresh_token:
raise AuthError("Session expired and no refresh token is available.",
provider="nous", relogin_required=True)
_persist_state("post_shared_merge_access_unusable")
if force_refresh or invoke_jwt_status is not None:
if not isinstance(refresh_token, str) or not refresh_token:
reason = invoke_jwt_status or "force_refresh"
raise AuthError(
"Nous Portal access token is not a usable inference JWT "
f"({reason}) and no refresh token is available. "
"Re-authenticate with: hermes auth add nous",
provider="nous",
code=reason,
relogin_required=True,
)
refresh_reason = "force_refresh" if force_refresh else (invoke_jwt_status or "access_unusable")
_oauth_trace(
"refresh_start",
sequence_id=sequence_id,
reason="access_expiring",
reason=refresh_reason,
refresh_token_fp=_token_fingerprint(refresh_token),
)
try:
@ -5542,166 +5350,25 @@ def resolve_nous_runtime_credentials(
_oauth_trace(
"refresh_success",
sequence_id=sequence_id,
reason="access_expiring",
reason=refresh_reason,
previous_refresh_token_fp=_token_fingerprint(previous_refresh_token),
new_refresh_token_fp=_token_fingerprint(refresh_token),
)
# Persist immediately so downstream mint failures cannot drop rotated refresh tokens.
_persist_state("post_refresh_access_expiring")
# Persist immediately so validation failures cannot drop rotated refresh tokens.
_persist_state("post_refresh_access_token")
# Step 2: resolve the compatibility ``agent_key`` field. Preferred
# path stores the NAS invoke JWT there; legacy path mints/reuses
# the opaque session key.
used_cached_key = False
mint_payload: Optional[Dict[str, Any]] = None
selected_auth_path, fallback_reason = _choose_nous_inference_auth_path(
selected_auth_path, _ = _choose_nous_inference_auth_path(
state,
access_token=access_token,
min_key_ttl_seconds=min_key_ttl_seconds,
inference_auth_mode=inference_auth_mode,
)
_select_nous_invoke_jwt(
state,
access_token=access_token,
sequence_id=sequence_id,
)
if selected_auth_path == NOUS_AUTH_PATH_INVOKE_JWT:
_select_nous_invoke_jwt(
state,
access_token=access_token,
sequence_id=sequence_id,
)
elif selected_auth_path == NOUS_AUTH_PATH_LEGACY_SESSION_KEY_CACHE:
used_cached_key = True
logger.info("Nous inference auth: using cached agent_key")
_oauth_trace("agent_key_reuse", sequence_id=sequence_id)
else:
_log_nous_legacy_session_key_selected(
fallback_reason or "legacy_session_key_required",
access_token=access_token,
sequence_id=sequence_id,
)
try:
_oauth_trace(
"mint_start",
sequence_id=sequence_id,
access_token_fp=_token_fingerprint(access_token),
)
mint_payload = _mint_agent_key(
client=client, portal_base_url=portal_base_url,
access_token=access_token, min_ttl_seconds=min_key_ttl_seconds,
)
except AuthError as exc:
_oauth_trace(
"mint_error",
sequence_id=sequence_id,
code=exc.code,
)
# Retry path: access token may be stale server-side despite local checks
latest_refresh_token = state.get("refresh_token")
if (
exc.code in {"invalid_token", "invalid_grant"}
and isinstance(latest_refresh_token, str)
and latest_refresh_token
):
with _nous_shared_store_lock(timeout_seconds=max(timeout_seconds + 5.0, AUTH_LOCK_TIMEOUT_SECONDS)):
if _merge_shared_nous_oauth_state(state):
access_token = state.get("access_token")
latest_refresh_token = state.get("refresh_token")
_persist_state("post_shared_merge_mint_retry")
else:
_oauth_trace(
"refresh_start",
sequence_id=sequence_id,
reason="mint_retry_after_invalid_token",
refresh_token_fp=_token_fingerprint(latest_refresh_token),
)
try:
refreshed = _refresh_access_token(
client=client, portal_base_url=portal_base_url,
client_id=client_id, refresh_token=latest_refresh_token,
)
except AuthError as exc:
if _is_terminal_nous_refresh_error(exc):
_quarantine_nous_oauth_state(
state,
exc,
reason="runtime_mint_retry_refresh_failure",
)
_quarantine_nous_pool_entries(
auth_store,
exc,
reason="runtime_mint_retry_refresh_failure",
)
_persist_state("terminal_runtime_mint_retry_refresh_failure")
raise
now = datetime.now(timezone.utc)
access_ttl = _coerce_ttl_seconds(refreshed.get("expires_in"))
state["access_token"] = refreshed["access_token"]
state["refresh_token"] = refreshed.get("refresh_token") or latest_refresh_token
state["token_type"] = refreshed.get("token_type") or state.get("token_type") or "Bearer"
state["scope"] = refreshed.get("scope") or state.get("scope")
refreshed_url = _validate_nous_inference_url_from_network(refreshed.get("inference_base_url"))
if refreshed_url:
inference_base_url = refreshed_url
state["obtained_at"] = now.isoformat()
state["expires_in"] = access_ttl
state["expires_at"] = datetime.fromtimestamp(
now.timestamp() + access_ttl, tz=timezone.utc
).isoformat()
access_token = state["access_token"]
refresh_token = state["refresh_token"]
_oauth_trace(
"refresh_success",
sequence_id=sequence_id,
reason="mint_retry_after_invalid_token",
previous_refresh_token_fp=_token_fingerprint(latest_refresh_token),
new_refresh_token_fp=_token_fingerprint(refresh_token),
)
# Persist retry refresh immediately for crash safety and cross-process visibility.
_persist_state("post_refresh_mint_retry")
retry_inference_auth_mode = (
NOUS_INFERENCE_AUTH_MODE_LEGACY
if inference_auth_mode == NOUS_INFERENCE_AUTH_MODE_LEGACY
else NOUS_INFERENCE_AUTH_MODE_FRESH
)
retry_auth_path, _ = _choose_nous_inference_auth_path(
state,
access_token=access_token,
min_key_ttl_seconds=min_key_ttl_seconds,
inference_auth_mode=retry_inference_auth_mode,
)
if retry_auth_path == NOUS_AUTH_PATH_INVOKE_JWT:
mint_payload = None
selected_auth_path = NOUS_AUTH_PATH_INVOKE_JWT
_select_nous_invoke_jwt(
state,
access_token=access_token,
sequence_id=sequence_id,
)
else:
mint_payload = _mint_agent_key(
client=client, portal_base_url=portal_base_url,
access_token=access_token, min_ttl_seconds=min_key_ttl_seconds,
)
else:
raise
if mint_payload is not None:
now = datetime.now(timezone.utc)
state["agent_key"] = mint_payload.get("api_key")
state["agent_key_id"] = mint_payload.get("key_id")
state["agent_key_expires_at"] = mint_payload.get("expires_at")
state["agent_key_expires_in"] = mint_payload.get("expires_in")
state["agent_key_reused"] = bool(mint_payload.get("reused", False))
state["agent_key_obtained_at"] = now.isoformat()
minted_url = _validate_nous_inference_url_from_network(mint_payload.get("inference_base_url"))
if minted_url:
inference_base_url = minted_url
_oauth_trace(
"mint_success",
sequence_id=sequence_id,
reused=bool(mint_payload.get("reused", False)),
)
# Persist routing and TLS metadata for non-interactive refresh/mint
# Persist routing and TLS metadata for non-interactive refresh.
state["portal_base_url"] = portal_base_url
state["inference_base_url"] = inference_base_url
state["client_id"] = client_id
@ -5735,11 +5402,7 @@ def resolve_nous_runtime_credentials(
"key_id": state.get("agent_key_id"),
"expires_at": expires_at,
"expires_in": expires_in,
"source": (
NOUS_AUTH_PATH_INVOKE_JWT
if selected_auth_path == NOUS_AUTH_PATH_INVOKE_JWT
else ("cache" if used_cached_key else "portal")
),
"source": NOUS_AUTH_PATH_INVOKE_JWT,
"auth_path": selected_auth_path,
}
@ -5765,8 +5428,7 @@ def _snapshot_nous_pool_status() -> Dict[str, Any]:
"""Best-effort status from the credential pool.
This is a fallback only. The auth-store provider state is the runtime source
of truth because it is what ``resolve_nous_runtime_credentials()`` refreshes
and mints against.
of truth because it is what ``resolve_nous_runtime_credentials()`` refreshes.
"""
try:
from agent.credential_pool import load_pool
@ -5858,7 +5520,7 @@ def get_nous_auth_status() -> Dict[str, Any]:
"""Status snapshot for Nous auth.
Prefer the auth-store provider state, because that is the live source of
truth for refresh + mint operations. When provider state exists, validate it
truth for refresh operations. When provider state exists, validate it
by resolving runtime credentials so revoked refresh sessions do not show up
as a healthy login. If provider state is absent, fall back to the credential
pool for the just-logged-in / not-yet-promoted case.
@ -7719,10 +7381,7 @@ def _nous_device_code_login(
or pconfig.inference_base_url
).rstrip("/")
client_id = client_id or pconfig.client_id
scope, explicit_scope = _nous_device_scope_with_env_override(
scope,
default_scope=pconfig.scope,
)
scope = scope or pconfig.scope
timeout = httpx.Timeout(timeout_seconds)
verify: bool | str = False if insecure else (ca_bundle if ca_bundle else True)
@ -7737,12 +7396,11 @@ def _nous_device_code_login(
print(f"TLS verification: custom CA bundle ({ca_bundle})")
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
device_data, scope = _request_nous_device_code_with_scope_fallback(
device_data = _request_device_code(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
scope=scope,
allow_legacy_fallback=not explicit_scope,
)
verification_url = str(device_data["verification_uri_complete"])

View file

@ -3004,7 +3004,6 @@ def _model_flow_nous(config, current_model="", args=None):
"""Nous Portal provider: ensure logged in, then pick model."""
from hermes_cli.auth import (
get_provider_auth_state,
NOUS_INFERENCE_AUTH_MODE_LEGACY,
_prompt_model_selection,
_save_model_choice,
_update_config_for_provider,
@ -3107,13 +3106,13 @@ def _model_flow_nous(config, current_model="", args=None):
try:
refreshed_creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=5 * 60,
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_LEGACY,
force_refresh=True,
)
if refreshed_creds:
creds = refreshed_creds
except Exception:
# Runtime inference has its own paid-entitlement recovery path; do
# not block model selection if this opportunistic remint fails.
# not block model selection if this opportunistic refresh fails.
pass
# Resolve portal URL early — needed both for upgrade links and for the

View file

@ -69,11 +69,11 @@ class UpstreamAdapter(ABC):
@abstractmethod
def get_credential(self) -> UpstreamCredential:
"""Return a fresh credential, refreshing/minting if necessary.
"""Return a fresh credential, refreshing or rotating if necessary.
Implementations should:
- refresh the access token if it's near expiry
- mint/rotate the upstream bearer key if it's near expiry
- rotate the upstream bearer key if it's near expiry
- persist any refreshed state back to disk
Raises:
@ -90,8 +90,7 @@ class UpstreamAdapter(ABC):
"""Return an alternate credential after an upstream auth failure.
The default is no retry. Providers can override this for one-shot
fallback paths, such as switching from a preferred token type to a
legacy bearer after the upstream rejects the first request.
fallback paths after the upstream rejects the first request.
"""
_ = failed_credential, status_code
return None

View file

@ -1,13 +1,8 @@
"""Nous Portal upstream adapter.
Reads the user's Nous OAuth state from ``~/.hermes/auth.json`` through the
shared runtime resolver, refreshes the access token and resolves the
``agent_key`` compatibility credential when needed, then exposes the upstream
base URL plus bearer for the proxy server to forward to.
The ``agent_key`` field may hold either a NAS invoke JWT or the legacy
opaque session key. The refresh helper handles both see
:func:`hermes_cli.auth.resolve_nous_runtime_credentials`.
shared runtime resolver, validates or refreshes the inference JWT, then exposes
the upstream base URL plus bearer for the proxy server to forward to.
"""
from __future__ import annotations
@ -20,7 +15,6 @@ from hermes_cli.auth import (
AuthError,
DEFAULT_NOUS_INFERENCE_URL,
NOUS_INFERENCE_AUTH_MODE_AUTO,
NOUS_INFERENCE_AUTH_MODE_LEGACY,
_load_auth_store,
_auth_store_lock,
_is_terminal_nous_refresh_error,
@ -72,8 +66,8 @@ class NousPortalAdapter(UpstreamAdapter):
state = self._read_state()
if state is None:
return False
# We need either a usable agent_key OR (refresh_token + access_token)
# to recover. The refresh helper will mint/refresh as needed.
# We need either a usable inference JWT OR (refresh_token + access_token)
# to recover. The refresh helper validates and refreshes as needed.
return bool(
state.get("agent_key")
or (state.get("refresh_token") and state.get("access_token"))
@ -90,14 +84,8 @@ class NousPortalAdapter(UpstreamAdapter):
failed_credential: UpstreamCredential,
status_code: int,
) -> Optional[UpstreamCredential]:
if status_code != 401:
return None
if failed_credential.bearer.count(".") != 2:
return None
logger.info("proxy: Nous upstream rejected bearer; retrying with legacy session key")
return self._get_credential(
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_LEGACY,
)
_ = failed_credential, status_code
return None
def _get_credential(self, *, inference_auth_mode: str) -> UpstreamCredential:
with self._lock:
@ -131,10 +119,10 @@ class NousPortalAdapter(UpstreamAdapter):
f"Failed to refresh Nous Portal credentials: {exc}"
) from exc
agent_key = refreshed.get("api_key")
if not agent_key:
runtime_key = refreshed.get("api_key")
if not runtime_key:
raise RuntimeError(
"Nous Portal refresh did not return a usable agent_key. "
"Nous Portal refresh did not return a usable inference JWT. "
"Try `hermes auth add nous` to re-authenticate."
)
@ -145,7 +133,7 @@ class NousPortalAdapter(UpstreamAdapter):
base_url = base_url.rstrip("/")
return UpstreamCredential(
bearer=agent_key,
bearer=runtime_key,
base_url=base_url,
expires_at=refreshed.get("expires_at"),
)

View file

@ -1115,10 +1115,17 @@ def _resolve_explicit_runtime(
explicit_base_url
or str(state.get("inference_base_url") or auth_mod.DEFAULT_NOUS_INFERENCE_URL).strip().rstrip("/")
)
# Only use the agent_key compatibility field for inference. It may be
# either a NAS invoke JWT or a legacy opaque session key; raw OAuth
# access_token fallback is handled by resolve_nous_runtime_credentials().
api_key = explicit_api_key or str(state.get("agent_key") or "").strip()
# Only use the agent_key compatibility field for inference when it
# contains a NAS invoke JWT; raw OAuth access_token fallback is handled
# by resolve_nous_runtime_credentials().
api_key = explicit_api_key or (
str(state.get("agent_key") or "").strip()
if _agent_key_is_usable(
state,
max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
)
else ""
)
expires_at = state.get("agent_key_expires_at") or state.get("expires_at")
if not api_key:
creds = resolve_nous_runtime_credentials(
@ -1309,12 +1316,11 @@ def resolve_runtime_provider(
or getattr(entry, "access_token", "")
)
# For Nous, the pool entry's runtime_api_key is the agent_key
# compatibility field: either an invoke JWT or legacy opaque key.
# The pool doesn't
# compatibility field. It must be an invoke JWT. The pool doesn't
# refresh it during selection (that would trigger network calls in
# non-runtime contexts like `hermes auth list`). If the key is
# expired, clear pool_api_key so we fall through to
# resolve_nous_runtime_credentials() which handles refresh + fallback.
# resolve_nous_runtime_credentials() which handles refresh.
if provider == "nous" and entry is not None and pool_api_key:
min_ttl = max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800")))
nous_state = {

View file

@ -1898,8 +1898,7 @@ async def _start_device_code_flow(provider_id: str) -> Dict[str, Any]:
"""
if provider_id == "nous":
from hermes_cli.auth import (
_nous_device_scope_with_env_override,
_request_nous_device_code_with_scope_fallback,
_request_device_code,
PROVIDER_REGISTRY,
)
import httpx
@ -1910,22 +1909,21 @@ async def _start_device_code_flow(provider_id: str) -> Dict[str, Any]:
or pconfig.portal_base_url
).rstrip("/")
client_id = pconfig.client_id
scope, explicit_scope = _nous_device_scope_with_env_override(
None,
default_scope=pconfig.scope,
)
scope = pconfig.scope
def _do_nous_device_request():
with httpx.Client(
timeout=httpx.Timeout(15.0),
headers={"Accept": "application/json"},
) as client:
return _request_nous_device_code_with_scope_fallback(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
scope=scope,
allow_legacy_fallback=not explicit_scope,
return (
_request_device_code(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
scope=scope,
),
scope,
)
device_data, effective_scope = await asyncio.get_running_loop().run_in_executor(
@ -2093,7 +2091,7 @@ def _nous_poller(session_id: str) -> None:
expires_in=expires_in,
poll_interval=interval,
)
# Same post-processing as _nous_device_code_login (mint agent key)
# Same post-processing as _nous_device_code_login (validate/refresh JWT)
now = datetime.now(timezone.utc)
token_ttl = int(token_data.get("expires_in") or 0)
auth_state = {

View file

@ -3007,19 +3007,15 @@ class AIAgent:
try:
from hermes_cli.auth import (
NOUS_INFERENCE_AUTH_MODE_AUTO,
NOUS_INFERENCE_AUTH_MODE_LEGACY,
resolve_nous_runtime_credentials,
)
selected_auth_mode = inference_auth_mode or (
NOUS_INFERENCE_AUTH_MODE_LEGACY
if force
else NOUS_INFERENCE_AUTH_MODE_AUTO
)
selected_auth_mode = inference_auth_mode or NOUS_INFERENCE_AUTH_MODE_AUTO
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
inference_auth_mode=selected_auth_mode,
force_refresh=force,
)
except Exception as exc:
logger.debug("Nous credential refresh failed: %s", exc)

View file

@ -1,5 +1,6 @@
"""Tests for agent.auxiliary_client resolution chain, provider overrides, and model overrides."""
import base64
import json
import logging
import time
@ -29,6 +30,12 @@ from agent.auxiliary_client import (
)
def _jwt_with_claims(claims: dict) -> str:
header = base64.urlsafe_b64encode(b'{"alg":"none","typ":"JWT"}').decode().rstrip("=")
payload = base64.urlsafe_b64encode(json.dumps(claims).encode()).decode().rstrip("=")
return f"{header}.{payload}.sig"
@pytest.fixture(autouse=True)
def _clean_env(monkeypatch):
"""Strip provider env vars so each test starts clean."""
@ -887,9 +894,16 @@ class TestVisionClientFallback:
class TestAuxiliaryPoolAwareness:
def test_try_nous_uses_pool_entry(self):
pooled_token = _jwt_with_claims({
"scope": "inference:invoke",
"exp": int(time.time() + 3600),
})
class _Entry:
access_token = "pooled-access-token"
agent_key = "pooled-agent-key"
agent_key = pooled_token
agent_key_expires_at = "2099-01-01T00:00:00+00:00"
scope = "inference:invoke"
inference_base_url = "https://inference.pool.example/v1"
class _Pool:
@ -910,7 +924,7 @@ class TestAuxiliaryPoolAwareness:
assert client is not None
assert model == "google/gemini-3-flash-preview"
assert mock_openai.call_args.kwargs["api_key"] == "pooled-agent-key"
assert mock_openai.call_args.kwargs["api_key"] == pooled_token
assert mock_openai.call_args.kwargs["base_url"] == "https://inference.pool.example/v1"
def test_try_nous_uses_portal_recommendation_for_text(self):

View file

@ -1225,7 +1225,7 @@ def test_load_pool_migrates_nous_provider_state(tmp_path, monkeypatch):
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke",
"access_token": "access-token",
"refresh_token": "refresh-token",
"expires_at": "2026-03-24T12:00:00+00:00",
@ -1252,7 +1252,7 @@ def test_load_pool_mirrors_nous_invoke_jwt_agent_key_runtime_api_key(tmp_path, m
expires_at = datetime.fromtimestamp(time.time() + 3600, tz=timezone.utc).isoformat()
token = _jwt_with_claims({
"sub": "test-user",
"scope": ["inference:invoke", "inference:mint_agent_key"],
"scope": ["inference:invoke"],
"exp": int(time.time() + 3600),
})
_write_auth_store(
@ -1266,7 +1266,7 @@ def test_load_pool_mirrors_nous_invoke_jwt_agent_key_runtime_api_key(tmp_path, m
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:invoke inference:mint_agent_key",
"scope": "inference:invoke",
"access_token": token,
"refresh_token": "refresh-token",
"expires_at": expires_at,
@ -1307,7 +1307,7 @@ def test_nous_pool_terminal_refresh_removes_device_code_entry(tmp_path, monkeypa
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke",
"access_token": "access-token",
"refresh_token": "refresh-token",
"expires_at": "2026-03-24T12:00:00+00:00",
@ -1479,7 +1479,7 @@ def test_load_pool_migrates_nous_provider_state_preserves_tls(tmp_path, monkeypa
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke",
"access_token": "access-token",
"refresh_token": "refresh-token",
"expires_at": "2026-03-24T12:00:00+00:00",
@ -2405,7 +2405,7 @@ def test_sync_nous_entry_from_auth_store_adopts_newer_tokens(tmp_path, monkeypat
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke",
"access_token": "access-OLD",
"refresh_token": "refresh-OLD",
"expires_at": "2026-03-24T12:00:00+00:00",
@ -2435,7 +2435,7 @@ def test_sync_nous_entry_from_auth_store_adopts_newer_tokens(tmp_path, monkeypat
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke",
"access_token": "access-NEW",
"refresh_token": "refresh-NEW",
"expires_at": "2026-03-24T12:30:00+00:00",
@ -2467,7 +2467,7 @@ def test_sync_nous_entry_noop_when_tokens_match(tmp_path, monkeypatch):
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke",
"access_token": "access-token",
"refresh_token": "refresh-token",
"expires_at": "2026-03-24T12:00:00+00:00",
@ -2504,7 +2504,7 @@ def test_nous_exhausted_entry_recovers_via_auth_store_sync(tmp_path, monkeypatch
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke",
"access_token": "access-OLD",
"refresh_token": "refresh-OLD",
"expires_at": "2026-03-24T12:00:00+00:00",
@ -2541,7 +2541,7 @@ def test_nous_exhausted_entry_recovers_via_auth_store_sync(tmp_path, monkeypatch
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke",
"access_token": "access-FRESH",
"refresh_token": "refresh-FRESH",
"expires_at": "2026-03-24T12:30:00+00:00",

View file

@ -107,15 +107,15 @@ def test_auth_add_nous_oauth_persists_pool_entry(tmp_path, monkeypatch):
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"scope": "inference:invoke inference:mint_agent_key",
"scope": "inference:invoke",
"token_type": "Bearer",
"access_token": token,
"refresh_token": "refresh-token",
"obtained_at": "2026-03-23T10:00:00+00:00",
"expires_at": "2026-03-23T11:00:00+00:00",
"expires_in": 3600,
"agent_key": "ak-test",
"agent_key_id": "ak-id",
"agent_key": token,
"agent_key_id": None,
"agent_key_expires_at": "2026-03-23T10:30:00+00:00",
"agent_key_expires_in": 1800,
"agent_key_reused": False,
@ -228,15 +228,15 @@ def test_auth_add_nous_oauth_honors_custom_label(tmp_path, monkeypatch):
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"scope": "inference:invoke inference:mint_agent_key",
"scope": "inference:invoke",
"token_type": "Bearer",
"access_token": token,
"refresh_token": "refresh-token",
"obtained_at": "2026-03-23T10:00:00+00:00",
"expires_at": "2026-03-23T11:00:00+00:00",
"expires_in": 3600,
"agent_key": "ak-test",
"agent_key_id": "ak-id",
"agent_key": token,
"agent_key_id": None,
"agent_key_expires_at": "2026-03-23T10:30:00+00:00",
"agent_key_expires_in": 1800,
"agent_key_reused": False,

View file

@ -1,4 +1,4 @@
"""Regression tests for Nous OAuth refresh + agent-key mint interactions."""
"""Regression tests for Nous OAuth refresh and inference JWT interactions."""
import base64
import json
@ -126,14 +126,15 @@ class TestResolveVerifyFallback:
def _setup_nous_auth(
hermes_home: Path,
*,
access_token: str = "access-old",
access_token: str = "",
refresh_token: str = "refresh-old",
scope: str = "inference:mint_agent_key",
scope: str = "inference:invoke",
expires_at: str = "2026-02-01T00:00:00+00:00",
expires_in: int = 0,
agent_key: str | None = None,
agent_key_expires_at: str | None = None,
) -> None:
access_token = access_token or _invoke_jwt(seconds=3600, scope=scope)
hermes_home.mkdir(parents=True, exist_ok=True)
auth_store = {
"version": 1,
@ -162,16 +163,6 @@ def _setup_nous_auth(
(hermes_home / "auth.json").write_text(json.dumps(auth_store, indent=2))
def _mint_payload(api_key: str = "agent-key") -> dict:
return {
"api_key": api_key,
"key_id": "key-id-1",
"expires_at": datetime.now(timezone.utc).isoformat(),
"expires_in": 1800,
"reused": False,
}
def _jwt_with_claims(claims: dict) -> str:
def _part(payload: dict) -> str:
raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
@ -184,7 +175,7 @@ def _future_iso(seconds: int = 3600) -> str:
return datetime.fromtimestamp(time.time() + seconds, tz=timezone.utc).isoformat()
def _invoke_jwt(*, seconds: int = 3600, scope: object = "inference:invoke inference:mint_agent_key") -> str:
def _invoke_jwt(*, seconds: int = 3600, scope: object = "inference:invoke") -> str:
return _jwt_with_claims({
"sub": "test-user",
"scope": scope,
@ -209,11 +200,6 @@ def test_resolve_nous_runtime_credentials_prefers_invoke_jwt_and_mirrors(
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _unexpected_mint(*args, **kwargs):
raise AssertionError("legacy agent-key mint should not run for invoke JWT")
monkeypatch.setattr(auth_mod, "_mint_agent_key", _unexpected_mint)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert creds["api_key"] == token
@ -278,15 +264,11 @@ def test_resolve_nous_runtime_credentials_invoke_jwt_is_idempotent(
before_mtime = auth_path.stat().st_mtime_ns
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _unexpected_mint(*args, **kwargs):
raise AssertionError("stable invoke JWT should not mint a legacy key")
def _unexpected_shared_write(*args, **kwargs):
raise AssertionError("unchanged invoke JWT resolution should not sync shared store")
sync_calls = []
monkeypatch.setattr(auth_mod, "_mint_agent_key", _unexpected_mint)
monkeypatch.setattr(auth_mod, "_write_shared_nous_state", _unexpected_shared_write)
monkeypatch.setattr(
auth_mod,
@ -330,11 +312,7 @@ def test_resolve_nous_runtime_credentials_trusts_invoke_jwt_exp_over_stale_metad
def _unexpected_refresh(*args, **kwargs):
raise AssertionError("valid invoke JWT should not be refreshed because metadata is stale")
def _unexpected_mint(*args, **kwargs):
raise AssertionError("valid invoke JWT should not fall back to legacy mint")
monkeypatch.setattr(auth_mod, "_refresh_access_token", _unexpected_refresh)
monkeypatch.setattr(auth_mod, "_mint_agent_key", _unexpected_mint)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
@ -347,7 +325,7 @@ def test_resolve_nous_runtime_credentials_trusts_invoke_jwt_exp_over_stale_metad
assert datetime.fromisoformat(singleton["agent_key_expires_at"]).timestamp() > time.time() + 300
def test_resolve_nous_runtime_credentials_does_not_apply_legacy_ttl_to_invoke_jwt(
def test_resolve_nous_runtime_credentials_does_not_apply_agent_key_ttl_to_invoke_jwt(
tmp_path,
monkeypatch,
):
@ -364,11 +342,6 @@ def test_resolve_nous_runtime_credentials_does_not_apply_legacy_ttl_to_invoke_jw
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _unexpected_mint(*args, **kwargs):
raise AssertionError("1800s legacy min TTL should not force opaque mint for invoke JWT")
monkeypatch.setattr(auth_mod, "_mint_agent_key", _unexpected_mint)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=1800)
assert creds["api_key"] == token
@ -378,7 +351,56 @@ def test_resolve_nous_runtime_credentials_does_not_apply_legacy_ttl_to_invoke_jw
assert payload["credential_pool"]["nous"][0]["agent_key"] == token
def test_legacy_auth_mode_bypasses_usable_invoke_jwt(tmp_path, monkeypatch):
def test_resolve_nous_runtime_credentials_refreshes_legacy_agent_key_to_invoke_jwt(
tmp_path,
monkeypatch,
):
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
refreshed_token = _invoke_jwt(seconds=3600)
_setup_nous_auth(
hermes_home,
access_token="legacy-access-token",
refresh_token="refresh-old",
scope=auth_mod.DEFAULT_NOUS_SCOPE,
expires_at=_future_iso(3600),
expires_in=3600,
agent_key="legacy-opaque-session-key",
agent_key_expires_at=_future_iso(3600),
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
refresh_calls = []
def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
del client, portal_base_url, client_id
refresh_calls.append(refresh_token)
return {
"access_token": refreshed_token,
"refresh_token": "refresh-new",
"expires_in": 3600,
"token_type": "Bearer",
"scope": auth_mod.DEFAULT_NOUS_SCOPE,
}
monkeypatch.setattr(auth_mod, "_refresh_access_token", _fake_refresh_access_token)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert refresh_calls == ["refresh-old"]
assert creds["api_key"] == refreshed_token
assert creds["source"] == auth_mod.NOUS_AUTH_PATH_INVOKE_JWT
payload = json.loads((hermes_home / "auth.json").read_text())
singleton = payload["providers"]["nous"]
assert singleton["access_token"] == refreshed_token
assert singleton["refresh_token"] == "refresh-new"
assert singleton["agent_key"] == refreshed_token
assert singleton["agent_key_id"] is None
assert payload["credential_pool"]["nous"][0]["agent_key"] == refreshed_token
def test_legacy_auth_mode_is_rejected(tmp_path, monkeypatch):
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
@ -392,28 +414,14 @@ def test_legacy_auth_mode_bypasses_usable_invoke_jwt(tmp_path, monkeypatch):
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
mint_calls = []
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
del client, portal_base_url, min_ttl_seconds
mint_calls.append(access_token)
return _mint_payload(api_key="legacy-after-jwt-401")
monkeypatch.setattr(auth_mod, "_mint_agent_key", _fake_mint_agent_key)
creds = auth_mod.resolve_nous_runtime_credentials(
min_key_ttl_seconds=300,
inference_auth_mode=auth_mod.NOUS_INFERENCE_AUTH_MODE_LEGACY,
)
assert mint_calls == [token]
assert creds["api_key"] == "legacy-after-jwt-401"
assert creds["auth_path"] == auth_mod.NOUS_AUTH_PATH_LEGACY_SESSION_KEY_MINT
payload = json.loads((hermes_home / "auth.json").read_text())
assert payload["providers"]["nous"]["agent_key"] == "legacy-after-jwt-401"
with pytest.raises(ValueError, match="Invalid Nous inference auth mode"):
auth_mod.resolve_nous_runtime_credentials(
min_key_ttl_seconds=300,
inference_auth_mode="legacy",
)
def test_resolve_nous_runtime_credentials_falls_back_when_invoke_scope_missing(
def test_resolve_nous_runtime_credentials_reauths_when_invoke_scope_missing(
tmp_path,
monkeypatch,
):
@ -428,32 +436,24 @@ def test_resolve_nous_runtime_credentials_falls_back_when_invoke_scope_missing(
_setup_nous_auth(
hermes_home,
access_token=token,
scope=auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE,
refresh_token="",
scope="inference:mint_agent_key",
expires_at=_future_iso(3600),
expires_in=3600,
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
calls = []
with pytest.raises(AuthError) as exc:
auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
del client, portal_base_url, min_ttl_seconds
calls.append(access_token)
return _mint_payload(api_key="opaque-agent-key")
monkeypatch.setattr(auth_mod, "_mint_agent_key", _fake_mint_agent_key)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert calls == [token]
assert creds["api_key"] == "opaque-agent-key"
assert creds["source"] == "portal"
assert exc.value.code == "missing_inference_invoke_scope"
assert exc.value.relogin_required is True
payload = json.loads((hermes_home / "auth.json").read_text())
assert payload["providers"]["nous"]["agent_key"] == "opaque-agent-key"
assert payload["credential_pool"]["nous"][0]["agent_key"] == "opaque-agent-key"
assert payload["providers"]["nous"]["agent_key"] is None
assert "credential_pool" not in payload or not payload["credential_pool"].get("nous")
def test_nous_device_code_login_retries_legacy_scope_when_invoke_refused(monkeypatch):
def test_nous_device_code_login_does_not_retry_legacy_scope_when_invoke_refused(monkeypatch):
import hermes_cli.auth as auth_mod
scopes = []
@ -461,59 +461,31 @@ def test_nous_device_code_login_retries_legacy_scope_when_invoke_refused(monkeyp
def _fake_request_device_code(*, client, portal_base_url, client_id, scope):
del client, portal_base_url, client_id
scopes.append(scope)
if len(scopes) == 1:
request = httpx.Request("POST", "https://portal.example.com/api/oauth/device/code")
response = httpx.Response(
400,
json={
"error": "invalid_scope",
"error_description": "unsupported inference:invoke",
},
request=request,
)
raise httpx.HTTPStatusError("invalid_scope", request=request, response=response)
return {
"device_code": "device",
"user_code": "user",
"verification_uri": "https://portal.example.com/device",
"verification_uri_complete": "https://portal.example.com/device?code=user",
"expires_in": 600,
"interval": 1,
}
def _fake_poll_for_token(**kwargs):
del kwargs
return {
"access_token": "access-legacy",
"refresh_token": "refresh-legacy",
"expires_in": 900,
"scope": auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE,
}
def _fake_refresh(state, **kwargs):
del kwargs
refreshed = dict(state)
refreshed["agent_key"] = "opaque-agent-key"
refreshed["agent_key_expires_at"] = _future_iso(1800)
return refreshed
request = httpx.Request("POST", "https://portal.example.com/api/oauth/device/code")
response = httpx.Response(
400,
json={
"error": "invalid_scope",
"error_description": "unsupported inference:invoke",
},
request=request,
)
raise httpx.HTTPStatusError("invalid_scope", request=request, response=response)
monkeypatch.setattr(auth_mod, "_request_device_code", _fake_request_device_code)
monkeypatch.setattr(auth_mod, "_poll_for_token", _fake_poll_for_token)
monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _fake_refresh)
result = auth_mod._nous_device_code_login(
portal_base_url="https://portal.example.com",
inference_base_url="https://inference.example.com/v1",
open_browser=False,
timeout_seconds=1,
)
with pytest.raises(httpx.HTTPStatusError):
auth_mod._nous_device_code_login(
portal_base_url="https://portal.example.com",
inference_base_url="https://inference.example.com/v1",
open_browser=False,
timeout_seconds=1,
)
assert scopes == [auth_mod.DEFAULT_NOUS_SCOPE, auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE]
assert result["scope"] == auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE
assert result["agent_key"] == "opaque-agent-key"
assert scopes == [auth_mod.DEFAULT_NOUS_SCOPE]
def test_forced_legacy_env_skips_invoke_scope_and_jwt_storage(tmp_path, monkeypatch):
def test_legacy_session_env_is_ignored_for_invoke_scope_and_jwt_storage(tmp_path, monkeypatch):
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
@ -526,25 +498,16 @@ def test_forced_legacy_env_skips_invoke_scope_and_jwt_storage(tmp_path, monkeypa
expires_in=3600,
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv(auth_mod.NOUS_LEGACY_SESSION_KEYS_ENV, "true")
mint_calls = []
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
del client, portal_base_url, min_ttl_seconds
mint_calls.append(access_token)
return _mint_payload(api_key="forced-legacy-key")
monkeypatch.setattr(auth_mod, "_mint_agent_key", _fake_mint_agent_key)
monkeypatch.setenv("HERMES_AGENT_USE_LEGACY_SESSION_KEYS", "true")
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert mint_calls == [token]
assert creds["api_key"] == "forced-legacy-key"
assert creds["api_key"] == token
payload = json.loads((hermes_home / "auth.json").read_text())
assert payload["providers"]["nous"]["agent_key"] == "forced-legacy-key"
assert payload["providers"]["nous"]["agent_key"] == token
requested_scopes = []
login_token = _invoke_jwt(seconds=3600)
def _fake_request_device_code(*, client, portal_base_url, client_id, scope):
del client, portal_base_url, client_id
@ -561,31 +524,24 @@ def test_forced_legacy_env_skips_invoke_scope_and_jwt_storage(tmp_path, monkeypa
def _fake_poll_for_token(**kwargs):
del kwargs
return {
"access_token": "access-legacy",
"refresh_token": "refresh-legacy",
"access_token": login_token,
"refresh_token": "refresh-token",
"expires_in": 900,
"scope": auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE,
"scope": auth_mod.DEFAULT_NOUS_SCOPE,
}
def _fake_refresh(state, **kwargs):
del kwargs
refreshed = dict(state)
refreshed["agent_key"] = "forced-legacy-login-key"
refreshed["agent_key_expires_at"] = _future_iso(1800)
return refreshed
monkeypatch.setattr(auth_mod, "_request_device_code", _fake_request_device_code)
monkeypatch.setattr(auth_mod, "_poll_for_token", _fake_poll_for_token)
monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _fake_refresh)
auth_mod._nous_device_code_login(
result = auth_mod._nous_device_code_login(
portal_base_url="https://portal.example.com",
inference_base_url="https://inference.example.com/v1",
open_browser=False,
timeout_seconds=1,
)
assert requested_scopes == [auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE]
assert requested_scopes == [auth_mod.DEFAULT_NOUS_SCOPE]
assert result["agent_key"] == login_token
def test_nous_inference_auth_logs_do_not_include_secret_values(
@ -596,37 +552,42 @@ def test_nous_inference_auth_logs_do_not_include_secret_values(
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
token = _jwt_with_claims({
"sub": "secret-user",
"scope": "inference:mint_agent_key",
"exp": int(time.time() + 3600),
})
token = _invoke_jwt(seconds=3600)
refreshed_token = _invoke_jwt(seconds=7200)
refresh_token = "refresh-secret-token"
opaque_key = "opaque-secret-agent-key"
_setup_nous_auth(
hermes_home,
access_token=token,
refresh_token=refresh_token,
scope=auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE,
scope=auth_mod.DEFAULT_NOUS_SCOPE,
expires_at=_future_iso(3600),
expires_in=3600,
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
del client, portal_base_url, access_token, min_ttl_seconds
return _mint_payload(api_key=opaque_key)
def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
del client, portal_base_url, client_id, refresh_token
return {
"access_token": refreshed_token,
"refresh_token": "refresh-new",
"expires_in": 7200,
"token_type": "Bearer",
"scope": auth_mod.DEFAULT_NOUS_SCOPE,
}
monkeypatch.setattr(auth_mod, "_mint_agent_key", _fake_mint_agent_key)
monkeypatch.setattr(auth_mod, "_refresh_access_token", _fake_refresh_access_token)
caplog.set_level(logging.INFO, logger="hermes_cli.auth")
auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
auth_mod.resolve_nous_runtime_credentials(
min_key_ttl_seconds=300,
force_refresh=True,
)
logged = caplog.text
assert "legacy session key path" in logged
assert "using NAS invoke JWT" in logged
assert token not in logged
assert refreshed_token not in logged
assert refresh_token not in logged
assert opaque_key not in logged
def test_get_nous_auth_status_checks_credential_pool(tmp_path, monkeypatch):
@ -805,68 +766,75 @@ def test_get_nous_auth_status_empty_returns_not_logged_in(tmp_path, monkeypatch)
assert status["logged_in"] is False
def test_refresh_token_persisted_when_mint_returns_insufficient_credits(tmp_path, monkeypatch):
def test_refresh_token_persisted_when_refreshed_jwt_lacks_invoke_scope(tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
_setup_nous_auth(hermes_home, refresh_token="refresh-old")
_setup_nous_auth(
hermes_home,
access_token="access-old",
refresh_token="refresh-old",
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
refresh_calls = []
mint_calls = {"count": 0}
bad_jwt = _jwt_with_claims({
"sub": "test-user",
"scope": "profile",
"exp": int(time.time() + 3600),
})
good_jwt = _invoke_jwt(seconds=3600)
def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
refresh_calls.append(refresh_token)
idx = len(refresh_calls)
if len(refresh_calls) == 1:
token = bad_jwt
else:
token = good_jwt
return {
"access_token": f"access-{idx}",
"refresh_token": f"refresh-{idx}",
"expires_in": 0,
"access_token": token,
"refresh_token": f"refresh-{len(refresh_calls)}",
"expires_in": 3600,
"token_type": "Bearer",
"scope": "profile" if len(refresh_calls) == 1 else "inference:invoke",
}
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
mint_calls["count"] += 1
if mint_calls["count"] == 1:
raise AuthError("credits exhausted", provider="nous", code="insufficient_credits")
return _mint_payload(api_key="agent-key-2")
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)
with pytest.raises(AuthError) as exc:
resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert exc.value.code == "insufficient_credits"
assert exc.value.code == "missing_inference_invoke_scope"
state_after_failure = get_provider_auth_state("nous")
assert state_after_failure is not None
assert state_after_failure["refresh_token"] == "refresh-1"
assert state_after_failure["access_token"] == "access-1"
assert state_after_failure["access_token"] == bad_jwt
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert creds["api_key"] == "agent-key-2"
assert creds["api_key"] == good_jwt
assert refresh_calls == ["refresh-old", "refresh-1"]
def test_refresh_token_persisted_when_mint_times_out(tmp_path, monkeypatch):
def test_refresh_token_persisted_when_refreshed_token_is_not_jwt(tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
_setup_nous_auth(hermes_home, refresh_token="refresh-old")
_setup_nous_auth(
hermes_home,
access_token="access-old",
refresh_token="refresh-old",
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
return {
"access_token": "access-1",
"refresh_token": "refresh-1",
"expires_in": 0,
"expires_in": 3600,
"token_type": "Bearer",
}
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
raise httpx.ReadTimeout("mint timeout")
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)
with pytest.raises(httpx.ReadTimeout):
with pytest.raises(AuthError) as exc:
resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert exc.value.code == "access_token_not_jwt"
state_after_failure = get_provider_auth_state("nous")
assert state_after_failure is not None
@ -881,7 +849,11 @@ def test_terminal_refresh_failure_quarantines_tokens(
from hermes_cli import auth as auth_mod
hermes_home = tmp_path / "hermes"
_setup_nous_auth(hermes_home, refresh_token="refresh-old")
_setup_nous_auth(
hermes_home,
access_token="access-old",
refresh_token="refresh-old",
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
from agent.credential_pool import load_pool
@ -967,35 +939,36 @@ def test_managed_access_token_refresh_failure_quarantines_tokens(
assert refresh_calls == ["refresh-old"]
def test_mint_retry_uses_latest_rotated_refresh_token(tmp_path, monkeypatch):
def test_unusable_access_token_refresh_uses_latest_rotated_refresh_token(tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
_setup_nous_auth(hermes_home, refresh_token="refresh-old")
_setup_nous_auth(
hermes_home,
access_token="access-old",
refresh_token="refresh-old",
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
refresh_calls = []
mint_calls = {"count": 0}
good_jwt = _invoke_jwt(seconds=3600)
def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
refresh_calls.append(refresh_token)
idx = len(refresh_calls)
token = "access-still-not-jwt" if len(refresh_calls) == 1 else good_jwt
return {
"access_token": f"access-{idx}",
"refresh_token": f"refresh-{idx}",
"expires_in": 0,
"access_token": token,
"refresh_token": f"refresh-{len(refresh_calls)}",
"expires_in": 3600,
"token_type": "Bearer",
"scope": "inference:invoke",
}
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
mint_calls["count"] += 1
if mint_calls["count"] == 1:
raise AuthError("stale access token", provider="nous", code="invalid_token")
return _mint_payload(api_key="agent-key")
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)
with pytest.raises(AuthError) as exc:
resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert exc.value.code == "access_token_not_jwt"
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert creds["api_key"] == "agent-key"
assert creds["api_key"] == good_jwt
assert refresh_calls == ["refresh-old", "refresh-1"]
@ -1170,21 +1143,23 @@ class TestLoginNousSkipKeepsCurrent:
def _full_state_fixture() -> dict:
"""Shape of the dict returned by _nous_device_code_login /
refresh_nous_oauth_from_state. Used as helper input."""
token = _invoke_jwt(seconds=3600)
expires_at = _future_iso(3600)
return {
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke",
"token_type": "Bearer",
"access_token": "access-tok",
"access_token": token,
"refresh_token": "refresh-tok",
"obtained_at": "2026-04-17T22:00:00+00:00",
"expires_at": "2026-04-17T22:15:00+00:00",
"expires_in": 900,
"agent_key": "agent-key-value",
"agent_key_id": "ak-id",
"agent_key_expires_at": "2026-04-18T22:00:00+00:00",
"agent_key_expires_in": 86400,
"expires_at": expires_at,
"expires_in": 3600,
"agent_key": token,
"agent_key_id": None,
"agent_key_expires_at": expires_at,
"agent_key_expires_in": 3600,
"agent_key_reused": False,
"agent_key_obtained_at": "2026-04-17T22:00:10+00:00",
"tls": {"insecure": False, "ca_bundle": None},
@ -1210,7 +1185,8 @@ def test_persist_nous_credentials_writes_both_pool_and_providers(tmp_path, monke
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
entry = persist_nous_credentials(_full_state_fixture())
state = _full_state_fixture()
entry = persist_nous_credentials(state)
assert entry is not None
assert entry.provider == "nous"
@ -1220,17 +1196,17 @@ def test_persist_nous_credentials_writes_both_pool_and_providers(tmp_path, monke
# providers.nous populated with the full state (new behaviour)
singleton = payload["providers"]["nous"]
assert singleton["access_token"] == "access-tok"
assert singleton["access_token"] == state["access_token"]
assert singleton["refresh_token"] == "refresh-tok"
assert singleton["agent_key"] == "agent-key-value"
assert singleton["agent_key_expires_at"] == "2026-04-18T22:00:00+00:00"
assert singleton["agent_key"] == state["agent_key"]
assert singleton["agent_key_expires_at"] == state["agent_key_expires_at"]
# credential_pool.nous has exactly one canonical device_code entry
pool_entries = payload["credential_pool"]["nous"]
assert len(pool_entries) == 1, pool_entries
pool_entry = pool_entries[0]
assert pool_entry["source"] == NOUS_DEVICE_CODE_SOURCE
assert pool_entry["agent_key"] == "agent-key-value"
assert pool_entry["agent_key"] == state["agent_key"]
assert pool_entry["inference_base_url"] == "https://inference.example.com/v1"
@ -1243,7 +1219,6 @@ def test_persist_nous_credentials_allows_recovery_from_401(tmp_path, monkeypatch
providers.nous was empty.
"""
from hermes_cli.auth import (
NOUS_INFERENCE_AUTH_MODE_FRESH,
persist_nous_credentials,
resolve_nous_runtime_credentials,
)
@ -1256,29 +1231,27 @@ def test_persist_nous_credentials_allows_recovery_from_401(tmp_path, monkeypatch
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
persist_nous_credentials(_full_state_fixture())
new_jwt = _invoke_jwt(seconds=3600)
# Stub the network-touching steps so we don't actually contact the
# portal — the point of this test is that state lookup succeeds and
# doesn't raise "Hermes is not logged into Nous Portal".
def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
return {
"access_token": "access-new",
"access_token": new_jwt,
"refresh_token": "refresh-new",
"expires_in": 900,
"expires_in": 3600,
"token_type": "Bearer",
"scope": "inference:invoke",
}
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
return _mint_payload(api_key="new-agent-key")
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=300,
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_FRESH,
force_refresh=True,
)
assert creds["api_key"] == "new-agent-key"
assert creds["api_key"] == new_jwt
def test_persist_nous_credentials_idempotent_no_duplicate_pool_entries(tmp_path, monkeypatch):
@ -1304,21 +1277,23 @@ def test_persist_nous_credentials_idempotent_no_duplicate_pool_entries(tmp_path,
persist_nous_credentials(first)
second = _full_state_fixture()
second["access_token"] = "access-second"
second["agent_key"] = "agent-key-second"
second_token = _invoke_jwt(seconds=7200)
second["access_token"] = second_token
second["agent_key"] = second_token
second["agent_key_expires_at"] = _future_iso(7200)
persist_nous_credentials(second)
payload = json.loads((hermes_home / "auth.json").read_text())
# providers.nous reflects the latest write (singleton semantics)
assert payload["providers"]["nous"]["access_token"] == "access-second"
assert payload["providers"]["nous"]["agent_key"] == "agent-key-second"
assert payload["providers"]["nous"]["access_token"] == second_token
assert payload["providers"]["nous"]["agent_key"] == second_token
# credential_pool.nous has exactly one entry, carrying the latest agent_key
pool_entries = payload["credential_pool"]["nous"]
assert len(pool_entries) == 1, pool_entries
assert pool_entries[0]["source"] == NOUS_DEVICE_CODE_SOURCE
assert pool_entries[0]["agent_key"] == "agent-key-second"
assert pool_entries[0]["agent_key"] == second_token
# And no stray `manual:device_code` / `manual:dashboard_device_code` rows
assert not any(
e["source"].startswith("manual:") for e in pool_entries
@ -1339,13 +1314,14 @@ def test_persist_nous_credentials_reloads_pool_after_singleton_write(tmp_path, m
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
entry = persist_nous_credentials(_full_state_fixture())
state = _full_state_fixture()
entry = persist_nous_credentials(state)
assert entry is not None
assert entry.source == NOUS_DEVICE_CODE_SOURCE
# Label derived by _seed_from_singletons via label_from_token; we don't
# assert its exact value, just that the helper returned a real entry.
assert entry.access_token == "access-tok"
assert entry.agent_key == "agent-key-value"
assert entry.access_token == state["access_token"]
assert entry.agent_key == state["agent_key"]
def test_persist_nous_credentials_embeds_custom_label(tmp_path, monkeypatch):
@ -1658,7 +1634,8 @@ def test_shared_store_write_and_read_roundtrip(shared_store_env):
_write_shared_nous_state,
)
_write_shared_nous_state(_full_state_fixture())
state = _full_state_fixture()
_write_shared_nous_state(state)
path = _nous_shared_store_path()
assert path.is_file()
@ -1670,7 +1647,7 @@ def test_shared_store_write_and_read_roundtrip(shared_store_env):
loaded = _read_shared_nous_state()
assert loaded is not None
assert loaded["refresh_token"] == "refresh-tok"
assert loaded["access_token"] == "access-tok"
assert loaded["access_token"] == state["access_token"]
assert loaded["portal_base_url"] == "https://portal.example.com"
assert loaded["inference_base_url"] == "https://inference.example.com/v1"
# Volatile agent_key MUST NOT be persisted to the shared store
@ -1760,12 +1737,12 @@ def test_try_import_shared_returns_none_on_refresh_failure(
assert auth_mod._read_shared_nous_state() is None
def test_try_import_shared_persists_rotated_token_when_mint_fails(
def test_try_import_shared_persists_rotated_token_when_jwt_validation_fails(
shared_store_env, monkeypatch,
):
"""A forced shared import refresh rotates the single-use token before minting.
"""A forced shared import refresh rotates the single-use token before validation.
If the later agent-key mint fails, the shared store must still keep the
If the later inference-JWT validation fails, the shared store must still keep the
rotated refresh token; otherwise the next import attempt replays the
consumed token and trips refresh-token reuse.
"""
@ -1785,12 +1762,7 @@ def test_try_import_shared_persists_rotated_token_when_mint_fails(
"token_type": "Bearer",
}
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
assert access_token == "access-new"
raise AuthError("credits exhausted", provider="nous", code="insufficient_credits")
monkeypatch.setattr(auth_mod, "_refresh_access_token", _fake_refresh_access_token)
monkeypatch.setattr(auth_mod, "_mint_agent_key", _fake_mint_agent_key)
assert auth_mod._try_import_shared_nous_state() is None
@ -1801,16 +1773,17 @@ def test_try_import_shared_persists_rotated_token_when_mint_fails(
def test_try_import_shared_rehydrates_on_success(shared_store_env, monkeypatch):
"""Happy path: stored refresh_token is accepted, forced refresh+mint
returns a fresh access_token + agent_key, and the returned dict has
"""Happy path: stored refresh_token is accepted, forced refresh
returns a fresh access_token JWT, and the returned dict has
every field persist_nous_credentials() needs.
"""
from hermes_cli import auth as auth_mod
auth_mod._write_shared_nous_state(_full_state_fixture())
fresh_jwt = _invoke_jwt(seconds=7200)
def _fake_refresh(state, **kwargs):
# Simulate portal returning fresh tokens + a new agent_key
# Simulate portal returning a fresh inference JWT.
assert kwargs.get("force_refresh") is True
assert (
kwargs.get("inference_auth_mode")
@ -1818,10 +1791,10 @@ def test_try_import_shared_rehydrates_on_success(shared_store_env, monkeypatch):
)
return {
**state,
"access_token": "fresh-access-tok",
"access_token": fresh_jwt,
"refresh_token": "fresh-refresh-tok", # rotated
"agent_key": "new-agent-key",
"agent_key_expires_at": "2026-04-19T22:00:00+00:00",
"agent_key": fresh_jwt,
"agent_key_expires_at": _future_iso(7200),
}
monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _fake_refresh)
@ -1829,9 +1802,9 @@ def test_try_import_shared_rehydrates_on_success(shared_store_env, monkeypatch):
result = auth_mod._try_import_shared_nous_state()
assert result is not None
assert result["access_token"] == "fresh-access-tok"
assert result["access_token"] == fresh_jwt
assert result["refresh_token"] == "fresh-refresh-tok"
assert result["agent_key"] == "new-agent-key"
assert result["agent_key"] == fresh_jwt
# Preserved from shared state
assert result["portal_base_url"] == "https://portal.example.com"
assert result["client_id"] == "hermes-cli"
@ -1878,13 +1851,15 @@ def test_shared_store_survives_across_profile_switch(
assert shared["refresh_token"] == "refresh-tok"
# And a successful rehydrate + persist lands nous into profile B
b_jwt = _invoke_jwt(seconds=7200)
def _fake_refresh(state, **kwargs):
return {
**state,
"access_token": "b-access-tok",
"access_token": b_jwt,
"refresh_token": "b-refresh-tok",
"agent_key": "b-agent-key",
"agent_key_expires_at": "2026-04-19T22:00:00+00:00",
"agent_key": b_jwt,
"agent_key_expires_at": _future_iso(7200),
}
monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _fake_refresh)
@ -1924,35 +1899,29 @@ def test_runtime_refresh_uses_newer_shared_token_before_local_stale_token(
monkeypatch.setenv("HERMES_HOME", str(profile_b))
shared_state = _full_state_fixture()
shared_state["access_token"] = "shared-fresh-access"
shared_token = _invoke_jwt(seconds=3600)
shared_state["access_token"] = shared_token
shared_state["refresh_token"] = "shared-fresh-refresh"
shared_state["expires_at"] = "2099-01-01T00:00:00+00:00"
shared_state["scope"] = "inference:invoke"
auth_mod._write_shared_nous_state(shared_state)
def _refresh_should_not_happen(**_kwargs):
raise AssertionError("stale profile-local refresh token was used")
minted_with: list[str] = []
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
minted_with.append(access_token)
return _mint_payload(api_key="agent-key-from-shared-token")
monkeypatch.setattr(auth_mod, "_refresh_access_token", _refresh_should_not_happen)
monkeypatch.setattr(auth_mod, "_mint_agent_key", _fake_mint_agent_key)
creds = auth_mod.resolve_nous_runtime_credentials(
min_key_ttl_seconds=300,
inference_auth_mode=auth_mod.NOUS_INFERENCE_AUTH_MODE_FRESH,
)
assert creds["api_key"] == "agent-key-from-shared-token"
assert minted_with == ["shared-fresh-access"]
assert creds["api_key"] == shared_token
profile_state = auth_mod.get_provider_auth_state("nous")
assert profile_state is not None
assert profile_state["refresh_token"] == "shared-fresh-refresh"
assert profile_state["access_token"] == "shared-fresh-access"
assert profile_state["access_token"] == shared_token
def test_managed_gateway_access_token_uses_newer_shared_token(

View file

@ -144,7 +144,7 @@ def test_nous_adapter_get_credential_uses_runtime_resolver(tmp_path, monkeypatch
assert cred.token_type == "Bearer"
def test_nous_adapter_retry_credential_forces_legacy_mint(tmp_path, monkeypatch):
def test_nous_adapter_retry_credential_does_not_fallback_on_jwt_401(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_write_auth_store(tmp_path, {
"access_token": "jwt-access",
@ -155,15 +155,8 @@ def test_nous_adapter_retry_credential_forces_legacy_mint(tmp_path, monkeypatch)
"agent_key": "jwt-access",
})
refreshed_state = {
"api_key": "legacy-bearer",
"base_url": "https://inference-api.nousresearch.com/v1",
"expires_at": "2099-01-01T00:00:00Z",
}
with patch(
"hermes_cli.proxy.adapters.nous_portal.resolve_nous_runtime_credentials",
return_value=refreshed_state,
) as mock_resolve:
adapter = NousPortalAdapter()
cred = adapter.get_retry_credential(
@ -174,9 +167,8 @@ def test_nous_adapter_retry_credential_forces_legacy_mint(tmp_path, monkeypatch)
status_code=401,
)
assert cred is not None
assert cred.bearer == "legacy-bearer"
assert mock_resolve.call_args.kwargs["inference_auth_mode"] == "legacy"
assert cred is None
mock_resolve.assert_not_called()
def test_nous_adapter_retry_credential_skips_opaque_bearer(tmp_path, monkeypatch):

View file

@ -25,6 +25,7 @@ from datetime import datetime, timezone
from unittest.mock import patch
import httpx
import pytest
from fastapi.testclient import TestClient
from hermes_cli.web_server import _SESSION_TOKEN, app
@ -99,7 +100,7 @@ def test_minimax_login_does_not_launch_anthropic_flow():
assert body["expires_in"] == 600
def test_nous_dashboard_device_flow_honors_legacy_scope_override(monkeypatch):
def test_nous_dashboard_device_flow_ignores_legacy_scope_override(monkeypatch):
from hermes_cli import auth as auth_mod
from hermes_cli import web_server as ws
@ -109,24 +110,24 @@ def test_nous_dashboard_device_flow_honors_legacy_scope_override(monkeypatch):
requested_scopes.append(kwargs["scope"])
return _fake_nous_device_data()
monkeypatch.setenv(auth_mod.NOUS_LEGACY_SESSION_KEYS_ENV, "true")
monkeypatch.setenv("HERMES_AGENT_USE_LEGACY_SESSION_KEYS", "true")
monkeypatch.setattr(auth_mod, "_request_device_code", fake_request_device_code)
monkeypatch.setattr(ws, "_nous_poller", lambda sid: None)
result = asyncio.run(ws._start_device_code_flow("nous"))
try:
assert requested_scopes == [auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE]
assert requested_scopes == [auth_mod.DEFAULT_NOUS_SCOPE]
assert result["flow"] == "device_code"
assert result["user_code"] == "NOUS-1234"
assert (
ws._oauth_sessions[result["session_id"]]["scope"]
== auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE
== auth_mod.DEFAULT_NOUS_SCOPE
)
finally:
ws._oauth_sessions.pop(result["session_id"], None)
def test_nous_dashboard_device_flow_retries_legacy_scope_on_invoke_refusal(monkeypatch):
def test_nous_dashboard_device_flow_does_not_retry_legacy_scope_on_invoke_refusal(monkeypatch):
from hermes_cli import auth as auth_mod
from hermes_cli import web_server as ws
@ -134,26 +135,15 @@ def test_nous_dashboard_device_flow_retries_legacy_scope_on_invoke_refusal(monke
def fake_request_device_code(**kwargs):
requested_scopes.append(kwargs["scope"])
if len(requested_scopes) == 1:
raise _invoke_scope_refusal()
return _fake_nous_device_data()
raise _invoke_scope_refusal()
monkeypatch.delenv(auth_mod.NOUS_LEGACY_SESSION_KEYS_ENV, raising=False)
monkeypatch.delenv("HERMES_AGENT_USE_LEGACY_SESSION_KEYS", raising=False)
monkeypatch.setattr(auth_mod, "_request_device_code", fake_request_device_code)
monkeypatch.setattr(ws, "_nous_poller", lambda sid: None)
result = asyncio.run(ws._start_device_code_flow("nous"))
try:
assert requested_scopes == [
auth_mod.DEFAULT_NOUS_SCOPE,
auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE,
]
assert (
ws._oauth_sessions[result["session_id"]]["scope"]
== auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE
)
finally:
ws._oauth_sessions.pop(result["session_id"], None)
with pytest.raises(httpx.HTTPStatusError):
asyncio.run(ws._start_device_code_flow("nous"))
assert requested_scopes == [auth_mod.DEFAULT_NOUS_SCOPE]
def test_nous_dashboard_poller_preserves_effective_scope_when_token_omits_scope(monkeypatch):
@ -173,13 +163,13 @@ def test_nous_dashboard_poller_preserves_effective_scope_when_token_omits_scope(
"device_code": "device-code",
"interval": 5,
"expires_at": time.time() + 600,
"scope": auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE,
"scope": auth_mod.DEFAULT_NOUS_SCOPE,
}
captured_state = {}
def fake_refresh_nous_oauth_from_state(state, **kwargs):
captured_state.update(state)
return {**state, "agent_key": "legacy-agent-key"}
return {**state, "agent_key": "jwt-agent-key"}
monkeypatch.setattr(
auth_mod,
@ -200,7 +190,7 @@ def test_nous_dashboard_poller_preserves_effective_scope_when_token_omits_scope(
try:
ws._nous_poller(session_id)
assert captured_state["scope"] == auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE
assert captured_state["scope"] == auth_mod.DEFAULT_NOUS_SCOPE
assert ws._oauth_sessions[session_id]["status"] == "approved"
finally:
ws._oauth_sessions.pop(session_id, None)

View file

@ -4063,7 +4063,8 @@ class TestNousCredentialRefresh:
assert ok is True
assert closed["value"] is True
assert captured["inference_auth_mode"] == "legacy"
assert captured["inference_auth_mode"] == "auto"
assert captured["force_refresh"] is True
assert rebuilt["kwargs"]["api_key"] == "new-nous-key"
assert (
rebuilt["kwargs"]["base_url"] == "https://inference-api.nousresearch.com/v1"
@ -4092,11 +4093,12 @@ class TestNousCredentialRefresh:
with patch("run_agent.OpenAI", return_value=MagicMock()):
ok = agent._try_refresh_nous_client_credentials(
force=False,
inference_auth_mode="legacy",
inference_auth_mode="fresh",
)
assert ok is True
assert captured["inference_auth_mode"] == "legacy"
assert captured["inference_auth_mode"] == "fresh"
assert captured["force_refresh"] is False
class TestCredentialPoolRecovery: