hermes-agent/tests/hermes_cli/test_proxy.py
Teknium ccb5aae0d2
feat(proxy): local OpenAI-compatible proxy for OAuth providers (#25969)
Adds 'hermes proxy start' — a local HTTP server that lets external apps
(OpenViking, Karakeep, Open WebUI, ...) use a Hermes-managed provider
subscription as their LLM endpoint. The proxy attaches the user's real
OAuth-resolved credentials to each forwarded request, refreshing them
automatically; the client can send any bearer (it gets stripped).

Ships with one adapter — Nous Portal. The UpstreamAdapter ABC and
registry in hermes_cli/proxy/adapters/ are designed for additional
OAuth providers to plug in by name without server changes.

Commands:
  hermes proxy start [--provider nous] [--host 127.0.0.1] [--port 8645]
  hermes proxy status
  hermes proxy providers

Allowed Portal paths: /v1/chat/completions, /v1/completions,
/v1/embeddings, /v1/models. Anything else returns 404 with a clear
error pointing at the allowed list.

aiohttp is gated like gateway/platforms/api_server.py (try-import,
clean runtime error if missing). No new core dependency.

Tests: 24 unit tests + 1 separate E2E that spawns the real subprocess
and verifies the upstream receives the right bearer with the client's
header stripped.
2026-05-14 15:40:48 -07:00

512 lines
18 KiB
Python

"""Tests for the `hermes proxy` subcommand and its upstream adapters."""
from __future__ import annotations
import asyncio
import json
import os
import threading
from pathlib import Path
from typing import Any, Dict
from unittest.mock import MagicMock, patch
import pytest
from hermes_cli.proxy.adapters import ADAPTERS, get_adapter
from hermes_cli.proxy.adapters.base import UpstreamAdapter, UpstreamCredential
from hermes_cli.proxy.adapters.nous_portal import NousPortalAdapter
# ---------------------------------------------------------------------------
# Adapter registry
# ---------------------------------------------------------------------------
def test_registry_lists_nous():
    """The adapter registry has an entry under the key ``"nous"``."""
    registered_names = ADAPTERS
    assert "nous" in registered_names
def test_get_adapter_returns_instance():
    """``get_adapter("nous")`` yields a NousPortalAdapter that is also an UpstreamAdapter."""
    resolved = get_adapter("nous")
    for expected_type in (NousPortalAdapter, UpstreamAdapter):
        assert isinstance(resolved, expected_type)
def test_get_adapter_case_insensitive():
    """Upper-cased and space-padded provider names still resolve to the Nous adapter."""
    for spelling in ("NOUS", " Nous "):
        assert isinstance(get_adapter(spelling), NousPortalAdapter)
def test_get_adapter_unknown_provider_raises():
    """An unregistered provider name raises ValueError whose message contains that name."""
    unknown_provider = "anthropic"  # not yet implemented
    with pytest.raises(ValueError, match=unknown_provider):
        get_adapter(unknown_provider)
# ---------------------------------------------------------------------------
# NousPortalAdapter
# ---------------------------------------------------------------------------
def _write_auth_store(hermes_home: Path, nous_state: Dict[str, Any]) -> Path:
    """Write an auth.json with the given nous state into a hermetic HERMES_HOME.

    Args:
        hermes_home: Existing directory that stands in for HERMES_HOME.
        nous_state: Mapping stored under ``providers.nous`` in the file.

    Returns:
        Path of the written ``auth.json`` file.
    """
    auth_path = hermes_home / "auth.json"
    payload = {
        "version": 1,
        "providers": {"nous": nous_state},
    }
    # Explicit encoding keeps the fixture independent of the platform locale.
    auth_path.write_text(json.dumps(payload), encoding="utf-8")
    return auth_path
def test_nous_adapter_metadata():
    """The Nous adapter reports its name, display name and the four allowed paths."""
    portal = NousPortalAdapter()
    assert portal.name == "nous"
    assert portal.display_name == "Nous Portal"
    expected_paths = ("/chat/completions", "/embeddings", "/completions", "/models")
    for path in expected_paths:
        assert path in portal.allowed_paths
def test_nous_adapter_not_authenticated_when_no_auth_file(tmp_path, monkeypatch):
    """With HERMES_HOME pointing at an empty directory, the adapter is not authenticated."""
    # HERMES_HOME is already set by conftest, but make doubly sure
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    portal = NousPortalAdapter()
    authenticated = portal.is_authenticated()
    assert not authenticated
def test_nous_adapter_not_authenticated_when_provider_missing(tmp_path, monkeypatch):
    """An auth.json whose ``providers`` mapping is empty leaves the adapter unauthenticated."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    empty_store = {
        "version": 1,
        "providers": {},
    }
    store_file = tmp_path / "auth.json"
    store_file.write_text(json.dumps(empty_store))
    authenticated = NousPortalAdapter().is_authenticated()
    assert not authenticated
def test_nous_adapter_authenticated_with_agent_key(tmp_path, monkeypatch):
    """A stored agent key with a far-future expiry counts as authenticated."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    stored_state = {
        "agent_key": "ov-test-key",
        "agent_key_expires_at": "2099-01-01T00:00:00Z",
        "inference_base_url": "https://inference-api.nousresearch.com/v1",
    }
    _write_auth_store(tmp_path, stored_state)
    portal = NousPortalAdapter()
    assert portal.is_authenticated()
