"""Tests for the `hermes proxy` subcommand and its upstream adapters.""" from __future__ import annotations import asyncio import json import os import threading from pathlib import Path from typing import Any, Dict from unittest.mock import MagicMock, patch import pytest from hermes_cli.proxy.adapters import ADAPTERS, get_adapter from hermes_cli.proxy.adapters.base import UpstreamAdapter, UpstreamCredential from hermes_cli.proxy.adapters.nous_portal import NousPortalAdapter # --------------------------------------------------------------------------- # Adapter registry # --------------------------------------------------------------------------- def test_registry_lists_nous(): assert "nous" in ADAPTERS def test_get_adapter_returns_instance(): adapter = get_adapter("nous") assert isinstance(adapter, NousPortalAdapter) assert isinstance(adapter, UpstreamAdapter) def test_get_adapter_case_insensitive(): assert isinstance(get_adapter("NOUS"), NousPortalAdapter) assert isinstance(get_adapter(" Nous "), NousPortalAdapter) def test_get_adapter_unknown_provider_raises(): with pytest.raises(ValueError, match="anthropic"): get_adapter("anthropic") # not yet implemented # --------------------------------------------------------------------------- # NousPortalAdapter # --------------------------------------------------------------------------- def _write_auth_store(hermes_home: Path, nous_state: Dict[str, Any]) -> Path: """Write an auth.json with the given nous state into a hermetic HERMES_HOME.""" auth_path = hermes_home / "auth.json" auth_path.write_text(json.dumps({ "version": 1, "providers": {"nous": nous_state}, })) return auth_path def test_nous_adapter_metadata(): adapter = NousPortalAdapter() assert adapter.name == "nous" assert adapter.display_name == "Nous Portal" assert "/chat/completions" in adapter.allowed_paths assert "/embeddings" in adapter.allowed_paths assert "/completions" in adapter.allowed_paths assert "/models" in adapter.allowed_paths def 
test_nous_adapter_not_authenticated_when_no_auth_file(tmp_path, monkeypatch): # HERMES_HOME is already set by conftest, but make doubly sure monkeypatch.setenv("HERMES_HOME", str(tmp_path)) adapter = NousPortalAdapter() assert not adapter.is_authenticated() def test_nous_adapter_not_authenticated_when_provider_missing(tmp_path, monkeypatch): monkeypatch.setenv("HERMES_HOME", str(tmp_path)) (tmp_path / "auth.json").write_text(json.dumps({ "version": 1, "providers": {}, })) assert not NousPortalAdapter().is_authenticated() def test_nous_adapter_authenticated_with_agent_key(tmp_path, monkeypatch): monkeypatch.setenv("HERMES_HOME", str(tmp_path)) _write_auth_store(tmp_path, { "agent_key": "ov-test-key", "agent_key_expires_at": "2099-01-01T00:00:00Z", "inference_base_url": "https://inference-api.nousresearch.com/v1", }) assert NousPortalAdapter().is_authenticated() def test_nous_adapter_authenticated_with_refresh_token_only(tmp_path, monkeypatch): """If access_token+refresh_token exist but no agent_key yet, we can still mint.""" monkeypatch.setenv("HERMES_HOME", str(tmp_path)) _write_auth_store(tmp_path, { "access_token": "access-tok", "refresh_token": "refresh-tok", }) assert NousPortalAdapter().is_authenticated() def test_nous_adapter_get_credential_refreshes_and_persists(tmp_path, monkeypatch): monkeypatch.setenv("HERMES_HOME", str(tmp_path)) _write_auth_store(tmp_path, { "access_token": "access-tok", "refresh_token": "refresh-tok", "client_id": "hermes-cli", "portal_base_url": "https://portal.nousresearch.com", "inference_base_url": "https://inference-api.nousresearch.com/v1", }) refreshed_state = { "access_token": "access-tok", "refresh_token": "refresh-tok", "client_id": "hermes-cli", "portal_base_url": "https://portal.nousresearch.com", "inference_base_url": "https://inference-api.nousresearch.com/v1", "agent_key": "minted-bearer", "agent_key_expires_at": "2099-01-01T00:00:00Z", } with patch( "hermes_cli.proxy.adapters.nous_portal.refresh_nous_oauth_from_state", 
return_value=refreshed_state, ) as mock_refresh: adapter = NousPortalAdapter() cred = adapter.get_credential() mock_refresh.assert_called_once() assert cred.bearer == "minted-bearer" assert cred.base_url == "https://inference-api.nousresearch.com/v1" assert cred.expires_at == "2099-01-01T00:00:00Z" assert cred.token_type == "Bearer" # Verify state was persisted back stored = json.loads((tmp_path / "auth.json").read_text()) assert stored["providers"]["nous"]["agent_key"] == "minted-bearer" def test_nous_adapter_get_credential_raises_when_not_logged_in(tmp_path, monkeypatch): monkeypatch.setenv("HERMES_HOME", str(tmp_path)) adapter = NousPortalAdapter() with pytest.raises(RuntimeError, match="hermes login nous"): adapter.get_credential() def test_nous_adapter_get_credential_raises_on_refresh_failure(tmp_path, monkeypatch): monkeypatch.setenv("HERMES_HOME", str(tmp_path)) _write_auth_store(tmp_path, { "access_token": "access-tok", "refresh_token": "refresh-tok", }) with patch( "hermes_cli.proxy.adapters.nous_portal.refresh_nous_oauth_from_state", side_effect=RuntimeError("Refresh session has been revoked"), ): adapter = NousPortalAdapter() with pytest.raises(RuntimeError, match="Refresh session has been revoked"): adapter.get_credential() def test_nous_adapter_get_credential_raises_when_no_agent_key_returned(tmp_path, monkeypatch): """If the refresh helper succeeds but produces no agent_key, we surface a clear error.""" monkeypatch.setenv("HERMES_HOME", str(tmp_path)) _write_auth_store(tmp_path, { "access_token": "access-tok", "refresh_token": "refresh-tok", }) with patch( "hermes_cli.proxy.adapters.nous_portal.refresh_nous_oauth_from_state", return_value={"access_token": "a", "refresh_token": "r"}, ): adapter = NousPortalAdapter() with pytest.raises(RuntimeError, match="did not return a usable agent_key"): adapter.get_credential() def test_nous_adapter_concurrent_refresh_serialized(tmp_path, monkeypatch): """Two parallel get_credential() calls must serialize through 
the lock.""" monkeypatch.setenv("HERMES_HOME", str(tmp_path)) _write_auth_store(tmp_path, { "access_token": "a", "refresh_token": "r", }) call_log: list = [] in_flight = threading.Event() overlap_detected = threading.Event() counter = [0] counter_lock = threading.Lock() def serializing_refresh(state, **kwargs): # If another thread is already inside refresh, the lock is broken. if in_flight.is_set(): overlap_detected.set() in_flight.set() try: call_log.append(threading.current_thread().ident) # Simulate refresh latency so any race window is exposed. import time time.sleep(0.05) with counter_lock: counter[0] += 1 idx = counter[0] return { **state, "agent_key": f"key-{idx}", "agent_key_expires_at": "2099-01-01T00:00:00Z", "inference_base_url": "https://inference-api.nousresearch.com/v1", } finally: in_flight.clear() adapter = NousPortalAdapter() results: list = [] errors: list = [] def worker(): try: results.append(adapter.get_credential().bearer) except Exception as exc: # pragma: no cover - shouldn't happen errors.append(exc) with patch( "hermes_cli.proxy.adapters.nous_portal.refresh_nous_oauth_from_state", side_effect=serializing_refresh, ): threads = [threading.Thread(target=worker) for _ in range(3)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"workers errored: {errors}" assert len(results) == 3 assert len(call_log) == 3 assert not overlap_detected.is_set(), "refresh calls overlapped — lock is broken" assert all(r.startswith("key-") for r in results) # --------------------------------------------------------------------------- # Server: path filtering + forwarding # # We run the proxy AND a fake upstream as real aiohttp servers on ephemeral # ports. Avoids pytest-aiohttp's fixtures (extra dependency for one test file). 
# ---------------------------------------------------------------------------

aiohttp = pytest.importorskip("aiohttp")
from contextlib import asynccontextmanager  # noqa: E402

from aiohttp import web  # noqa: E402

from hermes_cli.proxy.server import create_app  # noqa: E402


class FakeAdapter(UpstreamAdapter):
    """A test adapter that returns a fixed credential without touching disk.

    Args:
        base_url: Upstream base URL placed in every returned credential.
        bearer: Token placed in every returned credential.
        allowed: Iterable of allowed paths. ``None`` selects the default
            ``["/chat/completions"]``; an explicit empty iterable is kept
            as "nothing allowed".
        raise_on_credential: When true, ``get_credential`` raises
            ``RuntimeError("simulated auth failure")``.

    Attributes:
        calls: Number of times ``get_credential`` has been invoked.
    """

    def __init__(self, base_url: str, bearer: str = "test-bearer",
                 allowed=None, raise_on_credential=False):
        self._base_url = base_url
        self._bearer = bearer
        # `is None` rather than `or`: an empty allowlist must stay empty.
        self._allowed = frozenset(
            ["/chat/completions"] if allowed is None else allowed
        )
        self._raise = raise_on_credential
        self.calls = 0

    @property
    def name(self):
        return "fake"

    @property
    def display_name(self):
        return "Fake Provider"

    @property
    def allowed_paths(self):
        return self._allowed

    def is_authenticated(self):
        return True

    def get_credential(self):
        """Count the call, then return the fixed credential or raise."""
        self.calls += 1
        if self._raise:
            raise RuntimeError("simulated auth failure")
        return UpstreamCredential(
            bearer=self._bearer,
            base_url=self._base_url,
            expires_at="2099-01-01T00:00:00Z",
        )


async def _start_runner(app: "web.Application"):
    """Spin up an aiohttp app on an ephemeral localhost port.

    Returns (runner, base_url). The caller owns the runner and must await
    ``runner.cleanup()``. If binding the site fails, the runner is cleaned
    up here before the error propagates.
    """
    runner = web.AppRunner(app, access_log=None)
    await runner.setup()
    try:
        site = web.TCPSite(runner, host="127.0.0.1", port=0)
        await site.start()
        # Public API for the bound socket addresses; each item is a
        # getsockname() result, i.e. (host, port) for an IPv4 TCP site.
        port = runner.addresses[0][1]
    except BaseException:
        # Includes cancellation: never leave a half-started runner behind.
        await runner.cleanup()
        raise
    return runner, f"http://127.0.0.1:{port}"


@asynccontextmanager
async def _serve(app: "web.Application"):
    """Serve *app* on an ephemeral port for the duration of the block.

    Yields the server's base URL and always cleans the runner up on exit.
    """
    runner, base_url = await _start_runner(app)
    try:
        yield base_url
    finally:
        await runner.cleanup()


def _build_fake_upstream(captured: Dict[str, Any]) -> "web.Application":
    """Build the fake upstream application.

    ``/v1/chat/completions`` and ``/v1/embeddings`` append a record of each
    request (method, path, Authorization header, decoded body) to
    ``captured["requests"]`` and answer with a small JSON echo. ``/v1/sse``
    streams three server-sent-event chunks.
    """

    async def echo(request):
        body = await request.read()
        captured["requests"].append({
            "method": request.method,
            "path": request.path,
            "auth": request.headers.get("Authorization"),
            "body": body.decode("utf-8") if body else "",
        })
        return web.json_response({"echoed": True, "path": request.path})

    async def sse(request):
        resp = web.StreamResponse(
            status=200,
            headers={"Content-Type": "text/event-stream"},
        )
        await resp.prepare(request)
        for chunk in [b"data: hello\n\n", b"data: world\n\n", b"data: [DONE]\n\n"]:
            await resp.write(chunk)
        await resp.write_eof()
        return resp

    app = web.Application()
    app.router.add_route("*", "/v1/chat/completions", echo)
    app.router.add_route("*", "/v1/embeddings", echo)
    app.router.add_route("*", "/v1/sse", sse)
    return app


@asynccontextmanager
async def _serve_proxy_with_upstream(captured: Dict[str, Any], **adapter_kwargs):
    """Run the fake upstream plus a proxy in front of it; yield the proxy URL.

    ``adapter_kwargs`` are forwarded to ``FakeAdapter``. The proxy is torn
    down before the upstream, and the upstream is cleaned up even when the
    proxy fails to start or to shut down.
    """
    async with _serve(_build_fake_upstream(captured)) as upstream_base:
        adapter = FakeAdapter(f"{upstream_base}/v1", **adapter_kwargs)
        async with _serve(create_app(adapter)) as proxy_base:
            yield proxy_base


def test_server_forwards_chat_completions():
    """The proxy forwards the body and swaps in the adapter's bearer token."""

    async def run():
        captured: Dict[str, Any] = {"requests": []}
        async with _serve_proxy_with_upstream(
            captured, bearer="real-portal-key"
        ) as proxy_base, aiohttp.ClientSession() as session:
            async with session.post(
                f"{proxy_base}/v1/chat/completions",
                json={
                    "model": "Hermes-4-70B",
                    "messages": [{"role": "user", "content": "hi"}],
                },
                headers={"Authorization": "Bearer client-dummy-key"},
            ) as resp:
                assert resp.status == 200
                data = await resp.json()
                assert data["echoed"] is True

            assert len(captured["requests"]) == 1
            req = captured["requests"][0]
            assert req["auth"] == "Bearer real-portal-key"
            assert "Hermes-4-70B" in req["body"]

    asyncio.run(run())


