mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
Adds 'hermes proxy start' — a local HTTP server that lets external apps (OpenViking, Karakeep, Open WebUI, ...) use a Hermes-managed provider subscription as their LLM endpoint. The proxy attaches the user's real OAuth-resolved credentials to each forwarded request, refreshing them automatically; the client can send any bearer (it gets stripped). Ships with one adapter — Nous Portal. The UpstreamAdapter ABC and registry in hermes_cli/proxy/adapters/ are designed for additional OAuth providers to plug in by name without server changes. Commands: hermes proxy start [--provider nous] [--host 127.0.0.1] [--port 8645] hermes proxy status hermes proxy providers Allowed Portal paths: /v1/chat/completions, /v1/completions, /v1/embeddings, /v1/models. Anything else returns 404 with a clear error pointing at the allowed list. aiohttp is gated like gateway/platforms/api_server.py (try-import, clean runtime error if missing). No new core dependency. Tests: 24 unit tests + 1 separate E2E that spawns the real subprocess and verifies the upstream receives the right bearer with the client's header stripped.
265 lines
9 KiB
Python
265 lines
9 KiB
Python
"""HTTP server that forwards OpenAI-compatible requests to a configured upstream.
|
|
|
|
Listens on ``http://<host>:<port>/v1/<path>`` and forwards each request to
|
|
``<upstream-base-url>/<path>`` with the client's ``Authorization`` header
|
|
replaced by a freshly-resolved bearer from the configured adapter. The
|
|
response is streamed back unmodified, preserving SSE.
|
|
|
|
The server is intentionally minimal: it does NOT mediate, log, transform,
|
|
or rewrite request/response bodies. It's a credential-attaching forwarder.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import signal
|
|
from typing import Optional
|
|
|
|
try:
|
|
import aiohttp
|
|
from aiohttp import web
|
|
AIOHTTP_AVAILABLE = True
|
|
except ImportError:
|
|
aiohttp = None # type: ignore[assignment]
|
|
web = None # type: ignore[assignment]
|
|
AIOHTTP_AVAILABLE = False
|
|
|
|
from hermes_cli.proxy.adapters.base import UpstreamAdapter
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Headers we strip when forwarding to the upstream. ``host``/``content-length``
|
|
# are recomputed by aiohttp; ``authorization`` is replaced with our bearer.
|
|
# Everything else (content-type, accept, user-agent, x-* headers) passes through.
|
|
_HOP_BY_HOP_HEADERS = frozenset(
|
|
{
|
|
"host",
|
|
"content-length",
|
|
"connection",
|
|
"keep-alive",
|
|
"proxy-authenticate",
|
|
"proxy-authorization",
|
|
"te",
|
|
"trailers",
|
|
"transfer-encoding",
|
|
"upgrade",
|
|
"authorization", # we replace this one
|
|
}
|
|
)
|
|
|
|
DEFAULT_PORT = 8645
|
|
DEFAULT_HOST = "127.0.0.1"
|
|
|
|
|
|
def _json_error(status: int, message: str, code: str = "proxy_error") -> "web.Response":
|
|
"""Return an OpenAI-style error JSON response."""
|
|
body = {"error": {"message": message, "type": code, "code": code}}
|
|
return web.json_response(body, status=status)
|
|
|
|
|
|
def _filter_request_headers(headers: "aiohttp.typedefs.LooseHeaders") -> dict:
|
|
"""Strip hop-by-hop + auth headers from the inbound request."""
|
|
out = {}
|
|
for key, value in headers.items():
|
|
if key.lower() in _HOP_BY_HOP_HEADERS:
|
|
continue
|
|
out[key] = value
|
|
return out
|
|
|
|
|
|
def _filter_response_headers(headers) -> dict:
|
|
"""Strip hop-by-hop headers from the upstream response."""
|
|
out = {}
|
|
for key, value in headers.items():
|
|
if key.lower() in _HOP_BY_HOP_HEADERS:
|
|
continue
|
|
# aiohttp recomputes Content-Encoding/Content-Length on stream — let it.
|
|
if key.lower() in ("content-encoding", "content-length"):
|
|
continue
|
|
out[key] = value
|
|
return out
|
|
|
|
|
|
def create_app(adapter: UpstreamAdapter) -> "web.Application":
|
|
"""Build the aiohttp application bound to a specific upstream adapter."""
|
|
if not AIOHTTP_AVAILABLE:
|
|
raise RuntimeError(
|
|
"aiohttp is required for `hermes proxy`. Install with: "
|
|
"pip install 'hermes-agent[messaging]' or `pip install aiohttp`."
|
|
)
|
|
|
|
app = web.Application()
|
|
# AppKey ensures forward-compat with future aiohttp versions that strip
|
|
# bare-string keys.
|
|
_adapter_key = web.AppKey("adapter", UpstreamAdapter)
|
|
app[_adapter_key] = adapter
|
|
|
|
async def handle_health(request: "web.Request") -> "web.Response":
|
|
return web.json_response(
|
|
{
|
|
"status": "ok",
|
|
"upstream": adapter.display_name,
|
|
"authenticated": adapter.is_authenticated(),
|
|
}
|
|
)
|
|
|
|
async def handle_models_fallback(request: "web.Request") -> "web.Response":
|
|
# Most clients hit /v1/models on startup. If the upstream doesn't
|
|
# serve /models, synthesize a minimal response so clients don't
|
|
# crash. The actual forwarding path handles /models when allowed.
|
|
return web.json_response(
|
|
{
|
|
"object": "list",
|
|
"data": [],
|
|
}
|
|
)
|
|
|
|
async def handle_proxy(request: "web.Request") -> "web.StreamResponse":
|
|
# Extract the path *after* /v1
|
|
rel_path = request.match_info.get("tail", "")
|
|
rel_path = "/" + rel_path.lstrip("/")
|
|
|
|
if rel_path not in adapter.allowed_paths:
|
|
allowed = ", ".join(sorted(adapter.allowed_paths))
|
|
return _json_error(
|
|
404,
|
|
f"Path /v1{rel_path} is not forwarded by this proxy. "
|
|
f"Allowed: {allowed}",
|
|
code="path_not_allowed",
|
|
)
|
|
|
|
try:
|
|
cred = adapter.get_credential()
|
|
except Exception as exc:
|
|
logger.warning("proxy: credential resolution failed: %s", exc)
|
|
return _json_error(401, str(exc), code="upstream_auth_failed")
|
|
|
|
upstream_url = f"{cred.base_url.rstrip('/')}{rel_path}"
|
|
# Preserve query string verbatim.
|
|
if request.query_string:
|
|
upstream_url = f"{upstream_url}?{request.query_string}"
|
|
|
|
# Forward body verbatim. Read into memory once — request bodies for
|
|
# chat/completions/embeddings are small (<1MB typically). If we ever
|
|
# need to forward large multipart uploads we'll switch to streaming
|
|
# the request body too.
|
|
body = await request.read()
|
|
|
|
fwd_headers = _filter_request_headers(request.headers)
|
|
fwd_headers["Authorization"] = f"{cred.token_type} {cred.bearer}"
|
|
|
|
logger.debug(
|
|
"proxy: forwarding %s %s -> %s (body=%d bytes)",
|
|
request.method, rel_path, upstream_url, len(body),
|
|
)
|
|
|
|
# Use a per-request session so connection state doesn't leak between
|
|
# clients. Could be optimized to a shared session later.
|
|
timeout = aiohttp.ClientTimeout(total=None, sock_connect=15, sock_read=300)
|
|
try:
|
|
session = aiohttp.ClientSession(timeout=timeout)
|
|
except Exception as exc: # pragma: no cover - aiohttp setup issue
|
|
return _json_error(500, f"proxy session init failed: {exc}")
|
|
|
|
try:
|
|
upstream_resp = await session.request(
|
|
request.method,
|
|
upstream_url,
|
|
data=body if body else None,
|
|
headers=fwd_headers,
|
|
allow_redirects=False,
|
|
)
|
|
except aiohttp.ClientError as exc:
|
|
await session.close()
|
|
logger.warning("proxy: upstream connection failed: %s", exc)
|
|
return _json_error(502, f"upstream connection failed: {exc}",
|
|
code="upstream_unreachable")
|
|
except asyncio.TimeoutError:
|
|
await session.close()
|
|
return _json_error(504, "upstream request timed out",
|
|
code="upstream_timeout")
|
|
|
|
# Stream response back. Headers first, then chunked body.
|
|
resp = web.StreamResponse(
|
|
status=upstream_resp.status,
|
|
headers=_filter_response_headers(upstream_resp.headers),
|
|
)
|
|
await resp.prepare(request)
|
|
|
|
try:
|
|
async for chunk in upstream_resp.content.iter_any():
|
|
if chunk:
|
|
await resp.write(chunk)
|
|
except (aiohttp.ClientError, asyncio.CancelledError) as exc:
|
|
logger.warning("proxy: streaming interrupted: %s", exc)
|
|
finally:
|
|
upstream_resp.release()
|
|
await session.close()
|
|
|
|
await resp.write_eof()
|
|
return resp
|
|
|
|
# /health doesn't go through the upstream
|
|
app.router.add_get("/health", handle_health)
|
|
# Catch-all under /v1 — forwards if the path is allowed.
|
|
app.router.add_route("*", "/v1/{tail:.*}", handle_proxy)
|
|
|
|
return app
|
|
|
|
|
|
async def run_server(
|
|
adapter: UpstreamAdapter,
|
|
host: str = DEFAULT_HOST,
|
|
port: int = DEFAULT_PORT,
|
|
shutdown_event: Optional[asyncio.Event] = None,
|
|
) -> None:
|
|
"""Run the proxy in the current event loop until shutdown_event is set.
|
|
|
|
If shutdown_event is None, runs until cancelled (Ctrl+C or SIGTERM).
|
|
"""
|
|
if not AIOHTTP_AVAILABLE:
|
|
raise RuntimeError(
|
|
"aiohttp is required for `hermes proxy`. Install with: "
|
|
"pip install 'hermes-agent[messaging]' or `pip install aiohttp`."
|
|
)
|
|
|
|
app = create_app(adapter)
|
|
runner = web.AppRunner(app, access_log=None)
|
|
await runner.setup()
|
|
site = web.TCPSite(runner, host=host, port=port)
|
|
await site.start()
|
|
|
|
logger.info(
|
|
"proxy: listening on http://%s:%d/v1 -> %s",
|
|
host, port, adapter.display_name,
|
|
)
|
|
|
|
stop_event = shutdown_event or asyncio.Event()
|
|
|
|
# Wire signal handlers when we own the loop's lifetime.
|
|
if shutdown_event is None:
|
|
loop = asyncio.get_running_loop()
|
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
|
try:
|
|
loop.add_signal_handler(sig, stop_event.set)
|
|
except NotImplementedError:
|
|
# Windows / restricted environments — Ctrl+C will still
|
|
# raise KeyboardInterrupt and unwind us.
|
|
pass
|
|
|
|
try:
|
|
await stop_event.wait()
|
|
finally:
|
|
logger.info("proxy: shutting down")
|
|
await runner.cleanup()
|
|
|
|
|
|
__all__ = [
|
|
"create_app",
|
|
"run_server",
|
|
"DEFAULT_HOST",
|
|
"DEFAULT_PORT",
|
|
"AIOHTTP_AVAILABLE",
|
|
]
|