def test_nous_adapter_authenticated_with_refresh_token_only(tmp_path, monkeypatch):
    """Stored access and refresh tokens without an agent_key still count as authenticated."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    tokens_only = {
        "access_token": "access-tok",
        "refresh_token": "refresh-tok",
    }
    _write_auth_store(tmp_path, tokens_only)
    portal = NousPortalAdapter()
    assert portal.is_authenticated()
def test_nous_adapter_get_credential_refreshes_and_persists(tmp_path, monkeypatch):
    """get_credential() returns the refreshed agent key and stores it in auth.json.

    The refresh helper is patched to return a state containing an
    ``agent_key``. The test checks the fields of the returned credential and
    that the key appears under ``providers.nous`` in the file on disk.
    """
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    stored_before = {
        "access_token": "access-tok",
        "refresh_token": "refresh-tok",
        "client_id": "hermes-cli",
        "portal_base_url": "https://portal.nousresearch.com",
        "inference_base_url": "https://inference-api.nousresearch.com/v1",
    }
    _write_auth_store(tmp_path, stored_before)

    # What the patched refresh helper hands back: the same state plus a minted key.
    refreshed_state = {
        **stored_before,
        "agent_key": "minted-bearer",
        "agent_key_expires_at": "2099-01-01T00:00:00Z",
    }
    refresh_target = "hermes_cli.proxy.adapters.nous_portal.refresh_nous_oauth_from_state"
    with patch(refresh_target, return_value=refreshed_state) as mock_refresh:
        portal = NousPortalAdapter()
        cred = portal.get_credential()

    mock_refresh.assert_called_once()
    assert cred.bearer == "minted-bearer"
    assert cred.base_url == "https://inference-api.nousresearch.com/v1"
    assert cred.expires_at == "2099-01-01T00:00:00Z"
    assert cred.token_type == "Bearer"

    # Verify state was persisted back
    on_disk = json.loads((tmp_path / "auth.json").read_text())
    assert on_disk["providers"]["nous"]["agent_key"] == "minted-bearer"
def test_nous_adapter_get_credential_raises_when_not_logged_in(tmp_path, monkeypatch):
    """Without any auth.json, get_credential() raises RuntimeError mentioning the login command."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    portal = NousPortalAdapter()
    expected_hint = "hermes login nous"
    with pytest.raises(RuntimeError, match=expected_hint):
        portal.get_credential()
def test_nous_adapter_get_credential_raises_on_refresh_failure(tmp_path, monkeypatch):
    """A RuntimeError raised by the refresh helper surfaces from get_credential()."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    tokens_only = {
        "access_token": "access-tok",
        "refresh_token": "refresh-tok",
    }
    _write_auth_store(tmp_path, tokens_only)

    refresh_target = "hermes_cli.proxy.adapters.nous_portal.refresh_nous_oauth_from_state"
    failure = RuntimeError("Refresh session has been revoked")
    with patch(refresh_target, side_effect=failure):
        portal = NousPortalAdapter()
        with pytest.raises(RuntimeError, match="Refresh session has been revoked"):
            portal.get_credential()
def test_nous_adapter_get_credential_raises_when_no_agent_key_returned(tmp_path, monkeypatch):
    """A refresh result that lacks ``agent_key`` makes get_credential() raise RuntimeError."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    tokens_only = {
        "access_token": "access-tok",
        "refresh_token": "refresh-tok",
    }
    _write_auth_store(tmp_path, tokens_only)

    refresh_target = "hermes_cli.proxy.adapters.nous_portal.refresh_nous_oauth_from_state"
    keyless_result = {"access_token": "a", "refresh_token": "r"}
    with patch(refresh_target, return_value=keyless_result):
        portal = NousPortalAdapter()
        with pytest.raises(RuntimeError, match="did not return a usable agent_key"):
            portal.get_credential()
