mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
The dashboard device-code flow (_nous_poller in web_server.py) saved credentials to the credential pool only, while get_nous_auth_status() only checked the auth store (auth.json). This caused the Keys tab to show 'not connected' even when the backend was fully authenticated. Two fixes: 1. get_nous_auth_status() now checks the credential pool first (like get_codex_auth_status() already does), then falls back to the auth store. 2. _nous_poller now also persists to the auth store after saving to the credential pool, matching the CLI flow (_login_nous). Adds 3 tests covering pool-only, auth-store-fallback, and empty-state scenarios.
301 lines
11 KiB
Python
301 lines
11 KiB
Python
"""Regression tests for Nous OAuth refresh + agent-key mint interactions."""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
from hermes_cli.auth import AuthError, get_provider_auth_state, resolve_nous_runtime_credentials
|
|
|
|
|
|
# =============================================================================
|
|
# _resolve_verify: CA bundle path validation
|
|
# =============================================================================
|
|
|
|
|
|
class TestResolveVerifyFallback:
    """Verify _resolve_verify falls back to True when CA bundle path doesn't exist."""

    def test_missing_ca_bundle_in_auth_state_falls_back(self):
        """A nonexistent ``tls.ca_bundle`` path in auth_state is expected to yield ``True``."""
        from hermes_cli.auth import _resolve_verify

        result = _resolve_verify(auth_state={
            "tls": {"insecure": False, "ca_bundle": "/nonexistent/ca-bundle.pem"},
        })
        assert result is True

    def test_valid_ca_bundle_in_auth_state_is_returned(self, tmp_path):
        """An existing ``tls.ca_bundle`` file is expected to be returned as its path string."""
        from hermes_cli.auth import _resolve_verify

        ca_file = tmp_path / "ca-bundle.pem"
        ca_file.write_text("fake cert")
        result = _resolve_verify(auth_state={
            "tls": {"insecure": False, "ca_bundle": str(ca_file)},
        })
        assert result == str(ca_file)

    def test_missing_ssl_cert_file_env_falls_back(self, monkeypatch):
        """A nonexistent ``SSL_CERT_FILE`` path is expected to yield ``True``."""
        from hermes_cli.auth import _resolve_verify

        monkeypatch.setenv("SSL_CERT_FILE", "/nonexistent/ssl-cert.pem")
        # Clear the other env var so only SSL_CERT_FILE is in play.
        monkeypatch.delenv("HERMES_CA_BUNDLE", raising=False)
        result = _resolve_verify(auth_state={"tls": {}})
        assert result is True

    def test_missing_hermes_ca_bundle_env_falls_back(self, monkeypatch):
        """A nonexistent ``HERMES_CA_BUNDLE`` path is expected to yield ``True``."""
        from hermes_cli.auth import _resolve_verify

        monkeypatch.setenv("HERMES_CA_BUNDLE", "/nonexistent/hermes-ca.pem")
        # Clear the other env var so only HERMES_CA_BUNDLE is in play.
        monkeypatch.delenv("SSL_CERT_FILE", raising=False)
        result = _resolve_verify(auth_state={"tls": {}})
        assert result is True

    def test_insecure_takes_precedence_over_missing_ca(self):
        """``insecure=True`` is expected to yield ``False`` even with a missing CA bundle."""
        from hermes_cli.auth import _resolve_verify

        result = _resolve_verify(
            insecure=True,
            auth_state={"tls": {"ca_bundle": "/nonexistent/ca.pem"}},
        )
        assert result is False

    def test_no_ca_bundle_returns_true(self, monkeypatch):
        """With no CA bundle configured anywhere, the result is expected to be ``True``."""
        from hermes_cli.auth import _resolve_verify

        monkeypatch.delenv("HERMES_CA_BUNDLE", raising=False)
        monkeypatch.delenv("SSL_CERT_FILE", raising=False)
        result = _resolve_verify(auth_state={"tls": {}})
        assert result is True

    def test_explicit_ca_bundle_param_missing_falls_back(self):
        """A nonexistent explicit ``ca_bundle`` argument is expected to yield ``True``."""
        from hermes_cli.auth import _resolve_verify

        result = _resolve_verify(ca_bundle="/nonexistent/explicit-ca.pem")
        assert result is True

    def test_explicit_ca_bundle_param_valid_is_returned(self, tmp_path):
        """An existing explicit ``ca_bundle`` file is expected to be returned as its path string."""
        from hermes_cli.auth import _resolve_verify

        ca_file = tmp_path / "explicit-ca.pem"
        ca_file.write_text("fake cert")
        result = _resolve_verify(ca_bundle=str(ca_file))
        assert result == str(ca_file)
