hermes-agent/tests/hermes_cli/test_auth_nous_provider.py
2026-05-07 05:07:06 -07:00

1265 lines
49 KiB
Python

"""Regression tests for Nous OAuth refresh + agent-key mint interactions."""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
import httpx
import pytest
from hermes_cli.auth import AuthError, get_provider_auth_state, resolve_nous_runtime_credentials
# =============================================================================
# _resolve_verify: CA bundle path validation
# =============================================================================
class TestResolveVerifyFallback:
"""Verify _resolve_verify falls back to True when CA bundle path doesn't exist."""
@pytest.fixture(autouse=True)
def _pin_platform_to_linux(self, monkeypatch):
"""Pin sys.platform so the macOS certifi fallback doesn't alter the
generic "default trust" return value asserted by these tests."""
monkeypatch.setattr("sys.platform", "linux")
def test_missing_ca_bundle_in_auth_state_falls_back(self):
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(auth_state={
"tls": {"insecure": False, "ca_bundle": "/nonexistent/ca-bundle.pem"},
})
assert result is True
def test_valid_ca_bundle_in_auth_state_is_returned(self, tmp_path, monkeypatch):
import ssl
from hermes_cli.auth import _resolve_verify
ca_file = tmp_path / "ca-bundle.pem"
ca_file.write_text("fake cert")
# Avoid loading actual PEM — just verify the return type
mock_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
monkeypatch.setattr(ssl, "create_default_context", lambda **kw: mock_ctx)
result = _resolve_verify(auth_state={
"tls": {"insecure": False, "ca_bundle": str(ca_file)},
})
assert isinstance(result, ssl.SSLContext), (
f"Expected ssl.SSLContext but got {type(result).__name__}: {result!r}"
)
def test_missing_ssl_cert_file_env_falls_back(self, monkeypatch):
from hermes_cli.auth import _resolve_verify
monkeypatch.setenv("SSL_CERT_FILE", "/nonexistent/ssl-cert.pem")
monkeypatch.delenv("HERMES_CA_BUNDLE", raising=False)
result = _resolve_verify(auth_state={"tls": {}})
assert result is True
def test_missing_hermes_ca_bundle_env_falls_back(self, monkeypatch):
from hermes_cli.auth import _resolve_verify
monkeypatch.setenv("HERMES_CA_BUNDLE", "/nonexistent/hermes-ca.pem")
monkeypatch.delenv("SSL_CERT_FILE", raising=False)
result = _resolve_verify(auth_state={"tls": {}})
assert result is True
def test_insecure_takes_precedence_over_missing_ca(self):
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(
insecure=True,
auth_state={"tls": {"ca_bundle": "/nonexistent/ca.pem"}},
)
assert result is False
def test_string_false_in_auth_state_does_not_disable_tls_verify(self):
import ssl
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(auth_state={"tls": {"insecure": "false"}})
assert result is not False
assert result is True or isinstance(result, ssl.SSLContext)
def test_string_true_in_auth_state_disables_tls_verify(self):
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(auth_state={"tls": {"insecure": "true"}})
assert result is False
def test_no_ca_bundle_returns_true(self, monkeypatch):
from hermes_cli.auth import _resolve_verify
monkeypatch.delenv("HERMES_CA_BUNDLE", raising=False)
monkeypatch.delenv("SSL_CERT_FILE", raising=False)
result = _resolve_verify(auth_state={"tls": {}})
assert result is True
def test_explicit_ca_bundle_param_missing_falls_back(self):
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(ca_bundle="/nonexistent/explicit-ca.pem")
assert result is True
def test_explicit_ca_bundle_param_valid_is_returned(self, tmp_path, monkeypatch):
import ssl
from hermes_cli.auth import _resolve_verify
ca_file = tmp_path / "explicit-ca.pem"
ca_file.write_text("fake cert")
# Avoid loading actual PEM — just verify the return type
mock_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
monkeypatch.setattr(ssl, "create_default_context", lambda **kw: mock_ctx)
result = _resolve_verify(ca_bundle=str(ca_file))
assert isinstance(result, ssl.SSLContext), (
f"Expected ssl.SSLContext but got {type(result).__name__}: {result!r}"
)
def _setup_nous_auth(
hermes_home: Path,
*,
access_token: str = "access-old",
refresh_token: str = "refresh-old",
) -> None:
hermes_home.mkdir(parents=True, exist_ok=True)
auth_store = {
"version": 1,
"active_provider": "nous",
"providers": {
"nous": {
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"access_token": access_token,
"refresh_token": refresh_token,
"obtained_at": "2026-02-01T00:00:00+00:00",
"expires_in": 0,
"expires_at": "2026-02-01T00:00:00+00:00",
"agent_key": None,
"agent_key_id": None,
"agent_key_expires_at": None,
"agent_key_expires_in": None,
"agent_key_reused": None,
"agent_key_obtained_at": None,
}
},
}
(hermes_home / "auth.json").write_text(json.dumps(auth_store, indent=2))
def _mint_payload(api_key: str = "agent-key") -> dict:
return {
"api_key": api_key,
"key_id": "key-id-1",
"expires_at": datetime.now(timezone.utc).isoformat(),
"expires_in": 1800,
"reused": False,
}
def test_get_nous_auth_status_checks_credential_pool(tmp_path, monkeypatch):
"""get_nous_auth_status() should find Nous credentials in the pool
even when the auth store has no Nous provider entry — this is the
case when login happened via the dashboard device-code flow which
saves to the pool only.
"""
from hermes_cli.auth import get_nous_auth_status
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
# Empty auth store — no Nous provider entry
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1, "providers": {},
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
# Seed the credential pool with a Nous entry
from agent.credential_pool import PooledCredential, load_pool
pool = load_pool("nous")
entry = PooledCredential.from_dict("nous", {
"access_token": "test-access-token",
"refresh_token": "test-refresh-token",
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"agent_key": "test-agent-key",
"agent_key_expires_at": "2099-01-01T00:00:00+00:00",
"label": "dashboard device_code",
"auth_type": "oauth",
"source": "manual:dashboard_device_code",
"base_url": "https://inference.example.com/v1",
})
pool.add_entry(entry)
status = get_nous_auth_status()
assert status["logged_in"] is True
assert "example.com" in str(status.get("portal_base_url", ""))
def test_get_nous_auth_status_auth_store_fallback(tmp_path, monkeypatch):
"""get_nous_auth_status() falls back to auth store when credential
pool is empty.
"""
from hermes_cli.auth import get_nous_auth_status
hermes_home = tmp_path / "hermes"
_setup_nous_auth(hermes_home, access_token="at-123")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(
"hermes_cli.auth.resolve_nous_runtime_credentials",
lambda min_key_ttl_seconds=60: {
"base_url": "https://inference.example.com/v1",
"expires_at": "2099-01-01T00:00:00+00:00",
"key_id": "key-1",
"source": "cache",
},
)
status = get_nous_auth_status()
assert status["logged_in"] is True
assert status["portal_base_url"] == "https://portal.example.com"
def test_get_nous_auth_status_prefers_runtime_auth_store_over_stale_pool(tmp_path, monkeypatch):
from hermes_cli.auth import get_nous_auth_status
from agent.credential_pool import PooledCredential, load_pool
hermes_home = tmp_path / "hermes"
_setup_nous_auth(hermes_home, access_token="at-fresh")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
pool = load_pool("nous")
stale = PooledCredential.from_dict("nous", {
"access_token": "at-stale",
"refresh_token": "rt-stale",
"portal_base_url": "https://portal.stale.example.com",
"inference_base_url": "https://inference.stale.example.com/v1",
"agent_key": "agent-stale",
"agent_key_expires_at": "2020-01-01T00:00:00+00:00",
"expires_at": "2020-01-01T00:00:00+00:00",
"label": "dashboard device_code",
"auth_type": "oauth",
"source": "manual:dashboard_device_code",
"base_url": "https://inference.stale.example.com/v1",
"priority": 0,
})
pool.add_entry(stale)
monkeypatch.setattr(
"hermes_cli.auth.resolve_nous_runtime_credentials",
lambda min_key_ttl_seconds=60: {
"base_url": "https://inference.example.com/v1",
"expires_at": "2099-01-01T00:00:00+00:00",
"key_id": "key-fresh",
"source": "portal",
},
)
status = get_nous_auth_status()
assert status["logged_in"] is True
assert status["portal_base_url"] == "https://portal.example.com"
assert status["inference_base_url"] == "https://inference.example.com/v1"
assert status["source"] == "runtime:portal"
def test_get_nous_auth_status_reports_revoked_refresh_session(tmp_path, monkeypatch):
from hermes_cli.auth import get_nous_auth_status
hermes_home = tmp_path / "hermes"
_setup_nous_auth(hermes_home, access_token="at-123")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _boom(min_key_ttl_seconds=60):
raise AuthError("Refresh session has been revoked", provider="nous", relogin_required=True)
monkeypatch.setattr("hermes_cli.auth.resolve_nous_runtime_credentials", _boom)
status = get_nous_auth_status()
assert status["logged_in"] is False
assert status["relogin_required"] is True
assert "revoked" in status["error"].lower()
assert status["portal_base_url"] == "https://portal.example.com"
def test_get_nous_auth_status_empty_returns_not_logged_in(tmp_path, monkeypatch):
"""get_nous_auth_status() returns logged_in=False when both pool
and auth store are empty.
"""
from hermes_cli.auth import get_nous_auth_status
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1, "providers": {},
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
status = get_nous_auth_status()
assert status["logged_in"] is False
def test_refresh_token_persisted_when_mint_returns_insufficient_credits(tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
_setup_nous_auth(hermes_home, refresh_token="refresh-old")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
refresh_calls = []
mint_calls = {"count": 0}
def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
refresh_calls.append(refresh_token)
idx = len(refresh_calls)
return {
"access_token": f"access-{idx}",
"refresh_token": f"refresh-{idx}",
"expires_in": 0,
"token_type": "Bearer",
}
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
mint_calls["count"] += 1
if mint_calls["count"] == 1:
raise AuthError("credits exhausted", provider="nous", code="insufficient_credits")
return _mint_payload(api_key="agent-key-2")
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)
with pytest.raises(AuthError) as exc:
resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert exc.value.code == "insufficient_credits"
state_after_failure = get_provider_auth_state("nous")
assert state_after_failure is not None
assert state_after_failure["refresh_token"] == "refresh-1"
assert state_after_failure["access_token"] == "access-1"
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert creds["api_key"] == "agent-key-2"
assert refresh_calls == ["refresh-old", "refresh-1"]
def test_refresh_token_persisted_when_mint_times_out(tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
_setup_nous_auth(hermes_home, refresh_token="refresh-old")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
return {
"access_token": "access-1",
"refresh_token": "refresh-1",
"expires_in": 0,
"token_type": "Bearer",
}
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
raise httpx.ReadTimeout("mint timeout")
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)
with pytest.raises(httpx.ReadTimeout):
resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
state_after_failure = get_provider_auth_state("nous")
assert state_after_failure is not None
assert state_after_failure["refresh_token"] == "refresh-1"
assert state_after_failure["access_token"] == "access-1"
def test_mint_retry_uses_latest_rotated_refresh_token(tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
_setup_nous_auth(hermes_home, refresh_token="refresh-old")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
refresh_calls = []
mint_calls = {"count": 0}
def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
refresh_calls.append(refresh_token)
idx = len(refresh_calls)
return {
"access_token": f"access-{idx}",
"refresh_token": f"refresh-{idx}",
"expires_in": 0,
"token_type": "Bearer",
}
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
mint_calls["count"] += 1
if mint_calls["count"] == 1:
raise AuthError("stale access token", provider="nous", code="invalid_token")
return _mint_payload(api_key="agent-key")
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
assert creds["api_key"] == "agent-key"
assert refresh_calls == ["refresh-old", "refresh-1"]
# =============================================================================
# _login_nous: "Skip (keep current)" must preserve prior provider + model
# =============================================================================
class TestLoginNousSkipKeepsCurrent:
"""When a user runs `hermes model` → Nous Portal → Skip (keep current) after
a successful OAuth login, the prior provider and model MUST be preserved.
Regression: previously, _update_config_for_provider was called
unconditionally after login, which flipped model.provider to "nous" while
keeping the old model.default (e.g. anthropic/claude-opus-4.6 from
OpenRouter), leaving the user with a mismatched provider/model pair.
"""
def _setup_home_with_openrouter(self, tmp_path, monkeypatch):
import yaml
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
config_path = hermes_home / "config.yaml"
config_path.write_text(yaml.safe_dump({
"model": {
"provider": "openrouter",
"default": "anthropic/claude-opus-4.6",
},
}, sort_keys=False))
auth_path = hermes_home / "auth.json"
auth_path.write_text(json.dumps({
"version": 1,
"active_provider": "openrouter",
"providers": {"openrouter": {"api_key": "sk-or-fake"}},
}))
return hermes_home, config_path, auth_path
def _patch_login_internals(self, monkeypatch, *, prompt_returns):
"""Patch OAuth + model-list + prompt so _login_nous doesn't hit network."""
import hermes_cli.auth as auth_mod
import hermes_cli.models as models_mod
import hermes_cli.nous_subscription as ns
fake_auth_state = {
"access_token": "fake-nous-token",
"agent_key": "fake-agent-key",
"inference_base_url": "https://inference-api.nousresearch.com",
"portal_base_url": "https://portal.nousresearch.com",
"refresh_token": "fake-refresh",
"token_expires_at": 9999999999,
}
monkeypatch.setattr(
auth_mod, "_nous_device_code_login",
lambda **kwargs: dict(fake_auth_state),
)
monkeypatch.setattr(
auth_mod, "_prompt_model_selection",
lambda *a, **kw: prompt_returns,
)
monkeypatch.setattr(models_mod, "get_pricing_for_provider", lambda p: {})
monkeypatch.setattr(models_mod, "check_nous_free_tier", lambda: None)
monkeypatch.setattr(
models_mod, "partition_nous_models_by_tier",
lambda ids, p, free_tier=False: (ids, []),
)
monkeypatch.setattr(ns, "prompt_enable_tool_gateway", lambda cfg: None)
def test_skip_keep_current_preserves_provider_and_model(self, tmp_path, monkeypatch):
"""User picks Skip → config.yaml untouched, Nous creds still saved."""
import argparse
import yaml
from hermes_cli.auth import PROVIDER_REGISTRY, _login_nous
hermes_home, config_path, auth_path = self._setup_home_with_openrouter(
tmp_path, monkeypatch,
)
self._patch_login_internals(monkeypatch, prompt_returns=None)
args = argparse.Namespace(
portal_url=None, inference_url=None, client_id=None, scope=None,
no_browser=True, timeout=15.0, ca_bundle=None, insecure=False,
)
_login_nous(args, PROVIDER_REGISTRY["nous"])
# config.yaml model section must be unchanged
cfg_after = yaml.safe_load(config_path.read_text())
assert cfg_after["model"]["provider"] == "openrouter"
assert cfg_after["model"]["default"] == "anthropic/claude-opus-4.6"
assert "base_url" not in cfg_after["model"]
# auth.json: active_provider restored to openrouter, but Nous creds saved
auth_after = json.loads(auth_path.read_text())
assert auth_after["active_provider"] == "openrouter"
assert "nous" in auth_after["providers"]
assert auth_after["providers"]["nous"]["access_token"] == "fake-nous-token"
# Existing openrouter creds still intact
assert auth_after["providers"]["openrouter"]["api_key"] == "sk-or-fake"
def test_picking_model_switches_to_nous(self, tmp_path, monkeypatch):
"""User picks a Nous model → provider flips to nous with that model."""
import argparse
import yaml
from hermes_cli.auth import PROVIDER_REGISTRY, _login_nous
hermes_home, config_path, auth_path = self._setup_home_with_openrouter(
tmp_path, monkeypatch,
)
self._patch_login_internals(
monkeypatch, prompt_returns="xiaomi/mimo-v2-pro",
)
args = argparse.Namespace(
portal_url=None, inference_url=None, client_id=None, scope=None,
no_browser=True, timeout=15.0, ca_bundle=None, insecure=False,
)
_login_nous(args, PROVIDER_REGISTRY["nous"])
cfg_after = yaml.safe_load(config_path.read_text())
assert cfg_after["model"]["provider"] == "nous"
assert cfg_after["model"]["default"] == "xiaomi/mimo-v2-pro"
auth_after = json.loads(auth_path.read_text())
assert auth_after["active_provider"] == "nous"
def test_skip_with_no_prior_active_provider_clears_it(self, tmp_path, monkeypatch):
"""Fresh install (no prior active_provider) → Skip clears active_provider
instead of leaving it as nous."""
import argparse
import yaml
from hermes_cli.auth import PROVIDER_REGISTRY, _login_nous
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
config_path = hermes_home / "config.yaml"
config_path.write_text(yaml.safe_dump({"model": {}}, sort_keys=False))
# No auth.json yet — simulates first-run before any OAuth
self._patch_login_internals(monkeypatch, prompt_returns=None)
args = argparse.Namespace(
portal_url=None, inference_url=None, client_id=None, scope=None,
no_browser=True, timeout=15.0, ca_bundle=None, insecure=False,
)
_login_nous(args, PROVIDER_REGISTRY["nous"])
auth_path = hermes_home / "auth.json"
auth_after = json.loads(auth_path.read_text())
# active_provider should NOT be set to "nous" after Skip
assert auth_after.get("active_provider") in (None, "")
# But Nous creds are still saved
assert "nous" in auth_after.get("providers", {})
# =============================================================================
# persist_nous_credentials: shared helper for CLI + web dashboard login paths
# =============================================================================
def _full_state_fixture() -> dict:
"""Shape of the dict returned by _nous_device_code_login /
refresh_nous_oauth_from_state. Used as helper input."""
return {
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"scope": "inference:mint_agent_key",
"token_type": "Bearer",
"access_token": "access-tok",
"refresh_token": "refresh-tok",
"obtained_at": "2026-04-17T22:00:00+00:00",
"expires_at": "2026-04-17T22:15:00+00:00",
"expires_in": 900,
"agent_key": "agent-key-value",
"agent_key_id": "ak-id",
"agent_key_expires_at": "2026-04-18T22:00:00+00:00",
"agent_key_expires_in": 86400,
"agent_key_reused": False,
"agent_key_obtained_at": "2026-04-17T22:00:10+00:00",
"tls": {"insecure": False, "ca_bundle": None},
}
def test_persist_nous_credentials_writes_both_pool_and_providers(tmp_path, monkeypatch):
"""Helper must populate BOTH credential_pool.nous AND providers.nous.
Regression guard: before this helper existed, `hermes auth add nous`
wrote only the pool. After the Nous agent_key's 24h TTL expired, the
401-recovery path in run_agent.py called resolve_nous_runtime_credentials
which reads providers.nous, found it empty, raised AuthError, and the
agent failed with "Non-retryable client error". Both stores must stay
in sync at write time.
"""
from hermes_cli.auth import persist_nous_credentials, NOUS_DEVICE_CODE_SOURCE
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1, "providers": {},
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
entry = persist_nous_credentials(_full_state_fixture())
assert entry is not None
assert entry.provider == "nous"
assert entry.source == NOUS_DEVICE_CODE_SOURCE
payload = json.loads((hermes_home / "auth.json").read_text())
# providers.nous populated with the full state (new behaviour)
singleton = payload["providers"]["nous"]
assert singleton["access_token"] == "access-tok"
assert singleton["refresh_token"] == "refresh-tok"
assert singleton["agent_key"] == "agent-key-value"
assert singleton["agent_key_expires_at"] == "2026-04-18T22:00:00+00:00"
# credential_pool.nous has exactly one canonical device_code entry
pool_entries = payload["credential_pool"]["nous"]
assert len(pool_entries) == 1, pool_entries
pool_entry = pool_entries[0]
assert pool_entry["source"] == NOUS_DEVICE_CODE_SOURCE
assert pool_entry["agent_key"] == "agent-key-value"
assert pool_entry["inference_base_url"] == "https://inference.example.com/v1"
def test_persist_nous_credentials_allows_recovery_from_401(tmp_path, monkeypatch):
"""End-to-end: after persisting via the helper, resolve_nous_runtime_credentials
must succeed (not raise "Hermes is not logged into Nous Portal").
This is the exact path that run_agent.py's `_try_refresh_nous_client_credentials`
calls after a Nous 401 — before the fix it would raise AuthError because
providers.nous was empty.
"""
from hermes_cli.auth import persist_nous_credentials, resolve_nous_runtime_credentials
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1, "providers": {},
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
persist_nous_credentials(_full_state_fixture())
# Stub the network-touching steps so we don't actually contact the
# portal — the point of this test is that state lookup succeeds and
# doesn't raise "Hermes is not logged into Nous Portal".
def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
return {
"access_token": "access-new",
"refresh_token": "refresh-new",
"expires_in": 900,
"token_type": "Bearer",
}
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
return _mint_payload(api_key="new-agent-key")
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=300, force_mint=True)
assert creds["api_key"] == "new-agent-key"
def test_persist_nous_credentials_idempotent_no_duplicate_pool_entries(tmp_path, monkeypatch):
"""Re-running persist must upsert — not accumulate duplicate device_code rows.
Regression guard for the review comment on PR #11858: before normalisation,
the helper wrote `manual:device_code` while `_seed_from_singletons` wrote
`device_code`, so the pool grew a second duplicate entry on every
``load_pool()``. The helper now writes providers.nous and lets seeding
materialise the pool entry under the canonical ``device_code`` source, so
two persists still leave the pool with exactly one row.
"""
from hermes_cli.auth import persist_nous_credentials, NOUS_DEVICE_CODE_SOURCE
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1, "providers": {},
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
first = _full_state_fixture()
persist_nous_credentials(first)
second = _full_state_fixture()
second["access_token"] = "access-second"
second["agent_key"] = "agent-key-second"
persist_nous_credentials(second)
payload = json.loads((hermes_home / "auth.json").read_text())
# providers.nous reflects the latest write (singleton semantics)
assert payload["providers"]["nous"]["access_token"] == "access-second"
assert payload["providers"]["nous"]["agent_key"] == "agent-key-second"
# credential_pool.nous has exactly one entry, carrying the latest agent_key
pool_entries = payload["credential_pool"]["nous"]
assert len(pool_entries) == 1, pool_entries
assert pool_entries[0]["source"] == NOUS_DEVICE_CODE_SOURCE
assert pool_entries[0]["agent_key"] == "agent-key-second"
# And no stray `manual:device_code` / `manual:dashboard_device_code` rows
assert not any(
e["source"].startswith("manual:") for e in pool_entries
)
def test_persist_nous_credentials_reloads_pool_after_singleton_write(tmp_path, monkeypatch):
"""The entry returned by the helper must come from a fresh ``load_pool`` so
callers observe the canonical seeded state, including any legacy entries
that ``_seed_from_singletons`` pruned or upserted.
"""
from hermes_cli.auth import persist_nous_credentials, NOUS_DEVICE_CODE_SOURCE
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1, "providers": {},
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
entry = persist_nous_credentials(_full_state_fixture())
assert entry is not None
assert entry.source == NOUS_DEVICE_CODE_SOURCE
# Label derived by _seed_from_singletons via label_from_token; we don't
# assert its exact value, just that the helper returned a real entry.
assert entry.access_token == "access-tok"
assert entry.agent_key == "agent-key-value"
def test_persist_nous_credentials_embeds_custom_label(tmp_path, monkeypatch):
"""User-supplied ``--label`` round-trips through providers.nous and the pool.
Previously `hermes auth add nous --type oauth --label <name>` silently
dropped the label because persist_nous_credentials() ignored it and
_seed_from_singletons always auto-derived via label_from_token(). The
fix stashes the label inside providers.nous so seeding prefers it.
"""
from hermes_cli.auth import persist_nous_credentials, NOUS_DEVICE_CODE_SOURCE
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1, "providers": {},
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
entry = persist_nous_credentials(_full_state_fixture(), label="my-personal")
assert entry is not None
assert entry.source == NOUS_DEVICE_CODE_SOURCE
assert entry.label == "my-personal"
# providers.nous carries the label so re-seeding on the next load_pool
# doesn't overwrite it with the auto-derived fingerprint.
payload = json.loads((hermes_home / "auth.json").read_text())
assert payload["providers"]["nous"]["label"] == "my-personal"
def test_persist_nous_credentials_custom_label_survives_reseed(tmp_path, monkeypatch):
"""Reopening the pool (which re-runs _seed_from_singletons) must keep the
user-chosen label instead of clobbering it with label_from_token output.
"""
from hermes_cli.auth import persist_nous_credentials
from agent.credential_pool import load_pool
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1, "providers": {},
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
persist_nous_credentials(_full_state_fixture(), label="work-acct")
# Second load_pool triggers _seed_from_singletons again. Without the
# fix, this call overwrote the label with label_from_token(access_token).
pool = load_pool("nous")
entries = pool.entries()
assert len(entries) == 1
assert entries[0].label == "work-acct"
def test_persist_nous_credentials_no_label_uses_auto_derived(tmp_path, monkeypatch):
"""When the caller doesn't pass ``label``, the auto-derived fingerprint
is used (unchanged default behaviour — regression guard).
"""
from hermes_cli.auth import persist_nous_credentials
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1, "providers": {},
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
entry = persist_nous_credentials(_full_state_fixture())
assert entry is not None
# label_from_token derives from the access_token; exact value depends on
# the fingerprinter but it must not be empty and must not equal an
# arbitrary user string we never passed.
assert entry.label
assert entry.label != "my-personal"
# No "label" key embedded in providers.nous when the caller didn't supply one.
payload = json.loads((hermes_home / "auth.json").read_text())
assert "label" not in payload["providers"]["nous"]
def test_refresh_token_reuse_detection_surfaces_actionable_message():
"""Regression for #15099.
When the Nous Portal server returns ``invalid_grant`` with
``error_description`` containing "reuse detected", Hermes must surface an
actionable message explaining that an external process consumed the
refresh token. The default opaque "Refresh token reuse detected; please
re-authenticate" string led users to report this as a Hermes persistence
bug when the true cause is external RT consumption (monitoring scripts,
custom self-heal hooks).
"""
from hermes_cli.auth import _refresh_access_token
class _FakeResponse:
status_code = 400
def json(self):
return {
"error": "invalid_grant",
"error_description": "Refresh token reuse detected; please re-authenticate",
}
class _FakeClient:
def post(self, *args, **kwargs):
return _FakeResponse()
with pytest.raises(AuthError) as exc_info:
_refresh_access_token(
client=_FakeClient(),
portal_base_url="https://portal.nousresearch.com",
client_id="hermes-cli",
refresh_token="rt_consumed_elsewhere",
)
message = str(exc_info.value)
assert "refresh-token reuse" in message.lower() or "refresh token reuse" in message.lower()
# The message must mention the external-process cause and give next steps.
assert "external process" in message.lower() or "monitoring script" in message.lower()
assert "hermes auth add nous" in message.lower()
# Must still be classified as invalid_grant + relogin_required.
assert exc_info.value.code == "invalid_grant"
assert exc_info.value.relogin_required is True
def test_refresh_non_reuse_error_keeps_original_description():
"""Non-reuse invalid_grant errors must keep their original description untouched.
Only the "reuse detected" signature should trigger the actionable message;
generic ``invalid_grant: Refresh session has been revoked`` (the
downstream consequence) keeps its original text so we don't overwrite
useful server context for unrelated failure modes.
"""
from hermes_cli.auth import _refresh_access_token
class _FakeResponse:
status_code = 400
def json(self):
return {
"error": "invalid_grant",
"error_description": "Refresh session has been revoked",
}
class _FakeClient:
def post(self, *args, **kwargs):
return _FakeResponse()
with pytest.raises(AuthError) as exc_info:
_refresh_access_token(
client=_FakeClient(),
portal_base_url="https://portal.nousresearch.com",
client_id="hermes-cli",
refresh_token="rt_anything",
)
assert "Refresh session has been revoked" in str(exc_info.value)
# Must not have been rewritten with the reuse message.
assert "external process" not in str(exc_info.value).lower()
# =============================================================================
# Shared Nous token store — cross-profile persistence (Codex-style auto-import)
# =============================================================================
@pytest.fixture
def shared_store_env(tmp_path, monkeypatch):
"""Redirect HERMES_SHARED_AUTH_DIR to a tmp_path.
Required for every test that exercises the shared Nous store — the
in-auth.py seat belt refuses to touch the real user's shared store
under pytest, so tests that forget this fixture fail loudly instead
of corrupting real state.
"""
shared_dir = tmp_path / "shared"
monkeypatch.setenv("HERMES_SHARED_AUTH_DIR", str(shared_dir))
return shared_dir
def test_shared_store_seat_belt_refuses_real_home_under_pytest(monkeypatch):
"""Without HERMES_SHARED_AUTH_DIR override, the seat belt must trip.
Mirrors the existing ``_auth_file_path`` seat belt: forgetting to
redirect this store in a test must fail loudly instead of silently
writing to the user's real ``~/.hermes/shared/`` across CI runs.
"""
from hermes_cli.auth import _nous_shared_store_path
monkeypatch.delenv("HERMES_SHARED_AUTH_DIR", raising=False)
with pytest.raises(RuntimeError, match="shared Nous auth store"):
_nous_shared_store_path()
def test_shared_store_honors_env_override(tmp_path, monkeypatch):
"""HERMES_SHARED_AUTH_DIR must redirect the path."""
from hermes_cli.auth import _nous_shared_store_path, NOUS_SHARED_STORE_FILENAME
custom_dir = tmp_path / "custom_shared"
monkeypatch.setenv("HERMES_SHARED_AUTH_DIR", str(custom_dir))
path = _nous_shared_store_path()
assert path == custom_dir / NOUS_SHARED_STORE_FILENAME
def test_shared_store_read_missing_returns_none(shared_store_env):
"""Missing file → ``_read_shared_nous_state()`` returns None."""
from hermes_cli.auth import _read_shared_nous_state
assert _read_shared_nous_state() is None
def test_shared_store_read_malformed_returns_none(shared_store_env):
"""Unreadable / non-JSON file → None, not an exception."""
from hermes_cli.auth import _nous_shared_store_path, _read_shared_nous_state
path = _nous_shared_store_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("{ not json")
assert _read_shared_nous_state() is None
def test_shared_store_read_missing_required_fields_returns_none(shared_store_env):
"""Payload without refresh_token → None (nothing worth importing)."""
from hermes_cli.auth import _nous_shared_store_path, _read_shared_nous_state
path = _nous_shared_store_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps({"_schema": 1, "access_token": "abc"}))
assert _read_shared_nous_state() is None
def test_shared_store_write_and_read_roundtrip(shared_store_env):
"""Write → read must preserve refresh_token + OAuth URLs."""
from hermes_cli.auth import (
_nous_shared_store_path,
_read_shared_nous_state,
_write_shared_nous_state,
)
_write_shared_nous_state(_full_state_fixture())
path = _nous_shared_store_path()
assert path.is_file()
# Permissions should be 0600 where the platform supports it.
mode = path.stat().st_mode & 0o777
assert mode == 0o600 or mode == 0o644 # 0o644 on platforms without chmod
loaded = _read_shared_nous_state()
assert loaded is not None
assert loaded["refresh_token"] == "refresh-tok"
assert loaded["access_token"] == "access-tok"
assert loaded["portal_base_url"] == "https://portal.example.com"
assert loaded["inference_base_url"] == "https://inference.example.com/v1"
# Volatile agent_key MUST NOT be persisted to the shared store
# (24h TTL, profile-specific — only long-lived OAuth tokens are
# cross-profile useful).
assert "agent_key" not in loaded
def test_shared_store_write_skips_when_refresh_token_missing(shared_store_env):
"""Write is a no-op when refresh_token is absent (nothing to share)."""
from hermes_cli.auth import _nous_shared_store_path, _write_shared_nous_state
state = dict(_full_state_fixture())
state["refresh_token"] = ""
_write_shared_nous_state(state)
assert not _nous_shared_store_path().is_file()
def test_persist_nous_credentials_mirrors_to_shared_store(
tmp_path, monkeypatch, shared_store_env,
):
"""persist_nous_credentials must populate BOTH per-profile auth.json
AND the shared store, so a future profile's `hermes auth add nous
--type oauth` can one-tap import instead of redoing device-code.
"""
from hermes_cli.auth import (
_nous_shared_store_path,
_read_shared_nous_state,
persist_nous_credentials,
)
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(
json.dumps({"version": 1, "providers": {}})
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
persist_nous_credentials(_full_state_fixture())
# Per-profile auth.json populated
payload = json.loads((hermes_home / "auth.json").read_text())
assert "nous" in payload.get("providers", {})
# Shared store populated with the same refresh_token
shared = _read_shared_nous_state()
assert shared is not None
assert shared["refresh_token"] == "refresh-tok"
# Shared file path lives under the tmp override, NOT the real home
assert str(_nous_shared_store_path()).startswith(str(shared_store_env))
def test_try_import_shared_returns_none_when_store_missing(shared_store_env):
"""No shared store → no rehydrate (fall through to device-code)."""
from hermes_cli.auth import _try_import_shared_nous_state
assert _try_import_shared_nous_state() is None
def test_try_import_shared_returns_none_on_refresh_failure(
shared_store_env, monkeypatch,
):
"""If the portal rejects the stored refresh_token (revoked, expired,
portal down), _try_import_shared_nous_state must return None so the
login flow falls back to a fresh device-code run.
"""
from hermes_cli import auth as auth_mod
# Seed the shared store
auth_mod._write_shared_nous_state(_full_state_fixture())
# Make refresh fail
def _boom(*_args, **_kwargs):
raise AuthError(
"Refresh session has been revoked",
provider="nous",
code="invalid_grant",
relogin_required=True,
)
monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _boom)
assert auth_mod._try_import_shared_nous_state() is None
def test_try_import_shared_rehydrates_on_success(shared_store_env, monkeypatch):
"""Happy path: stored refresh_token is accepted, forced refresh+mint
returns a fresh access_token + agent_key, and the returned dict has
every field persist_nous_credentials() needs.
"""
from hermes_cli import auth as auth_mod
auth_mod._write_shared_nous_state(_full_state_fixture())
def _fake_refresh(state, **kwargs):
# Simulate portal returning fresh tokens + a new agent_key
assert kwargs.get("force_refresh") is True
assert kwargs.get("force_mint") is True
return {
**state,
"access_token": "fresh-access-tok",
"refresh_token": "fresh-refresh-tok", # rotated
"agent_key": "new-agent-key",
"agent_key_expires_at": "2026-04-19T22:00:00+00:00",
}
monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _fake_refresh)
result = auth_mod._try_import_shared_nous_state()
assert result is not None
assert result["access_token"] == "fresh-access-tok"
assert result["refresh_token"] == "fresh-refresh-tok"
assert result["agent_key"] == "new-agent-key"
# Preserved from shared state
assert result["portal_base_url"] == "https://portal.example.com"
assert result["client_id"] == "hermes-cli"
def test_shared_store_survives_across_profile_switch(
tmp_path, monkeypatch, shared_store_env,
):
"""End-to-end: profile A logs in → shared store populated → profile B
(different HERMES_HOME) sees the same shared state and can rehydrate
without re-running device-code.
"""
from hermes_cli import auth as auth_mod
# Profile A: login, which mirrors to shared store
profile_a = tmp_path / "profile_a"
profile_a.mkdir(parents=True, exist_ok=True)
(profile_a / "auth.json").write_text(
json.dumps({"version": 1, "providers": {}})
)
monkeypatch.setenv("HERMES_HOME", str(profile_a))
auth_mod.persist_nous_credentials(_full_state_fixture())
# Profile A's auth.json has nous
a_payload = json.loads((profile_a / "auth.json").read_text())
assert "nous" in a_payload.get("providers", {})
# Profile B: fresh HERMES_HOME, no auth yet, but the shared store
# persists — _read_shared_nous_state() must still return the tokens.
profile_b = tmp_path / "profile_b"
profile_b.mkdir(parents=True, exist_ok=True)
(profile_b / "auth.json").write_text(
json.dumps({"version": 1, "providers": {}})
)
monkeypatch.setenv("HERMES_HOME", str(profile_b))
# B's own auth.json has no nous
b_payload = json.loads((profile_b / "auth.json").read_text())
assert "nous" not in b_payload.get("providers", {})
# But the shared store is visible
shared = auth_mod._read_shared_nous_state()
assert shared is not None
assert shared["refresh_token"] == "refresh-tok"
# And a successful rehydrate + persist lands nous into profile B
def _fake_refresh(state, **kwargs):
return {
**state,
"access_token": "b-access-tok",
"refresh_token": "b-refresh-tok",
"agent_key": "b-agent-key",
"agent_key_expires_at": "2026-04-19T22:00:00+00:00",
}
monkeypatch.setattr(auth_mod, "refresh_nous_oauth_from_state", _fake_refresh)
result = auth_mod._try_import_shared_nous_state()
assert result is not None
auth_mod.persist_nous_credentials(result)
b_payload = json.loads((profile_b / "auth.json").read_text())
assert "nous" in b_payload.get("providers", {})
assert b_payload["providers"]["nous"]["refresh_token"] == "b-refresh-tok"
# Shared store was updated with the rotated refresh_token too
shared_after = auth_mod._read_shared_nous_state()
assert shared_after is not None
assert shared_after["refresh_token"] == "b-refresh-tok"
def test_runtime_refresh_uses_newer_shared_token_before_local_stale_token(
tmp_path, monkeypatch, shared_store_env,
):
"""A sibling profile may rotate the single-use Nous refresh token.
When this profile later wakes with an expired local token, runtime
resolution must adopt the shared token before refreshing. Otherwise it
can submit the stale local refresh token and trigger portal reuse
revocation for the whole shared session.
"""
from hermes_cli import auth as auth_mod
profile_b = tmp_path / "profile_b"
_setup_nous_auth(
profile_b,
access_token="local-expired-access",
refresh_token="local-stale-refresh",
)
monkeypatch.setenv("HERMES_HOME", str(profile_b))
shared_state = _full_state_fixture()
shared_state["access_token"] = "shared-fresh-access"
shared_state["refresh_token"] = "shared-fresh-refresh"
shared_state["expires_at"] = "2099-01-01T00:00:00+00:00"
auth_mod._write_shared_nous_state(shared_state)
def _refresh_should_not_happen(**_kwargs):
raise AssertionError("stale profile-local refresh token was used")
minted_with: list[str] = []
def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
minted_with.append(access_token)
return _mint_payload(api_key="agent-key-from-shared-token")
monkeypatch.setattr(auth_mod, "_refresh_access_token", _refresh_should_not_happen)
monkeypatch.setattr(auth_mod, "_mint_agent_key", _fake_mint_agent_key)
creds = auth_mod.resolve_nous_runtime_credentials(
min_key_ttl_seconds=300,
force_mint=True,
)
assert creds["api_key"] == "agent-key-from-shared-token"
assert minted_with == ["shared-fresh-access"]
profile_state = auth_mod.get_provider_auth_state("nous")
assert profile_state is not None
assert profile_state["refresh_token"] == "shared-fresh-refresh"
assert profile_state["access_token"] == "shared-fresh-access"
def test_managed_gateway_access_token_uses_newer_shared_token(
tmp_path, monkeypatch, shared_store_env,
):
"""Managed-tool token reads share the same stale-refresh-token hazard."""
from hermes_cli import auth as auth_mod
profile_b = tmp_path / "profile_b"
_setup_nous_auth(
profile_b,
access_token="local-expired-access",
refresh_token="local-stale-refresh",
)
monkeypatch.setenv("HERMES_HOME", str(profile_b))
shared_state = _full_state_fixture()
shared_state["access_token"] = "shared-fresh-access"
shared_state["refresh_token"] = "shared-fresh-refresh"
shared_state["expires_at"] = "2099-01-01T00:00:00+00:00"
auth_mod._write_shared_nous_state(shared_state)
def _refresh_should_not_happen(**_kwargs):
raise AssertionError("stale profile-local refresh token was used")
monkeypatch.setattr(auth_mod, "_refresh_access_token", _refresh_should_not_happen)
assert auth_mod.resolve_nous_access_token() == "shared-fresh-access"
profile_state = auth_mod.get_provider_auth_state("nous")
assert profile_state is not None
assert profile_state["refresh_token"] == "shared-fresh-refresh"