def test_server_rejects_disallowed_path():
    """A path outside the allowlist gets a 404 `path_not_allowed` error."""

    async def run():
        adapter = FakeAdapter("http://unused.example/v1", allowed=["/chat/completions"])
        async with _serve(create_app(adapter)) as base, \
                aiohttp.ClientSession() as session:
            async with session.get(f"{base}/v1/random/endpoint") as resp:
                assert resp.status == 404
                body = await resp.json()
                assert body["error"]["type"] == "path_not_allowed"
                assert "/chat/completions" in body["error"]["message"]

    asyncio.run(run())


def test_server_returns_401_when_adapter_fails():
    """A credential failure in the adapter becomes a 401 `upstream_auth_failed`."""

    async def run():
        adapter = FakeAdapter("http://unused.example/v1", raise_on_credential=True)
        async with _serve(create_app(adapter)) as base, \
                aiohttp.ClientSession() as session:
            async with session.post(f"{base}/v1/chat/completions", json={}) as resp:
                assert resp.status == 401
                body = await resp.json()
                assert body["error"]["type"] == "upstream_auth_failed"
                assert "simulated auth failure" in body["error"]["message"]

    asyncio.run(run())


def test_server_health_endpoint():
    """/health reports status, upstream display name and authentication state."""

    async def run():
        adapter = FakeAdapter("http://unused.example/v1")
        async with _serve(create_app(adapter)) as base, \
                aiohttp.ClientSession() as session:
            async with session.get(f"{base}/health") as resp:
                assert resp.status == 200
                body = await resp.json()
                assert body["status"] == "ok"
                assert body["upstream"] == "Fake Provider"
                assert body["authenticated"] is True

    asyncio.run(run())