def test_nous_adapter_concurrent_refresh_serialized(tmp_path, monkeypatch):
    """Parallel get_credential() calls must never overlap inside the refresh helper.

    Three threads call get_credential() on one shared adapter while the
    refresh helper is replaced by a slow fake. The fake records whether it
    was ever entered while another invocation was still running.
    """
    import time

    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    _write_auth_store(tmp_path, {"access_token": "a", "refresh_token": "r"})

    refresh_thread_ids: list = []
    refresh_running = threading.Event()
    overlap_seen = threading.Event()
    minted_count = [0]
    minted_count_lock = threading.Lock()

    def fake_refresh(state, **kwargs):
        # If another thread is already inside refresh, the lock is broken.
        if refresh_running.is_set():
            overlap_seen.set()
        refresh_running.set()
        try:
            refresh_thread_ids.append(threading.current_thread().ident)
            # Simulate refresh latency so any race window is exposed.
            time.sleep(0.05)
            with minted_count_lock:
                minted_count[0] += 1
                sequence = minted_count[0]
            refreshed = dict(state)
            refreshed.update({
                "agent_key": f"key-{sequence}",
                "agent_key_expires_at": "2099-01-01T00:00:00Z",
                "inference_base_url": "https://inference-api.nousresearch.com/v1",
            })
            return refreshed
        finally:
            refresh_running.clear()

    portal = NousPortalAdapter()
    bearers: list = []
    failures: list = []

    def fetch_bearer():
        try:
            bearers.append(portal.get_credential().bearer)
        except Exception as exc:  # pragma: no cover - shouldn't happen
            failures.append(exc)

    refresh_target = "hermes_cli.proxy.adapters.nous_portal.refresh_nous_oauth_from_state"
    with patch(refresh_target, side_effect=fake_refresh):
        pool = [threading.Thread(target=fetch_bearer) for _ in range(3)]
        for thread in pool:
            thread.start()
        for thread in pool:
            thread.join()

    assert not failures, f"workers errored: {failures}"
    assert len(bearers) == 3
    assert len(refresh_thread_ids) == 3
    assert not overlap_seen.is_set(), "refresh calls overlapped — lock is broken"
    assert all(bearer.startswith("key-") for bearer in bearers)
# ---------------------------------------------------------------------------
# Server: path filtering + forwarding
#
# We run the proxy AND a fake upstream as real aiohttp servers on ephemeral
# ports. This avoids pytest-aiohttp's fixtures, which would add an extra
# dependency for a single test file.
# ---------------------------------------------------------------------------
aiohttp = pytest.importorskip("aiohttp")
from aiohttp import web # noqa: E402
from hermes_cli.proxy.server import create_app # noqa: E402
class FakeAdapter(UpstreamAdapter):
    """In-memory adapter for server tests: hands out a fixed credential, no disk access.

    ``calls`` counts how many times ``get_credential`` was invoked. When
    ``raise_on_credential`` is true, ``get_credential`` raises RuntimeError
    instead of returning a credential.
    """

    def __init__(self, base_url: str, bearer: str = "test-bearer",
                 allowed=None, raise_on_credential=False):
        self.calls = 0
        self._raise = raise_on_credential
        # Fall back to a single allowed path when none (or an empty list) is given.
        self._allowed = frozenset(allowed if allowed else ["/chat/completions"])
        self._bearer = bearer
        self._base_url = base_url

    @property
    def name(self):
        return "fake"

    @property
    def display_name(self):
        return "Fake Provider"

    @property
    def allowed_paths(self):
        return self._allowed

    def is_authenticated(self):
        return True

    def get_credential(self):
        # Count the call before the simulated failure so failed attempts are counted too.
        self.calls += 1
        if self._raise:
            raise RuntimeError("simulated auth failure")
        return UpstreamCredential(
            bearer=self._bearer,
            base_url=self._base_url,
            expires_at="2099-01-01T00:00:00Z",
        )
