diff --git a/hermes_cli/auth.py b/hermes_cli/auth.py index 207dc719f..ed6b8f462 100644 --- a/hermes_cli/auth.py +++ b/hermes_cli/auth.py @@ -2519,59 +2519,116 @@ def resolve_nous_runtime_credentials( # Status helpers # ============================================================================= -def get_nous_auth_status() -> Dict[str, Any]: - """Status snapshot for `hermes status` output. +def _empty_nous_auth_status() -> Dict[str, Any]: + return { + "logged_in": False, + "portal_base_url": None, + "inference_base_url": None, + "access_expires_at": None, + "agent_key_expires_at": None, + "has_refresh_token": False, + } - Checks the credential pool first (where the dashboard device-code flow - and ``hermes auth`` store credentials), then falls back to the legacy - auth-store provider state. + +def _snapshot_nous_pool_status() -> Dict[str, Any]: + """Best-effort status from the credential pool. + + This is a fallback only. The auth-store provider state is the runtime source + of truth because it is what ``resolve_nous_runtime_credentials()`` refreshes + and mints against. """ - # Check credential pool first — the dashboard device-code flow saves - # here but may not have written to the auth store yet. try: from agent.credential_pool import load_pool - pool = load_pool("nous") - if pool and pool.has_credentials(): - entry = pool.select() - if entry is not None: - access_token = ( - getattr(entry, "access_token", None) - or getattr(entry, "runtime_api_key", "") - ) - if access_token: - return { - "logged_in": True, - "portal_base_url": getattr(entry, "portal_base_url", None) - or getattr(entry, "base_url", None), - "inference_base_url": getattr(entry, "inference_base_url", None) - or getattr(entry, "base_url", None), - "access_token": access_token, - "access_expires_at": getattr(entry, "expires_at", None), - "agent_key_expires_at": getattr(entry, "agent_key_expires_at", None), - "has_refresh_token": bool(getattr(entry, "refresh_token", None)), - } - except Exception: - pass - # Fall back to auth-store provider state - state = get_provider_auth_state("nous") - if not state: + pool = load_pool("nous") + if not pool or not pool.has_credentials(): + return _empty_nous_auth_status() + + entries = list(pool.entries()) + if not entries: + return _empty_nous_auth_status() + + def _entry_sort_key(entry: Any) -> tuple[float, float, int]: + agent_exp = _parse_iso_timestamp(getattr(entry, "agent_key_expires_at", None)) or 0.0 + access_exp = _parse_iso_timestamp(getattr(entry, "expires_at", None)) or 0.0 + priority = int(getattr(entry, "priority", 0) or 0) + return (agent_exp, access_exp, -priority) + + entry = max(entries, key=_entry_sort_key) + access_token = ( + getattr(entry, "access_token", None) + or getattr(entry, "runtime_api_key", "") + ) + if not access_token: + return _empty_nous_auth_status() + return { - "logged_in": False, - "portal_base_url": None, - "inference_base_url": None, - "access_expires_at": None, - "agent_key_expires_at": None, - "has_refresh_token": False, + "logged_in": True, + "portal_base_url": getattr(entry, "portal_base_url", None) + or getattr(entry, "base_url", None), + "inference_base_url": getattr(entry, "inference_base_url", None) + or getattr(entry, "base_url", None), + "access_token": access_token, + "access_expires_at": getattr(entry, "expires_at", None), + "agent_key_expires_at": getattr(entry, "agent_key_expires_at", None), + "has_refresh_token": bool(getattr(entry, "refresh_token", None)), + "source": f"pool:{getattr(entry, 'label', 'unknown')}", } - return { - "logged_in": bool(state.get("access_token")), - "portal_base_url": state.get("portal_base_url"), - "inference_base_url": state.get("inference_base_url"), - "access_expires_at": state.get("expires_at"), - "agent_key_expires_at": state.get("agent_key_expires_at"), - "has_refresh_token": bool(state.get("refresh_token")), - } + except Exception: + return _empty_nous_auth_status() + + +def get_nous_auth_status() -> Dict[str, Any]: + """Status snapshot for Nous auth. + + Prefer the auth-store provider state, because that is the live source of + truth for refresh + mint operations. When provider state exists, validate it + by resolving runtime credentials so revoked refresh sessions do not show up + as a healthy login. If provider state is absent, fall back to the credential + pool for the just-logged-in / not-yet-promoted case. + """ + state = get_provider_auth_state("nous") + if state: + base_status = { + "logged_in": bool(state.get("access_token")), + "portal_base_url": state.get("portal_base_url"), + "inference_base_url": state.get("inference_base_url"), + "access_expires_at": state.get("expires_at"), + "agent_key_expires_at": state.get("agent_key_expires_at"), + "has_refresh_token": bool(state.get("refresh_token")), + "access_token": state.get("access_token"), + "source": "auth_store", + } + try: + creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=60) + refreshed_state = get_provider_auth_state("nous") or state + base_status.update( + { + "logged_in": True, + "portal_base_url": refreshed_state.get("portal_base_url") or base_status.get("portal_base_url"), + "inference_base_url": creds.get("base_url") + or refreshed_state.get("inference_base_url") + or base_status.get("inference_base_url"), + "access_expires_at": refreshed_state.get("expires_at") or base_status.get("access_expires_at"), + "agent_key_expires_at": creds.get("expires_at") + or refreshed_state.get("agent_key_expires_at") + or base_status.get("agent_key_expires_at"), + "has_refresh_token": bool(refreshed_state.get("refresh_token")), + "source": f"runtime:{creds.get('source', 'portal')}", + "key_id": creds.get("key_id"), + } + ) + return base_status + except AuthError as exc: + base_status.update({ + "logged_in": False, + "error": str(exc), + "relogin_required": bool(getattr(exc, "relogin_required", False)), + "error_code": getattr(exc, "code", None), + }) + return base_status + + return _snapshot_nous_pool_status() def get_codex_auth_status() -> Dict[str, Any]: diff --git a/hermes_cli/status.py b/hermes_cli/status.py index 8541f0a05..d07e1a822 100644 --- a/hermes_cli/status.py +++ b/hermes_cli/status.py @@ -164,19 +164,26 @@ def show_status(args): qwen_status = {} nous_logged_in = bool(nous_status.get("logged_in")) + nous_error = nous_status.get("error") + nous_label = "logged in" if nous_logged_in else "not logged in (run: hermes auth add nous --type oauth)" print( f" {'Nous Portal':<12} {check_mark(nous_logged_in)} " - f"{'logged in' if nous_logged_in else 'not logged in (run: hermes model)'}" + f"{nous_label}" ) - if nous_logged_in: - portal_url = nous_status.get("portal_base_url") or "(unknown)" - access_exp = _format_iso_timestamp(nous_status.get("access_expires_at")) - key_exp = _format_iso_timestamp(nous_status.get("agent_key_expires_at")) - refresh_label = "yes" if nous_status.get("has_refresh_token") else "no" + portal_url = nous_status.get("portal_base_url") or "(unknown)" + access_exp = _format_iso_timestamp(nous_status.get("access_expires_at")) + key_exp = _format_iso_timestamp(nous_status.get("agent_key_expires_at")) + refresh_label = "yes" if nous_status.get("has_refresh_token") else "no" + if nous_logged_in or portal_url != "(unknown)" or nous_error: print(f" Portal URL: {portal_url}") + if nous_logged_in or nous_status.get("access_expires_at"): print(f" Access exp: {access_exp}") + if nous_logged_in or nous_status.get("agent_key_expires_at"): print(f" Key exp: {key_exp}") + if nous_logged_in or nous_status.get("has_refresh_token"): print(f" Refresh: {refresh_label}") + if nous_error and not nous_logged_in: + print(f" Error: {nous_error}") codex_logged_in = bool(codex_status.get("logged_in")) print( diff --git a/tests/hermes_cli/test_auth_nous_provider.py b/tests/hermes_cli/test_auth_nous_provider.py index cf74e97f5..75221b16a 100644 --- a/tests/hermes_cli/test_auth_nous_provider.py +++ b/tests/hermes_cli/test_auth_nous_provider.py @@ -198,12 +198,82 @@ def test_get_nous_auth_status_auth_store_fallback(tmp_path, monkeypatch): hermes_home = tmp_path / "hermes" _setup_nous_auth(hermes_home, access_token="at-123") monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.setattr( + "hermes_cli.auth.resolve_nous_runtime_credentials", + lambda min_key_ttl_seconds=60: { + "base_url": "https://inference.example.com/v1", + "expires_at": "2099-01-01T00:00:00+00:00", + "key_id": "key-1", + "source": "cache", + }, + ) status = get_nous_auth_status() assert status["logged_in"] is True assert status["portal_base_url"] == "https://portal.example.com" +def test_get_nous_auth_status_prefers_runtime_auth_store_over_stale_pool(tmp_path, monkeypatch): + from hermes_cli.auth import get_nous_auth_status + from agent.credential_pool import PooledCredential, load_pool + + hermes_home = tmp_path / "hermes" + _setup_nous_auth(hermes_home, access_token="at-fresh") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + pool = load_pool("nous") + stale = PooledCredential.from_dict("nous", { + "access_token": "at-stale", + "refresh_token": "rt-stale", + "portal_base_url": "https://portal.stale.example.com", + "inference_base_url": "https://inference.stale.example.com/v1", + "agent_key": "agent-stale", + "agent_key_expires_at": "2020-01-01T00:00:00+00:00", + "expires_at": "2020-01-01T00:00:00+00:00", + "label": "dashboard device_code", + "auth_type": "oauth", + "source": "manual:dashboard_device_code", + "base_url": "https://inference.stale.example.com/v1", + "priority": 0, + }) + pool.add_entry(stale) + + monkeypatch.setattr( + "hermes_cli.auth.resolve_nous_runtime_credentials", + lambda min_key_ttl_seconds=60: { + "base_url": "https://inference.example.com/v1", + "expires_at": "2099-01-01T00:00:00+00:00", + "key_id": "key-fresh", + "source": "portal", + }, + ) + + status = get_nous_auth_status() + assert status["logged_in"] is True + assert status["portal_base_url"] == "https://portal.example.com" + assert status["inference_base_url"] == "https://inference.example.com/v1" + assert status["source"] == "runtime:portal" + + +def test_get_nous_auth_status_reports_revoked_refresh_session(tmp_path, monkeypatch): + from hermes_cli.auth import get_nous_auth_status + + hermes_home = tmp_path / "hermes" + _setup_nous_auth(hermes_home, access_token="at-123") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + def _boom(min_key_ttl_seconds=60): + raise AuthError("Refresh session has been revoked", provider="nous", relogin_required=True) + + monkeypatch.setattr("hermes_cli.auth.resolve_nous_runtime_credentials", _boom) + + status = get_nous_auth_status() + assert status["logged_in"] is False + assert status["relogin_required"] is True + assert "revoked" in status["error"].lower() + assert status["portal_base_url"] == "https://portal.example.com" + + def test_get_nous_auth_status_empty_returns_not_logged_in(tmp_path, monkeypatch): """get_nous_auth_status() returns logged_in=False when both pool and auth store are empty. diff --git a/tests/hermes_cli/test_status.py b/tests/hermes_cli/test_status.py index c24b72dd4..216687660 100644 --- a/tests/hermes_cli/test_status.py +++ b/tests/hermes_cli/test_status.py @@ -42,3 +42,40 @@ def test_show_status_termux_gateway_section_skips_systemctl(monkeypatch, capsys, assert "Manager: Termux / manual process" in output assert "Start with: hermes gateway" in output assert "systemd (user)" not in output + + +def test_show_status_reports_nous_auth_error(monkeypatch, capsys, tmp_path): + from hermes_cli import status as status_mod + import hermes_cli.auth as auth_mod + import hermes_cli.gateway as gateway_mod + + monkeypatch.setattr(status_mod, "get_env_path", lambda: tmp_path / ".env", raising=False) + monkeypatch.setattr(status_mod, "get_hermes_home", lambda: tmp_path, raising=False) + monkeypatch.setattr(status_mod, "load_config", lambda: {"model": "gpt-5.4"}, raising=False) + monkeypatch.setattr(status_mod, "resolve_requested_provider", lambda requested=None: "openai-codex", raising=False) + monkeypatch.setattr(status_mod, "resolve_provider", lambda requested=None, **kwargs: "openai-codex", raising=False) + monkeypatch.setattr(status_mod, "provider_label", lambda provider: "OpenAI Codex", raising=False) + monkeypatch.setattr( + auth_mod, + "get_nous_auth_status", + lambda: { + "logged_in": False, + "portal_base_url": "https://portal.nousresearch.com", + "access_expires_at": "2026-04-20T01:00:51+00:00", + "agent_key_expires_at": "2026-04-20T04:54:24+00:00", + "has_refresh_token": True, + "error": "Refresh session has been revoked", + }, + raising=False, + ) + monkeypatch.setattr(auth_mod, "get_codex_auth_status", lambda: {}, raising=False) + monkeypatch.setattr(auth_mod, "get_qwen_auth_status", lambda: {}, raising=False) + monkeypatch.setattr(gateway_mod, "find_gateway_pids", lambda exclude_pids=None: [], raising=False) + + status_mod.show_status(SimpleNamespace(all=False, deep=False)) + + output = capsys.readouterr().out + assert "Nous Portal ✗ not logged in (run: hermes auth add nous --type oauth)" in output + assert "Error: Refresh session has been revoked" in output + assert "Access exp:" in output + assert "Key exp:" in output