def test_server_streams_sse():
    """Server-sent-event chunks from the upstream reach the client."""

    async def run():
        captured: Dict[str, Any] = {"requests": []}
        async with _serve_proxy_with_upstream(
            captured, allowed=["/sse"]
        ) as proxy_base, aiohttp.ClientSession() as session:
            async with session.get(f"{proxy_base}/v1/sse") as resp:
                assert resp.status == 200
                chunks = [chunk async for chunk in resp.content.iter_any()]

            full = b"".join(chunks)
            assert b"data: hello" in full
            assert b"data: [DONE]" in full

    asyncio.run(run())


def test_server_strips_client_auth_header():
    """The client's Authorization header MUST NOT reach the upstream."""

    async def run():
        captured: Dict[str, Any] = {"requests": []}
        async with _serve_proxy_with_upstream(
            captured, bearer="ours"
        ) as proxy_base, aiohttp.ClientSession() as session:
            async with session.post(
                f"{proxy_base}/v1/chat/completions",
                json={},
                headers={"Authorization": "Bearer SHOULD_NOT_LEAK"},
            ) as resp:
                await resp.read()

            assert captured["requests"][0]["auth"] == "Bearer ours"
            assert "SHOULD_NOT_LEAK" not in captured["requests"][0]["auth"]

    asyncio.run(run())


