fix: validate nous auth status against runtime credentials

This commit is contained in:
Michael Steuer 2026-04-23 19:47:35 +02:00 committed by Teknium
parent 1fc77f995b
commit cd221080ec
4 changed files with 223 additions and 52 deletions

View file

@ -2519,59 +2519,116 @@ def resolve_nous_runtime_credentials(
# Status helpers
# =============================================================================
def get_nous_auth_status() -> Dict[str, Any]:
"""Status snapshot for `hermes status` output.
def _empty_nous_auth_status() -> Dict[str, Any]:
return {
"logged_in": False,
"portal_base_url": None,
"inference_base_url": None,
"access_expires_at": None,
"agent_key_expires_at": None,
"has_refresh_token": False,
}
Checks the credential pool first (where the dashboard device-code flow
and ``hermes auth`` store credentials), then falls back to the legacy
auth-store provider state.
def _snapshot_nous_pool_status() -> Dict[str, Any]:
"""Best-effort status from the credential pool.
This is a fallback only. The auth-store provider state is the runtime source
of truth because it is what ``resolve_nous_runtime_credentials()`` refreshes
and mints against.
"""
# Check credential pool first — the dashboard device-code flow saves
# here but may not have written to the auth store yet.
try:
from agent.credential_pool import load_pool
pool = load_pool("nous")
if pool and pool.has_credentials():
entry = pool.select()
if entry is not None:
access_token = (
getattr(entry, "access_token", None)
or getattr(entry, "runtime_api_key", "")
)
if access_token:
return {
"logged_in": True,
"portal_base_url": getattr(entry, "portal_base_url", None)
or getattr(entry, "base_url", None),
"inference_base_url": getattr(entry, "inference_base_url", None)
or getattr(entry, "base_url", None),
"access_token": access_token,
"access_expires_at": getattr(entry, "expires_at", None),
"agent_key_expires_at": getattr(entry, "agent_key_expires_at", None),
"has_refresh_token": bool(getattr(entry, "refresh_token", None)),
}
except Exception:
pass
# Fall back to auth-store provider state
state = get_provider_auth_state("nous")
if not state:
pool = load_pool("nous")
if not pool or not pool.has_credentials():
return _empty_nous_auth_status()
entries = list(pool.entries())
if not entries:
return _empty_nous_auth_status()
def _entry_sort_key(entry: Any) -> tuple[float, float, int]:
agent_exp = _parse_iso_timestamp(getattr(entry, "agent_key_expires_at", None)) or 0.0
access_exp = _parse_iso_timestamp(getattr(entry, "expires_at", None)) or 0.0
priority = int(getattr(entry, "priority", 0) or 0)
return (agent_exp, access_exp, -priority)
entry = max(entries, key=_entry_sort_key)
access_token = (
getattr(entry, "access_token", None)
or getattr(entry, "runtime_api_key", "")
)
if not access_token:
return _empty_nous_auth_status()
return {
"logged_in": False,
"portal_base_url": None,
"inference_base_url": None,
"access_expires_at": None,
"agent_key_expires_at": None,
"has_refresh_token": False,
"logged_in": True,
"portal_base_url": getattr(entry, "portal_base_url", None)
or getattr(entry, "base_url", None),
"inference_base_url": getattr(entry, "inference_base_url", None)
or getattr(entry, "base_url", None),
"access_token": access_token,
"access_expires_at": getattr(entry, "expires_at", None),
"agent_key_expires_at": getattr(entry, "agent_key_expires_at", None),
"has_refresh_token": bool(getattr(entry, "refresh_token", None)),
"source": f"pool:{getattr(entry, 'label', 'unknown')}",
}
return {
"logged_in": bool(state.get("access_token")),
"portal_base_url": state.get("portal_base_url"),
"inference_base_url": state.get("inference_base_url"),
"access_expires_at": state.get("expires_at"),
"agent_key_expires_at": state.get("agent_key_expires_at"),
"has_refresh_token": bool(state.get("refresh_token")),
}
except Exception:
return _empty_nous_auth_status()
def get_nous_auth_status() -> Dict[str, Any]:
"""Status snapshot for Nous auth.
Prefer the auth-store provider state, because that is the live source of
truth for refresh + mint operations. When provider state exists, validate it
by resolving runtime credentials so revoked refresh sessions do not show up
as a healthy login. If provider state is absent, fall back to the credential
pool for the just-logged-in / not-yet-promoted case.
"""
state = get_provider_auth_state("nous")
if state:
base_status = {
"logged_in": bool(state.get("access_token")),
"portal_base_url": state.get("portal_base_url"),
"inference_base_url": state.get("inference_base_url"),
"access_expires_at": state.get("expires_at"),
"agent_key_expires_at": state.get("agent_key_expires_at"),
"has_refresh_token": bool(state.get("refresh_token")),
"access_token": state.get("access_token"),
"source": "auth_store",
}
try:
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=60)
refreshed_state = get_provider_auth_state("nous") or state
base_status.update(
{
"logged_in": True,
"portal_base_url": refreshed_state.get("portal_base_url") or base_status.get("portal_base_url"),
"inference_base_url": creds.get("base_url")
or refreshed_state.get("inference_base_url")
or base_status.get("inference_base_url"),
"access_expires_at": refreshed_state.get("expires_at") or base_status.get("access_expires_at"),
"agent_key_expires_at": creds.get("expires_at")
or refreshed_state.get("agent_key_expires_at")
or base_status.get("agent_key_expires_at"),
"has_refresh_token": bool(refreshed_state.get("refresh_token")),
"source": f"runtime:{creds.get('source', 'portal')}",
"key_id": creds.get("key_id"),
}
)
return base_status
except AuthError as exc:
base_status.update({
"logged_in": False,
"error": str(exc),
"relogin_required": bool(getattr(exc, "relogin_required", False)),
"error_code": getattr(exc, "code", None),
})
return base_status
return _snapshot_nous_pool_status()
def get_codex_auth_status() -> Dict[str, Any]:

View file

@ -164,19 +164,26 @@ def show_status(args):
qwen_status = {}
nous_logged_in = bool(nous_status.get("logged_in"))
nous_error = nous_status.get("error")
nous_label = "logged in" if nous_logged_in else "not logged in (run: hermes auth add nous --type oauth)"
print(
f" {'Nous Portal':<12} {check_mark(nous_logged_in)} "
f"{'logged in' if nous_logged_in else 'not logged in (run: hermes model)'}"
f"{nous_label}"
)
if nous_logged_in:
portal_url = nous_status.get("portal_base_url") or "(unknown)"
access_exp = _format_iso_timestamp(nous_status.get("access_expires_at"))
key_exp = _format_iso_timestamp(nous_status.get("agent_key_expires_at"))
refresh_label = "yes" if nous_status.get("has_refresh_token") else "no"
portal_url = nous_status.get("portal_base_url") or "(unknown)"
access_exp = _format_iso_timestamp(nous_status.get("access_expires_at"))
key_exp = _format_iso_timestamp(nous_status.get("agent_key_expires_at"))
refresh_label = "yes" if nous_status.get("has_refresh_token") else "no"
if nous_logged_in or portal_url != "(unknown)" or nous_error:
print(f" Portal URL: {portal_url}")
if nous_logged_in or nous_status.get("access_expires_at"):
print(f" Access exp: {access_exp}")
if nous_logged_in or nous_status.get("agent_key_expires_at"):
print(f" Key exp: {key_exp}")
if nous_logged_in or nous_status.get("has_refresh_token"):
print(f" Refresh: {refresh_label}")
if nous_error and not nous_logged_in:
print(f" Error: {nous_error}")
codex_logged_in = bool(codex_status.get("logged_in"))
print(