async def _start_runner(app: "web.Application"):
    """Spin up an aiohttp app on an ephemeral localhost port.

    Args:
        app: The aiohttp application to serve.

    Returns:
        A ``(runner, base_url)`` tuple. The caller is responsible for
        awaiting ``runner.cleanup()`` when done.
    """
    runner = web.AppRunner(app, access_log=None)
    await runner.setup()
    try:
        # port=0 lets the OS pick a free port.
        site = web.TCPSite(runner, host="127.0.0.1", port=0)
        await site.start()
        # Public API for the bound socket addresses; avoids reaching into
        # the private ``site._server`` attribute.
        port = runner.addresses[0][1]
    except Exception:
        # Do not leak a half-started runner if binding fails.
        await runner.cleanup()
        raise
    return runner, f"http://127.0.0.1:{port}"
def _build_fake_upstream(captured: Dict[str, Any]) -> "web.Application":
    """Build a stand-in upstream app that records requests into ``captured["requests"]``.

    ``/v1/chat/completions`` and ``/v1/embeddings`` record the request and
    reply with a small JSON echo; ``/v1/sse`` streams three
    ``text/event-stream`` chunks. ``captured`` must already hold a list under
    the ``"requests"`` key.
    """

    async def record_and_echo(request):
        raw = await request.read()
        text = raw.decode("utf-8") if raw else ""
        entry = {
            "method": request.method,
            "path": request.path,
            "auth": request.headers.get("Authorization"),
            "body": text,
        }
        captured["requests"].append(entry)
        return web.json_response({"echoed": True, "path": request.path})

    async def stream_events(request):
        stream = web.StreamResponse(
            status=200,
            headers={"Content-Type": "text/event-stream"},
        )
        await stream.prepare(request)
        events = (b"data: hello\n\n", b"data: world\n\n", b"data: [DONE]\n\n")
        for event in events:
            await stream.write(event)
        await stream.write_eof()
        return stream

    upstream = web.Application()
    for route_path, handler in (
        ("/v1/chat/completions", record_and_echo),
        ("/v1/embeddings", record_and_echo),
        ("/v1/sse", stream_events),
    ):
        upstream.router.add_route("*", route_path, handler)
    return upstream
def test_server_forwards_chat_completions():
    """A POST to /v1/chat/completions reaches the upstream with the adapter's bearer and body."""

    async def scenario():
        seen: Dict[str, Any] = {"requests": []}
        upstream_runner, upstream_url = await _start_runner(_build_fake_upstream(seen))
        fake = FakeAdapter(f"{upstream_url}/v1", bearer="real-portal-key")
        proxy_runner, proxy_url = await _start_runner(create_app(fake))
        payload = {
            "model": "Hermes-4-70B",
            "messages": [{"role": "user", "content": "hi"}],
        }
        try:
            async with aiohttp.ClientSession() as client, client.post(
                f"{proxy_url}/v1/chat/completions",
                json=payload,
                headers={"Authorization": "Bearer client-dummy-key"},
            ) as reply:
                assert reply.status == 200
                answer = await reply.json()
                assert answer["echoed"] is True

            assert len(seen["requests"]) == 1
            forwarded = seen["requests"][0]
            assert forwarded["auth"] == "Bearer real-portal-key"
            assert "Hermes-4-70B" in forwarded["body"]
        finally:
            await proxy_runner.cleanup()
            await upstream_runner.cleanup()

    asyncio.run(scenario())
def test_server_rejects_disallowed_path():
    """A path outside the adapter's allowed set gets a 404 that names the allowed path."""

    async def scenario():
        fake = FakeAdapter("http://unused.example/v1", allowed=["/chat/completions"])
        proxy_runner, proxy_url = await _start_runner(create_app(fake))
        try:
            async with aiohttp.ClientSession() as client, client.get(
                f"{proxy_url}/v1/random/endpoint"
            ) as reply:
                assert reply.status == 404
                error = (await reply.json())["error"]
                assert error["type"] == "path_not_allowed"
                assert "/chat/completions" in error["message"]
        finally:
            await proxy_runner.cleanup()

    asyncio.run(scenario())
def test_server_returns_401_when_adapter_fails():
    """When the adapter's get_credential() raises, the proxy answers 401 with the error text."""

    async def scenario():
        fake = FakeAdapter("http://unused.example/v1", raise_on_credential=True)
        proxy_runner, proxy_url = await _start_runner(create_app(fake))
        try:
            async with aiohttp.ClientSession() as client, client.post(
                f"{proxy_url}/v1/chat/completions", json={}
            ) as reply:
                assert reply.status == 401
                error = (await reply.json())["error"]
                assert error["type"] == "upstream_auth_failed"
                assert "simulated auth failure" in error["message"]
        finally:
            await proxy_runner.cleanup()

    asyncio.run(scenario())
