Switch to JWT token for inference against Nous, falling back to old opaque token on failure.

This commit is contained in:
Robin Fernandes 2026-05-17 19:34:44 +10:00 committed by Teknium
parent c905562623
commit 89a3d038cf
10 changed files with 780 additions and 45 deletions

View file

@ -755,7 +755,8 @@ class _CodexCompletionsAdapter:
def _check_cancelled() -> None:
if deadline is not None and time.monotonic() >= deadline:
timed_out.set()
if not timed_out.is_set():
_close_client_on_timeout()
raise TimeoutError(_timeout_message())
try:
from tools.interrupt import is_interrupted
@ -1233,7 +1234,7 @@ def _read_nous_auth() -> Optional[dict]:
def _nous_api_key(provider: dict) -> str:
"""Extract the best API key from a Nous provider state dict."""
"""Extract the Nous runtime credential from the compatibility field."""
return provider.get("agent_key") or provider.get("access_token", "")
@ -1246,7 +1247,7 @@ def _resolve_nous_runtime_api(*, force_refresh: bool = False) -> Optional[tuple[
"""Return fresh Nous runtime credentials when available.
This mirrors the main agent's 401 recovery path and keeps auxiliary
clients aligned with the singleton auth store + mint flow instead of
clients aligned with the singleton auth store + JWT/mint flow instead of
relying only on whatever raw tokens happen to be sitting in auth.json
or the credential pool.
"""

View file

@ -166,6 +166,8 @@ class PooledCredential:
@property
def runtime_api_key(self) -> str:
if self.provider == "nous":
# Nous stores the runtime inference credential in agent_key for
# compatibility. It may be a NAS invoke JWT or legacy opaque key.
return str(self.agent_key or self.access_token or "")
return str(self.access_token or "")

View file

@ -67,9 +67,13 @@ AUTH_LOCK_TIMEOUT_SECONDS = 15.0
DEFAULT_NOUS_PORTAL_URL = "https://portal.nousresearch.com"
DEFAULT_NOUS_INFERENCE_URL = "https://inference-api.nousresearch.com/v1"
DEFAULT_NOUS_CLIENT_ID = "hermes-cli"
DEFAULT_NOUS_SCOPE = "inference:mint_agent_key"
NOUS_LEGACY_AGENT_KEY_SCOPE = "inference:mint_agent_key"
NOUS_INFERENCE_INVOKE_SCOPE = "inference:invoke"
DEFAULT_NOUS_SCOPE = f"{NOUS_INFERENCE_INVOKE_SCOPE} {NOUS_LEGACY_AGENT_KEY_SCOPE}"
NOUS_LEGACY_SESSION_KEYS_ENV = "HERMES_AGENT_USE_LEGACY_SESSION_KEYS"
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS = 30 * 60 # 30 minutes
ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 # refresh 2 min before expiry
NOUS_INVOKE_JWT_MIN_TTL_SECONDS = ACCESS_TOKEN_REFRESH_SKEW_SECONDS
DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS = 1 # poll at most every 1s
DEFAULT_CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
DEFAULT_XAI_OAUTH_BASE_URL = "https://api.x.ai/v1"
@ -1549,6 +1553,117 @@ def _decode_jwt_claims(token: Any) -> Dict[str, Any]:
return claims if isinstance(claims, dict) else {}
def _scope_values(raw_scope: Any) -> set[str]:
scopes: set[str] = set()
if isinstance(raw_scope, str):
for part in raw_scope.replace(",", " ").split():
cleaned = part.strip()
if cleaned:
scopes.add(cleaned)
elif isinstance(raw_scope, (list, tuple, set, frozenset)):
for item in raw_scope:
if isinstance(item, str):
scopes.update(_scope_values(item))
return scopes
def _nous_legacy_session_keys_forced() -> bool:
return is_truthy_value(os.getenv(NOUS_LEGACY_SESSION_KEYS_ENV), default=False)
def _nous_scope_has_invoke(raw_scope: Any) -> bool:
return NOUS_INFERENCE_INVOKE_SCOPE in _scope_values(raw_scope)
def _nous_invoke_jwt_is_usable(
token: Any,
*,
scope: Any = None,
expires_at: Any = None,
min_ttl_seconds: int = NOUS_INVOKE_JWT_MIN_TTL_SECONDS,
) -> bool:
claims = _decode_jwt_claims(token)
if not claims:
return False
scopes = (
_scope_values(scope)
| _scope_values(claims.get("scope"))
| _scope_values(claims.get("scp"))
)
if NOUS_INFERENCE_INVOKE_SCOPE not in scopes:
return False
exp = claims.get("exp")
skew = max(0, int(min_ttl_seconds))
if isinstance(exp, (int, float)):
return float(exp) > (time.time() + skew)
return not _is_expiring(expires_at, skew)
def _nous_invoke_jwt_unavailable_reason(
token: Any,
*,
scope: Any = None,
expires_at: Any = None,
min_ttl_seconds: int = NOUS_INVOKE_JWT_MIN_TTL_SECONDS,
) -> str:
claims = _decode_jwt_claims(token)
if not claims:
return "access_token_not_jwt"
scopes = (
_scope_values(scope)
| _scope_values(claims.get("scope"))
| _scope_values(claims.get("scp"))
)
if NOUS_INFERENCE_INVOKE_SCOPE not in scopes:
return "missing_inference_invoke_scope"
exp = claims.get("exp")
skew = max(0, int(min_ttl_seconds))
if isinstance(exp, (int, float)) and float(exp) <= (time.time() + skew):
return "invoke_jwt_expiring"
if not isinstance(exp, (int, float)) and _is_expiring(expires_at, skew):
return "invoke_jwt_expiry_unknown_or_expiring"
return "invoke_jwt_unavailable"
def _nous_jwt_expires_at(token: Any, fallback_expires_at: Any = None) -> Optional[str]:
claims = _decode_jwt_claims(token)
exp = claims.get("exp")
if isinstance(exp, (int, float)):
try:
return datetime.fromtimestamp(float(exp), tz=timezone.utc).isoformat()
except Exception:
pass
return fallback_expires_at if isinstance(fallback_expires_at, str) else None
def _set_nous_agent_key_from_invoke_jwt(
state: Dict[str, Any],
*,
obtained_at: Optional[str] = None,
) -> None:
access_token = state.get("access_token")
if not isinstance(access_token, str) or not access_token.strip():
return
now = datetime.now(timezone.utc)
effective_obtained_at = obtained_at or now.isoformat()
expires_at = _nous_jwt_expires_at(access_token, state.get("expires_at"))
expires_epoch = _parse_iso_timestamp(expires_at)
expires_in = (
max(0, int(expires_epoch - time.time()))
if expires_epoch is not None
else _coerce_ttl_seconds(state.get("expires_in"))
)
if expires_at:
state["expires_at"] = expires_at
state["expires_in"] = expires_in
state["agent_key"] = access_token
state["agent_key_id"] = None
state["agent_key_expires_at"] = expires_at
state["agent_key_expires_in"] = expires_in
state["agent_key_reused"] = False
state["agent_key_obtained_at"] = effective_obtained_at
def _codex_access_token_is_expiring(access_token: Any, skew_seconds: int) -> bool:
claims = _decode_jwt_claims(access_token)
exp = claims.get("exp")
@ -3333,6 +3448,34 @@ def _request_device_code(
return data
def _is_nous_invoke_scope_refusal(exc: Exception) -> bool:
if not isinstance(exc, httpx.HTTPStatusError):
return False
response = exc.response
if response.status_code not in {400, 401, 403}:
return False
try:
payload = response.json()
except Exception:
payload = {}
text = " ".join(
str(value)
for value in (
payload.get("error") if isinstance(payload, dict) else None,
payload.get("error_description") if isinstance(payload, dict) else None,
response.text,
)
if value
).lower()
if not text:
return False
return (
"invalid_scope" in text
or "unsupported_scope" in text
or "scope" in text and NOUS_INFERENCE_INVOKE_SCOPE in text
)
def _poll_for_token(
client: httpx.Client,
portal_base_url: str,
@ -3524,8 +3667,9 @@ def _write_shared_nous_state(state: Dict[str, Any]) -> None:
is a convenience layer; the per-profile auth.json remains the source
of truth.
We deliberately omit the short-lived ``agent_key`` (24h TTL, profile-
specific) only the long-lived OAuth tokens are cross-profile useful.
We deliberately omit the runtime ``agent_key`` compatibility field
(either an invoke JWT or legacy opaque session key) only OAuth tokens
are cross-profile useful.
"""
refresh_token = state.get("refresh_token")
access_token = state.get("access_token")
@ -3894,6 +4038,14 @@ def _agent_key_is_usable(state: Dict[str, Any], min_ttl_seconds: int) -> bool:
key = state.get("agent_key")
if not isinstance(key, str) or not key.strip():
return False
if _decode_jwt_claims(key):
if _nous_legacy_session_keys_forced():
return False
return _nous_invoke_jwt_is_usable(
key,
scope=state.get("scope"),
expires_at=state.get("agent_key_expires_at"),
)
return not _is_expiring(state.get("agent_key_expires_at"), min_ttl_seconds)
@ -4039,7 +4191,23 @@ def refresh_nous_oauth_pure(
timeout = httpx.Timeout(timeout_seconds if timeout_seconds else 15.0)
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
if force_refresh or _is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS):
min_agent_key_ttl = max(60, int(min_key_ttl_seconds))
legacy_session_keys = _nous_legacy_session_keys_forced()
current_invoke_jwt_usable = (
not legacy_session_keys
and _nous_invoke_jwt_is_usable(
state.get("access_token"),
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
)
if (
force_refresh
or (
_is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS)
and not current_invoke_jwt_usable
)
):
refreshed = _refresh_access_token(
client=client,
portal_base_url=state["portal_base_url"],
@ -4061,7 +4229,39 @@ def refresh_nous_oauth_pure(
now.timestamp() + access_ttl, tz=timezone.utc
).isoformat()
if force_mint or not _agent_key_is_usable(state, max(60, int(min_key_ttl_seconds))):
if (
not legacy_session_keys
and _nous_invoke_jwt_is_usable(
state.get("access_token"),
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
):
_set_nous_agent_key_from_invoke_jwt(state)
logger.info("Nous inference auth: using NAS invoke JWT")
_oauth_trace(
"nous_invoke_jwt_selected",
access_token_fp=_token_fingerprint(state.get("access_token")),
)
elif force_mint or not _agent_key_is_usable(state, min_agent_key_ttl):
fallback_reason = (
"forced_legacy_session_keys"
if legacy_session_keys
else _nous_invoke_jwt_unavailable_reason(
state.get("access_token"),
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
)
logger.info(
"Nous inference auth: using legacy session key path (%s)",
fallback_reason,
)
_oauth_trace(
"nous_legacy_session_key_selected",
reason=fallback_reason,
access_token_fp=_token_fingerprint(state.get("access_token")),
)
mint_payload = _mint_agent_key(
client=client,
portal_base_url=state["portal_base_url"],
@ -4175,6 +4375,15 @@ def persist_nous_credentials(
)
def _sync_nous_pool_from_auth_store() -> None:
try:
from agent.credential_pool import load_pool
load_pool("nous")
except Exception as exc:
logger.debug("Failed to sync Nous credential pool from auth store: %s", exc)
def resolve_nous_runtime_credentials(
*,
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
@ -4191,7 +4400,7 @@ def resolve_nous_runtime_credentials(
Concurrent processes coordinate through the auth store file lock.
Returns dict with: provider, base_url, api_key, key_id, expires_at,
expires_in, source ("cache" or "portal").
expires_in, source ("invoke_jwt", "cache", or "portal"), and auth_path.
"""
min_key_ttl_seconds = max(60, int(min_key_ttl_seconds))
sequence_id = uuid.uuid4().hex[:12]
@ -4260,15 +4469,35 @@ def resolve_nous_runtime_credentials(
raise AuthError("No access token found for Nous Portal login.",
provider="nous", relogin_required=True)
# Step 1: refresh access token if expiring
if _is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS):
# Step 1: refresh access token if expiring. If the access token
# is already a valid invoke JWT, trust its own exp claim even when
# older auth.json metadata has a stale/missing expires_at.
current_invoke_jwt_usable = (
not _nous_legacy_session_keys_forced()
and _nous_invoke_jwt_is_usable(
access_token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
)
if (
_is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS)
and not current_invoke_jwt_usable
):
with _nous_shared_store_lock(timeout_seconds=max(timeout_seconds + 5.0, AUTH_LOCK_TIMEOUT_SECONDS)):
if _merge_shared_nous_oauth_state(state):
access_token = state.get("access_token")
refresh_token = state.get("refresh_token")
_persist_state("post_shared_merge_access_expiring")
if _is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS):
if (
_is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS)
and not _nous_invoke_jwt_is_usable(
access_token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
):
if not isinstance(refresh_token, str) or not refresh_token:
raise AuthError("Session expired and no refresh token is available.",
provider="nous", relogin_required=True)
@ -4320,14 +4549,56 @@ def resolve_nous_runtime_credentials(
# Persist immediately so downstream mint failures cannot drop rotated refresh tokens.
_persist_state("post_refresh_access_expiring")
# Step 2: mint agent key if missing/expiring
# Step 2: resolve the compatibility ``agent_key`` field. Preferred
# path stores the NAS invoke JWT there; legacy path mints/reuses
# the opaque session key.
used_cached_key = False
mint_payload: Optional[Dict[str, Any]] = None
selected_auth_path = "legacy_session_key"
legacy_session_keys = _nous_legacy_session_keys_forced()
if not force_mint and _agent_key_is_usable(state, min_key_ttl_seconds):
if (
not legacy_session_keys
and _nous_invoke_jwt_is_usable(
access_token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
):
_set_nous_agent_key_from_invoke_jwt(state)
selected_auth_path = "invoke_jwt"
logger.info("Nous inference auth: using NAS invoke JWT")
_oauth_trace(
"nous_invoke_jwt_selected",
sequence_id=sequence_id,
access_token_fp=_token_fingerprint(access_token),
)
elif not force_mint and _agent_key_is_usable(state, min_key_ttl_seconds):
used_cached_key = True
selected_auth_path = "legacy_session_key_cache"
logger.info("Nous inference auth: using cached legacy session key")
_oauth_trace("agent_key_reuse", sequence_id=sequence_id)
else:
fallback_reason = (
"forced_legacy_session_keys"
if legacy_session_keys
else _nous_invoke_jwt_unavailable_reason(
access_token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
)
selected_auth_path = "legacy_session_key_mint"
logger.info(
"Nous inference auth: using legacy session key path (%s)",
fallback_reason,
)
_oauth_trace(
"nous_legacy_session_key_selected",
sequence_id=sequence_id,
reason=fallback_reason,
access_token_fp=_token_fingerprint(access_token),
)
try:
_oauth_trace(
"mint_start",
@ -4403,10 +4674,28 @@ def resolve_nous_runtime_credentials(
# Persist retry refresh immediately for crash safety and cross-process visibility.
_persist_state("post_refresh_mint_retry")
mint_payload = _mint_agent_key(
client=client, portal_base_url=portal_base_url,
access_token=access_token, min_ttl_seconds=min_key_ttl_seconds,
)
if (
not legacy_session_keys
and _nous_invoke_jwt_is_usable(
access_token,
scope=state.get("scope"),
expires_at=state.get("expires_at"),
)
):
_set_nous_agent_key_from_invoke_jwt(state)
mint_payload = None
selected_auth_path = "invoke_jwt"
logger.info("Nous inference auth: using NAS invoke JWT")
_oauth_trace(
"nous_invoke_jwt_selected",
sequence_id=sequence_id,
access_token_fp=_token_fingerprint(access_token),
)
else:
mint_payload = _mint_agent_key(
client=client, portal_base_url=portal_base_url,
access_token=access_token, min_ttl_seconds=min_key_ttl_seconds,
)
else:
raise
@ -4438,6 +4727,8 @@ def resolve_nous_runtime_credentials(
_persist_state("resolve_nous_runtime_credentials_final")
_sync_nous_pool_from_auth_store()
api_key = state.get("agent_key")
if not isinstance(api_key, str) or not api_key:
raise AuthError("Failed to resolve a Nous inference API key",
@ -4458,7 +4749,12 @@ def resolve_nous_runtime_credentials(
"key_id": state.get("agent_key_id"),
"expires_at": expires_at,
"expires_in": expires_in,
"source": "cache" if used_cached_key else "portal",
"source": (
"invoke_jwt"
if selected_auth_path == "invoke_jwt"
else ("cache" if used_cached_key else "portal")
),
"auth_path": selected_auth_path,
}
@ -6137,7 +6433,10 @@ def _nous_device_code_login(
or pconfig.inference_base_url
).rstrip("/")
client_id = client_id or pconfig.client_id
explicit_scope = scope is not None
scope = scope or pconfig.scope
if _nous_legacy_session_keys_forced():
scope = NOUS_LEGACY_AGENT_KEY_SCOPE
timeout = httpx.Timeout(timeout_seconds)
verify: bool | str = False if insecure else (ca_bundle if ca_bundle else True)
@ -6152,12 +6451,30 @@ def _nous_device_code_login(
print(f"TLS verification: custom CA bundle ({ca_bundle})")
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
device_data = _request_device_code(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
scope=scope,
)
try:
device_data = _request_device_code(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
scope=scope,
)
except Exception as exc:
if (
not explicit_scope
and _nous_scope_has_invoke(scope)
and _is_nous_invoke_scope_refusal(exc)
):
logger.info("Nous inference auth: NAS refused invoke scope, retrying legacy scope")
_oauth_trace("nous_device_code_invoke_scope_refused")
scope = NOUS_LEGACY_AGENT_KEY_SCOPE
device_data = _request_device_code(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
scope=scope,
)
else:
raise
verification_url = str(device_data["verification_uri_complete"])
user_code = str(device_data["user_code"])
@ -6287,7 +6604,7 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
portal_base_url=getattr(args, "portal_url", None),
inference_base_url=getattr(args, "inference_url", None),
client_id=getattr(args, "client_id", None) or pconfig.client_id,
scope=getattr(args, "scope", None) or pconfig.scope,
scope=getattr(args, "scope", None),
open_browser=not getattr(args, "no_browser", False),
timeout_seconds=timeout_seconds,
insecure=insecure,
@ -6314,6 +6631,7 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
# these credentials. Best-effort: any I/O failure is logged and
# swallowed inside the helper.
_write_shared_nous_state(auth_state)
_sync_nous_pool_from_auth_store()
print()
print("Login successful!")

View file

@ -1,12 +1,13 @@
"""Nous Portal upstream adapter.
Reads the user's Nous OAuth state from ``~/.hermes/auth.json``, refreshes
the access token and mints a fresh agent key when needed, and exposes the
upstream base URL plus minted bearer for the proxy server to forward to.
the access token and resolves the ``agent_key`` compatibility credential
when needed, then exposes the upstream base URL plus bearer for the proxy
server to forward to.
The minted ``agent_key`` (not the OAuth ``access_token``) is what
``inference-api.nousresearch.com`` accepts as a bearer. The refresh helper
already handles both see :func:`hermes_cli.auth.refresh_nous_oauth_from_state`.
The ``agent_key`` field may hold either a NAS invoke JWT or the legacy
opaque session key. The refresh helper handles both see
:func:`hermes_cli.auth.refresh_nous_oauth_from_state`.
"""
from __future__ import annotations

View file

@ -875,10 +875,9 @@ def _resolve_explicit_runtime(
explicit_base_url
or str(state.get("inference_base_url") or auth_mod.DEFAULT_NOUS_INFERENCE_URL).strip().rstrip("/")
)
# Only use agent_key for inference — access_token is an OAuth token for the
# portal API (minting keys, refreshing tokens), not for the inference API.
# Falling back to access_token sends an OAuth bearer token to the inference
# endpoint, which returns 404 because it is not a valid inference credential.
# Only use the agent_key compatibility field for inference. It may be
# either a NAS invoke JWT or a legacy opaque session key; raw OAuth
# access_token fallback is handled by resolve_nous_runtime_credentials().
api_key = explicit_api_key or str(state.get("agent_key") or "").strip()
expires_at = state.get("agent_key_expires_at") or state.get("expires_at")
if not api_key:
@ -1069,17 +1068,19 @@ def resolve_runtime_provider(
getattr(entry, "runtime_api_key", None)
or getattr(entry, "access_token", "")
)
# For Nous, the pool entry's runtime_api_key is the agent_key — a
# short-lived inference credential (~30 min TTL). The pool doesn't
# For Nous, the pool entry's runtime_api_key is the agent_key
# compatibility field: either an invoke JWT or legacy opaque key.
# The pool doesn't
# refresh it during selection (that would trigger network calls in
# non-runtime contexts like `hermes auth list`). If the key is
# expired, clear pool_api_key so we fall through to
# resolve_nous_runtime_credentials() which handles refresh + mint.
# resolve_nous_runtime_credentials() which handles refresh + fallback.
if provider == "nous" and entry is not None and pool_api_key:
min_ttl = max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800")))
nous_state = {
"agent_key": getattr(entry, "agent_key", None),
"agent_key_expires_at": getattr(entry, "agent_key_expires_at", None),
"scope": getattr(entry, "scope", None),
}
if not _agent_key_is_usable(nous_state, min_ttl):
logger.debug("Nous pool entry agent_key expired/missing, falling through to runtime resolution")

View file

@ -673,6 +673,8 @@ class TestGetTextAuxiliaryClient:
def test_custom_endpoint_uses_codex_wrapper_when_runtime_requests_responses_api(self):
with patch("agent.auxiliary_client._resolve_custom_runtime",
return_value=("https://api.openai.com/v1", "sk-test", "codex_responses")), \
patch("agent.auxiliary_client._read_nous_auth", return_value=None), \
patch("agent.auxiliary_client._resolve_nous_runtime_api", return_value=None), \
patch("agent.auxiliary_client._read_main_model", return_value="gpt-5.3-codex"), \
patch("agent.auxiliary_client.OpenAI") as mock_openai:
client, model = get_text_auxiliary_client()

View file

@ -2,8 +2,10 @@
from __future__ import annotations
import base64
import json
import time
from datetime import datetime, timezone
import pytest
@ -14,6 +16,14 @@ def _write_auth_store(tmp_path, payload: dict) -> None:
(hermes_home / "auth.json").write_text(json.dumps(payload, indent=2))
def _jwt_with_claims(claims: dict) -> str:
def _part(payload: dict) -> str:
raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
return f"{_part({'alg': 'none', 'typ': 'JWT'})}.{_part(claims)}.sig"
def test_fill_first_selection_skips_recently_exhausted_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
@ -510,6 +520,52 @@ def test_load_pool_migrates_nous_provider_state(tmp_path, monkeypatch):
assert entry.agent_key == "agent-key"
def test_load_pool_mirrors_nous_invoke_jwt_agent_key_runtime_api_key(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
expires_at = datetime.fromtimestamp(time.time() + 3600, tz=timezone.utc).isoformat()
token = _jwt_with_claims({
"sub": "test-user",
"scope": ["inference:invoke", "inference:mint_agent_key"],
"exp": int(time.time() + 3600),
})
_write_auth_store(
tmp_path,
{
"version": 1,
"active_provider": "nous",
"providers": {
"nous": {
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:invoke inference:mint_agent_key",
"access_token": token,
"refresh_token": "refresh-token",
"expires_at": expires_at,
"agent_key": token,
"agent_key_expires_at": expires_at,
}
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("nous")
entry = pool.select()
assert entry is not None
assert entry.source == "device_code"
assert entry.agent_key == token
assert entry.runtime_api_key == token
auth_payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
pool_entry = auth_payload["credential_pool"]["nous"][0]
assert pool_entry["agent_key"] == token
assert pool_entry["agent_key_expires_at"] == expires_at
def test_nous_pool_terminal_refresh_clears_tokens(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.setenv("HERMES_SHARED_AUTH_DIR", str(tmp_path / "shared"))

View file

@ -187,6 +187,7 @@ _HERMES_BEHAVIORAL_VARS = frozenset({
"HERMES_BACKGROUND_NOTIFICATIONS",
"HERMES_EXEC_ASK",
"HERMES_HOME_MODE",
"HERMES_AGENT_USE_LEGACY_SESSION_KEYS",
# Kanban path/board pins must never leak from a developer shell or
# dispatched worker into tests; otherwise tests can write fake tasks to
# the real ~/.hermes/kanban.db instead of the per-test HERMES_HOME.

View file

@ -107,7 +107,7 @@ def test_auth_add_nous_oauth_persists_pool_entry(tmp_path, monkeypatch):
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke inference:mint_agent_key",
"token_type": "Bearer",
"access_token": token,
"refresh_token": "refresh-token",
@ -228,7 +228,7 @@ def test_auth_add_nous_oauth_honors_custom_label(tmp_path, monkeypatch):
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"scope": "inference:mint_agent_key",
"scope": "inference:invoke inference:mint_agent_key",
"token_type": "Bearer",
"access_token": token,
"refresh_token": "refresh-token",

View file

@ -1,6 +1,9 @@
"""Regression tests for Nous OAuth refresh + agent-key mint interactions."""
import base64
import json
import logging
import time
from datetime import datetime, timezone
from pathlib import Path
@ -125,6 +128,11 @@ def _setup_nous_auth(
*,
access_token: str = "access-old",
refresh_token: str = "refresh-old",
scope: str = "inference:mint_agent_key",
expires_at: str = "2026-02-01T00:00:00+00:00",
expires_in: int = 0,
agent_key: str | None = None,
agent_key_expires_at: str | None = None,
) -> None:
hermes_home.mkdir(parents=True, exist_ok=True)
auth_store = {
@ -136,15 +144,15 @@ def _setup_nous_auth(
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"scope": scope,
"access_token": access_token,
"refresh_token": refresh_token,
"obtained_at": "2026-02-01T00:00:00+00:00",
"expires_in": 0,
"expires_at": "2026-02-01T00:00:00+00:00",
"agent_key": None,
"expires_in": expires_in,
"expires_at": expires_at,
"agent_key": agent_key,
"agent_key_id": None,
"agent_key_expires_at": None,
"agent_key_expires_at": agent_key_expires_at,
"agent_key_expires_in": None,
"agent_key_reused": None,
"agent_key_obtained_at": None,
@ -164,6 +172,351 @@ def _mint_payload(api_key: str = "agent-key") -> dict:
}
def _jwt_with_claims(claims: dict) -> str:
def _part(payload: dict) -> str:
raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
return f"{_part({'alg': 'none', 'typ': 'JWT'})}.{_part(claims)}.sig"
def _future_iso(seconds: int = 3600) -> str:
return datetime.fromtimestamp(time.time() + seconds, tz=timezone.utc).isoformat()
def _invoke_jwt(*, seconds: int = 3600, scope: object = "inference:invoke inference:mint_agent_key") -> str:
return _jwt_with_claims({
"sub": "test-user",
"scope": scope,
"exp": int(time.time() + seconds),
})
def test_resolve_nous_runtime_credentials_prefers_invoke_jwt_and_mirrors(
tmp_path,
monkeypatch,
):
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
token = _invoke_jwt(seconds=3600)
_setup_nous_auth(
hermes_home,
access_token=token,
scope=auth_mod.DEFAULT_NOUS_SCOPE,
expires_at=_future_iso(3600),
expires_in=3600,
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _unexpected_mint(*args, **kwargs):
raise AssertionError("legacy agent-key mint should not run for invoke JWT")
monkeypatch.setattr(auth_mod, "_mint_agent_key", _unexpected_mint)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert creds["api_key"] == token
assert creds["source"] == "invoke_jwt"
assert creds["auth_path"] == "invoke_jwt"
payload = json.loads((hermes_home / "auth.json").read_text())
singleton = payload["providers"]["nous"]
assert singleton["agent_key"] == token
assert datetime.fromisoformat(singleton["agent_key_expires_at"]).timestamp() > time.time() + 300
pool_entries = payload["credential_pool"]["nous"]
assert len(pool_entries) == 1
assert pool_entries[0]["agent_key"] == token
assert pool_entries[0]["source"] == auth_mod.NOUS_DEVICE_CODE_SOURCE
def test_resolve_nous_runtime_credentials_trusts_invoke_jwt_exp_over_stale_metadata(
tmp_path,
monkeypatch,
):
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
token = _invoke_jwt(seconds=3600)
_setup_nous_auth(
hermes_home,
access_token=token,
scope=auth_mod.DEFAULT_NOUS_SCOPE,
expires_at="2000-01-01T00:00:00+00:00",
expires_in=0,
agent_key=token,
agent_key_expires_at="2000-01-01T00:00:00+00:00",
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _unexpected_refresh(*args, **kwargs):
raise AssertionError("valid invoke JWT should not be refreshed because metadata is stale")
def _unexpected_mint(*args, **kwargs):
raise AssertionError("valid invoke JWT should not fall back to legacy mint")
monkeypatch.setattr(auth_mod, "_refresh_access_token", _unexpected_refresh)
monkeypatch.setattr(auth_mod, "_mint_agent_key", _unexpected_mint)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert creds["api_key"] == token
assert creds["source"] == "invoke_jwt"
payload = json.loads((hermes_home / "auth.json").read_text())
singleton = payload["providers"]["nous"]
assert singleton["agent_key"] == token
assert datetime.fromisoformat(singleton["expires_at"]).timestamp() > time.time() + 300
assert datetime.fromisoformat(singleton["agent_key_expires_at"]).timestamp() > time.time() + 300
def test_resolve_nous_runtime_credentials_does_not_apply_legacy_ttl_to_invoke_jwt(
tmp_path,
monkeypatch,
):
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
token = _invoke_jwt(seconds=900)
_setup_nous_auth(
hermes_home,
access_token=token,
scope=auth_mod.DEFAULT_NOUS_SCOPE,
expires_at=_future_iso(900),
expires_in=900,
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _unexpected_mint(*args, **kwargs):
raise AssertionError("1800s legacy min TTL should not force opaque mint for invoke JWT")
monkeypatch.setattr(auth_mod, "_mint_agent_key", _unexpected_mint)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=1800)
assert creds["api_key"] == token
assert creds["source"] == "invoke_jwt"
payload = json.loads((hermes_home / "auth.json").read_text())
assert payload["providers"]["nous"]["agent_key"] == token
assert payload["credential_pool"]["nous"][0]["agent_key"] == token
def test_resolve_nous_runtime_credentials_falls_back_when_invoke_scope_missing(
tmp_path,
monkeypatch,
):
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
token = _jwt_with_claims({
"sub": "test-user",
"scope": "inference:mint_agent_key",
"exp": int(time.time() + 3600),
})
_setup_nous_auth(
hermes_home,
access_token=token,
scope=auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE,
expires_at=_future_iso(3600),
expires_in=3600,
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
calls = []
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
del client, portal_base_url, min_ttl_seconds
calls.append(access_token)
return _mint_payload(api_key="opaque-agent-key")
monkeypatch.setattr(auth_mod, "_mint_agent_key", _fake_mint_agent_key)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert calls == [token]
assert creds["api_key"] == "opaque-agent-key"
assert creds["source"] == "portal"
payload = json.loads((hermes_home / "auth.json").read_text())
assert payload["providers"]["nous"]["agent_key"] == "opaque-agent-key"
assert payload["credential_pool"]["nous"][0]["agent_key"] == "opaque-agent-key"
def test_nous_device_code_login_retries_legacy_scope_when_invoke_refused(monkeypatch):
import hermes_cli.auth as auth_mod
scopes = []
def _fake_request_device_code(*, client, portal_base_url, client_id, scope):
del client, portal_base_url, client_id
scopes.append(scope)
if len(scopes) == 1:
request = httpx.Request("POST", "https://portal.example.com/api/oauth/device/code")
response = httpx.Response(
400,
json={
"error": "invalid_scope",
"error_description": "unsupported inference:invoke",
},
request=request,
)
raise httpx.HTTPStatusError("invalid_scope", request=request, response=response)
return {
"device_code": "device",
"user_code": "user",
"verification_uri": "https://portal.example.com/device",
"verification_uri_complete": "https://portal.example.com/device?code=user",
"expires_in": 600,
"interval": 1,
}
def _fake_poll_for_token(**kwargs):
del kwargs
return {
"access_token": "access-legacy",
"refresh_token": "refresh-legacy",
"expires_in": 900,
"scope": auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE,
}
def _fake_refresh(state, **kwargs):
del kwargs
refreshed = dict(state)
refreshed["agent_key"] = "opaque-agent-key"
refreshed["agent_key_expires_at"] = _future_iso(1800)
return refreshed
monkeypatch.setattr(auth_mod, "_request_device_code", _fake_request_device_code)
monkeypatch.setattr(auth_mod, "_poll_for_token", _fake_poll_for_token)
monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _fake_refresh)
result = auth_mod._nous_device_code_login(
portal_base_url="https://portal.example.com",
inference_base_url="https://inference.example.com/v1",
open_browser=False,
timeout_seconds=1,
)
assert scopes == [auth_mod.DEFAULT_NOUS_SCOPE, auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE]
assert result["scope"] == auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE
assert result["agent_key"] == "opaque-agent-key"
def test_forced_legacy_env_skips_invoke_scope_and_jwt_storage(tmp_path, monkeypatch):
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
token = _invoke_jwt(seconds=3600)
_setup_nous_auth(
hermes_home,
access_token=token,
scope=auth_mod.DEFAULT_NOUS_SCOPE,
expires_at=_future_iso(3600),
expires_in=3600,
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv(auth_mod.NOUS_LEGACY_SESSION_KEYS_ENV, "true")
mint_calls = []
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
del client, portal_base_url, min_ttl_seconds
mint_calls.append(access_token)
return _mint_payload(api_key="forced-legacy-key")
monkeypatch.setattr(auth_mod, "_mint_agent_key", _fake_mint_agent_key)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert mint_calls == [token]
assert creds["api_key"] == "forced-legacy-key"
payload = json.loads((hermes_home / "auth.json").read_text())
assert payload["providers"]["nous"]["agent_key"] == "forced-legacy-key"
requested_scopes = []
def _fake_request_device_code(*, client, portal_base_url, client_id, scope):
del client, portal_base_url, client_id
requested_scopes.append(scope)
return {
"device_code": "device",
"user_code": "user",
"verification_uri": "https://portal.example.com/device",
"verification_uri_complete": "https://portal.example.com/device?code=user",
"expires_in": 600,
"interval": 1,
}
def _fake_poll_for_token(**kwargs):
del kwargs
return {
"access_token": "access-legacy",
"refresh_token": "refresh-legacy",
"expires_in": 900,
"scope": auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE,
}
def _fake_refresh(state, **kwargs):
del kwargs
refreshed = dict(state)
refreshed["agent_key"] = "forced-legacy-login-key"
refreshed["agent_key_expires_at"] = _future_iso(1800)
return refreshed
monkeypatch.setattr(auth_mod, "_request_device_code", _fake_request_device_code)
monkeypatch.setattr(auth_mod, "_poll_for_token", _fake_poll_for_token)
monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _fake_refresh)
auth_mod._nous_device_code_login(
portal_base_url="https://portal.example.com",
inference_base_url="https://inference.example.com/v1",
open_browser=False,
timeout_seconds=1,
)
assert requested_scopes == [auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE]
def test_nous_inference_auth_logs_do_not_include_secret_values(
tmp_path,
monkeypatch,
caplog,
):
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
token = _jwt_with_claims({
"sub": "secret-user",
"scope": "inference:mint_agent_key",
"exp": int(time.time() + 3600),
})
refresh_token = "refresh-secret-token"
opaque_key = "opaque-secret-agent-key"
_setup_nous_auth(
hermes_home,
access_token=token,
refresh_token=refresh_token,
scope=auth_mod.NOUS_LEGACY_AGENT_KEY_SCOPE,
expires_at=_future_iso(3600),
expires_in=3600,
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
del client, portal_base_url, access_token, min_ttl_seconds
return _mint_payload(api_key=opaque_key)
monkeypatch.setattr(auth_mod, "_mint_agent_key", _fake_mint_agent_key)
caplog.set_level(logging.INFO, logger="hermes_cli.auth")
auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
logged = caplog.text
assert "legacy session key path" in logged
assert token not in logged
assert refresh_token not in logged
assert opaque_key not in logged
def test_get_nous_auth_status_checks_credential_pool(tmp_path, monkeypatch):
"""get_nous_auth_status() should find Nous credentials in the pool
even when the auth store has no Nous provider entry this is the