|
|
|
|
|
|
def _setup_nous_auth(
    hermes_home: Path,
    *,
    access_token: str = "access-old",
    refresh_token: str = "refresh-old",
) -> None:
    """Write an ``auth.json`` containing a Nous provider entry under *hermes_home*.

    The directory is created if missing. The stored entry has ``expires_in``
    set to 0 and every ``agent_key*`` field set to ``None``.

    Args:
        hermes_home: Directory that will contain ``auth.json``.
        access_token: Value stored as the provider's ``access_token``.
        refresh_token: Value stored as the provider's ``refresh_token``.
    """
    hermes_home.mkdir(parents=True, exist_ok=True)
    auth_store = {
        "version": 1,
        "active_provider": "nous",
        "providers": {
            "nous": {
                "portal_base_url": "https://portal.example.com",
                "inference_base_url": "https://inference.example.com/v1",
                "client_id": "hermes-cli",
                "token_type": "Bearer",
                "scope": "inference:mint_agent_key",
                "access_token": access_token,
                "refresh_token": refresh_token,
                "obtained_at": "2026-02-01T00:00:00+00:00",
                "expires_in": 0,
                "expires_at": "2026-02-01T00:00:00+00:00",
                "agent_key": None,
                "agent_key_id": None,
                "agent_key_expires_at": None,
                "agent_key_expires_in": None,
                "agent_key_reused": None,
                "agent_key_obtained_at": None,
            }
        },
    }
    (hermes_home / "auth.json").write_text(json.dumps(auth_store, indent=2))
|
|
|
|
|
|
def _mint_payload(api_key: str = "agent-key") -> dict:
    """Build a fake agent-key mint response for the stubbed mint call.

    Args:
        api_key: Value placed in the payload's ``api_key`` field.

    Returns:
        A dict with ``api_key``, ``key_id``, ``expires_at`` (the current UTC
        time as an ISO-8601 string), ``expires_in`` and ``reused``.
    """
    return {
        "api_key": api_key,
        "key_id": "key-id-1",
        "expires_at": datetime.now(timezone.utc).isoformat(),
        "expires_in": 1800,
        "reused": False,
    }
|
|
|
|
|
|
def test_get_nous_auth_status_checks_credential_pool(tmp_path, monkeypatch):
    """get_nous_auth_status() should find Nous credentials in the pool
    even when the auth store has no Nous provider entry — this is the
    case when login happened via the dashboard device-code flow which
    saves to the pool only.
    """
    from hermes_cli.auth import get_nous_auth_status

    hermes_home = tmp_path / "hermes"
    hermes_home.mkdir(parents=True, exist_ok=True)
    # Empty auth store — no Nous provider entry
    (hermes_home / "auth.json").write_text(json.dumps({
        "version": 1, "providers": {},
    }))
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    # Seed the credential pool with a Nous entry
    from agent.credential_pool import PooledCredential, load_pool
    pool = load_pool("nous")
    entry = PooledCredential.from_dict("nous", {
        "access_token": "test-access-token",
        "refresh_token": "test-refresh-token",
        "portal_base_url": "https://portal.example.com",
        "inference_base_url": "https://inference.example.com/v1",
        "agent_key": "test-agent-key",
        "agent_key_expires_at": "2099-01-01T00:00:00+00:00",
        "label": "dashboard device_code",
        "auth_type": "oauth",
        "source": "manual:dashboard_device_code",
        "base_url": "https://inference.example.com/v1",
    })
    pool.add_entry(entry)

    status = get_nous_auth_status()
    assert status["logged_in"] is True
    assert "example.com" in str(status.get("portal_base_url", ""))
|
|
|
|
|
|
def test_get_nous_auth_status_auth_store_fallback(tmp_path, monkeypatch):
    """get_nous_auth_status() falls back to auth store when credential
    pool is empty.
    """
    from hermes_cli.auth import get_nous_auth_status

    hermes_home = tmp_path / "hermes"
    _setup_nous_auth(hermes_home, access_token="at-123")
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    status = get_nous_auth_status()
    assert status["logged_in"] is True
    assert status["portal_base_url"] == "https://portal.example.com"
|
|
|
|
|
|
def test_get_nous_auth_status_empty_returns_not_logged_in(tmp_path, monkeypatch):
    """get_nous_auth_status() returns logged_in=False when both pool
    and auth store are empty.
    """
    from hermes_cli.auth import get_nous_auth_status

    hermes_home = tmp_path / "hermes"
    hermes_home.mkdir(parents=True, exist_ok=True)
    (hermes_home / "auth.json").write_text(json.dumps({
        "version": 1, "providers": {},
    }))
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    status = get_nous_auth_status()
    assert status["logged_in"] is False