# ---------------------------------------------------------------------------
# CLI handlers
# ---------------------------------------------------------------------------


def test_cmd_proxy_status_runs(capsys, tmp_path, monkeypatch):
    """`proxy status` exits 0 and reports nous as not logged in on an empty home."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    from hermes_cli.proxy.cli import cmd_proxy_status

    args = MagicMock()
    rc = cmd_proxy_status(args)
    assert rc == 0

    out = capsys.readouterr().out
    assert "nous" in out
    assert "Nous Portal" in out
    assert "not logged in" in out


def test_cmd_proxy_providers_runs(capsys):
    """The provider listing exits 0 and mentions the nous provider."""
    from hermes_cli.proxy.cli import cmd_proxy_list_providers

    args = MagicMock()
    rc = cmd_proxy_list_providers(args)
    assert rc == 0

    out = capsys.readouterr().out
    assert "nous" in out
    assert "Nous Portal" in out


def test_cmd_proxy_start_refuses_unknown_provider(capsys):
    """`proxy start` exits 2 and names the unknown provider on stderr."""
    from hermes_cli.proxy.cli import cmd_proxy_start

    args = MagicMock()
    args.provider = "no-such-provider"
    args.host = None
    args.port = None
    rc = cmd_proxy_start(args)
    assert rc == 2

    err = capsys.readouterr().err
    assert "no-such-provider" in err


def test_cmd_proxy_start_refuses_when_unauthenticated(capsys, tmp_path, monkeypatch):
    """`proxy start` exits 2 and suggests `hermes login nous` when logged out."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    from hermes_cli.proxy.cli import cmd_proxy_start

    args = MagicMock()
    args.provider = "nous"
    args.host = None
    args.port = None
    rc = cmd_proxy_start(args)
    assert rc == 2

    err = capsys.readouterr().err
    assert "hermes login nous" in err