def test_server_health_endpoint():
    """GET /health returns 200 with status, upstream display name and authenticated flag."""

    async def scenario():
        fake = FakeAdapter("http://unused.example/v1")
        proxy_runner, proxy_url = await _start_runner(create_app(fake))
        try:
            async with aiohttp.ClientSession() as client, client.get(
                f"{proxy_url}/health"
            ) as reply:
                assert reply.status == 200
                health = await reply.json()
                assert health["status"] == "ok"
                assert health["upstream"] == "Fake Provider"
                assert health["authenticated"] is True
        finally:
            await proxy_runner.cleanup()

    asyncio.run(scenario())
def test_server_streams_sse():
    """Event-stream chunks from the upstream arrive at the client through the proxy."""

    async def scenario():
        seen: Dict[str, Any] = {"requests": []}
        upstream_runner, upstream_url = await _start_runner(_build_fake_upstream(seen))
        fake = FakeAdapter(f"{upstream_url}/v1", allowed=["/sse"])
        proxy_runner, proxy_url = await _start_runner(create_app(fake))
        try:
            async with aiohttp.ClientSession() as client, client.get(
                f"{proxy_url}/v1/sse"
            ) as reply:
                assert reply.status == 200
                pieces = [piece async for piece in reply.content.iter_any()]
                streamed = b"".join(pieces)
                assert b"data: hello" in streamed
                assert b"data: [DONE]" in streamed
        finally:
            await proxy_runner.cleanup()
            await upstream_runner.cleanup()

    asyncio.run(scenario())
def test_server_strips_client_auth_header():
    """The client's Authorization header MUST NOT reach the upstream."""

    async def scenario():
        seen: Dict[str, Any] = {"requests": []}
        upstream_runner, upstream_url = await _start_runner(_build_fake_upstream(seen))
        fake = FakeAdapter(f"{upstream_url}/v1", bearer="ours")
        proxy_runner, proxy_url = await _start_runner(create_app(fake))
        try:
            async with aiohttp.ClientSession() as client, client.post(
                f"{proxy_url}/v1/chat/completions",
                json={},
                headers={"Authorization": "Bearer SHOULD_NOT_LEAK"},
            ) as reply:
                # Drain the response so the upstream exchange has completed.
                await reply.read()

            upstream_auth = seen["requests"][0]["auth"]
            assert upstream_auth == "Bearer ours"
            assert "SHOULD_NOT_LEAK" not in seen["requests"][0]["auth"]
        finally:
            await proxy_runner.cleanup()
            await upstream_runner.cleanup()

    asyncio.run(scenario())
# ---------------------------------------------------------------------------
# CLI handlers
# ---------------------------------------------------------------------------
def test_cmd_proxy_status_runs(capsys, tmp_path, monkeypatch):
    """With an empty HERMES_HOME, the status command exits 0 and shows Nous as not logged in."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    from hermes_cli.proxy.cli import cmd_proxy_status

    exit_code = cmd_proxy_status(MagicMock())
    assert exit_code == 0

    printed = capsys.readouterr().out
    for fragment in ("nous", "Nous Portal", "not logged in"):
        assert fragment in printed
def test_cmd_proxy_providers_runs(capsys):
    """The providers listing command exits 0 and prints the Nous adapter's name and label."""
    from hermes_cli.proxy.cli import cmd_proxy_list_providers

    exit_code = cmd_proxy_list_providers(MagicMock())
    assert exit_code == 0

    printed = capsys.readouterr().out
    for fragment in ("nous", "Nous Portal"):
        assert fragment in printed
def test_cmd_proxy_start_refuses_unknown_provider(capsys):
    """Starting with an unregistered provider exits 2 and names the provider on stderr."""
    from hermes_cli.proxy.cli import cmd_proxy_start

    cli_args = MagicMock(provider="no-such-provider", host=None, port=None)
    exit_code = cmd_proxy_start(cli_args)
    assert exit_code == 2

    stderr_text = capsys.readouterr().err
    assert "no-such-provider" in stderr_text
def test_cmd_proxy_start_refuses_when_unauthenticated(capsys, tmp_path, monkeypatch):
    """With an empty HERMES_HOME, starting the Nous proxy exits 2 and prints the login hint."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    from hermes_cli.proxy.cli import cmd_proxy_start

    cli_args = MagicMock(provider="nous", host=None, port=None)
    exit_code = cmd_proxy_start(cli_args)
    assert exit_code == 2

    stderr_text = capsys.readouterr().err
    assert "hermes login nous" in stderr_text