|
|
|
|
|
|
def test_refresh_token_persisted_when_mint_returns_insufficient_credits(tmp_path, monkeypatch):
    """Rotated tokens must be saved even when the mint fails with insufficient_credits.

    The first mint raises ``AuthError(code="insufficient_credits")``. The test
    expects the refreshed tokens to be in the provider auth state afterwards,
    and the second resolve to refresh using the rotated ``refresh-1`` token.
    """
    hermes_home = tmp_path / "hermes"
    _setup_nous_auth(hermes_home, refresh_token="refresh-old")
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    refresh_calls = []
    mint_calls = {"count": 0}

    def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
        # Record which refresh token was presented, then rotate to a new pair.
        refresh_calls.append(refresh_token)
        idx = len(refresh_calls)
        return {
            "access_token": f"access-{idx}",
            "refresh_token": f"refresh-{idx}",
            "expires_in": 0,
            "token_type": "Bearer",
        }

    def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
        # Fail only on the first mint attempt.
        mint_calls["count"] += 1
        if mint_calls["count"] == 1:
            raise AuthError("credits exhausted", provider="nous", code="insufficient_credits")
        return _mint_payload(api_key="agent-key-2")

    monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
    monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)

    with pytest.raises(AuthError) as exc:
        resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
    # Checked after the context manager exits; inside it this line would never run.
    assert exc.value.code == "insufficient_credits"

    state_after_failure = get_provider_auth_state("nous")
    assert state_after_failure is not None
    assert state_after_failure["refresh_token"] == "refresh-1"
    assert state_after_failure["access_token"] == "access-1"

    creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
    assert creds["api_key"] == "agent-key-2"
    assert refresh_calls == ["refresh-old", "refresh-1"]
|
|
|
|
|
|
def test_refresh_token_persisted_when_mint_times_out(tmp_path, monkeypatch):
    """Rotated tokens must be saved even when the mint raises ``httpx.ReadTimeout``.

    The timeout is expected to propagate to the caller, and the refreshed
    tokens are expected to be in the provider auth state afterwards.
    """
    hermes_home = tmp_path / "hermes"
    _setup_nous_auth(hermes_home, refresh_token="refresh-old")
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
        return {
            "access_token": "access-1",
            "refresh_token": "refresh-1",
            "expires_in": 0,
            "token_type": "Bearer",
        }

    def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
        raise httpx.ReadTimeout("mint timeout")

    monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
    monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)

    with pytest.raises(httpx.ReadTimeout):
        resolve_nous_runtime_credentials(min_key_ttl_seconds=300)

    state_after_failure = get_provider_auth_state("nous")
    assert state_after_failure is not None
    assert state_after_failure["refresh_token"] == "refresh-1"
    assert state_after_failure["access_token"] == "access-1"
|
|
|
|
|
|
def test_mint_retry_uses_latest_rotated_refresh_token(tmp_path, monkeypatch):
    """A mint retry after ``invalid_token`` must refresh with the rotated token.

    The first mint raises ``AuthError(code="invalid_token")``. The test expects
    a single resolve call to succeed, having refreshed first with
    ``refresh-old`` and then with the rotated ``refresh-1``.
    """
    hermes_home = tmp_path / "hermes"
    _setup_nous_auth(hermes_home, refresh_token="refresh-old")
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    refresh_calls = []
    mint_calls = {"count": 0}

    def _fake_refresh_access_token(*, client, portal_base_url, client_id, refresh_token):
        # Record which refresh token was presented, then rotate to a new pair.
        refresh_calls.append(refresh_token)
        idx = len(refresh_calls)
        return {
            "access_token": f"access-{idx}",
            "refresh_token": f"refresh-{idx}",
            "expires_in": 0,
            "token_type": "Bearer",
        }

    def _fake_mint_agent_key(*, client, portal_base_url, access_token, min_ttl_seconds):
        # Fail only on the first mint attempt.
        mint_calls["count"] += 1
        if mint_calls["count"] == 1:
            raise AuthError("stale access token", provider="nous", code="invalid_token")
        return _mint_payload(api_key="agent-key")

    monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
    monkeypatch.setattr("hermes_cli.auth._mint_agent_key", _fake_mint_agent_key)

    creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
    assert creds["api_key"] == "agent-key"
    assert refresh_calls == ["refresh-old", "refresh-1"]
|
|
|