"""HTTP server that forwards OpenAI-compatible requests to a configured upstream. Listens on ``http://:/v1/`` and forwards each request to ``/`` with the client's ``Authorization`` header replaced by a freshly-resolved bearer from the configured adapter. The response is streamed back unmodified, preserving SSE. The server is intentionally minimal: it does NOT mediate, log, transform, or rewrite request/response bodies. It's a credential-attaching forwarder. """ from __future__ import annotations import asyncio import json import logging import signal from typing import Optional try: import aiohttp from aiohttp import web AIOHTTP_AVAILABLE = True except ImportError: aiohttp = None # type: ignore[assignment] web = None # type: ignore[assignment] AIOHTTP_AVAILABLE = False from hermes_cli.proxy.adapters.base import UpstreamAdapter logger = logging.getLogger(__name__) # Headers we strip when forwarding to the upstream. ``host``/``content-length`` # are recomputed by aiohttp; ``authorization`` is replaced with our bearer. # Everything else (content-type, accept, user-agent, x-* headers) passes through. _HOP_BY_HOP_HEADERS = frozenset( { "host", "content-length", "connection", "keep-alive", "proxy-authenticate", "proxy-authorization", "te", "trailers", "transfer-encoding", "upgrade", "authorization", # we replace this one } ) DEFAULT_PORT = 8645 DEFAULT_HOST = "127.0.0.1" def _json_error(status: int, message: str, code: str = "proxy_error") -> "web.Response": """Return an OpenAI-style error JSON response.""" body = {"error": {"message": message, "type": code, "code": code}} return web.json_response(body, status=status) def _filter_request_headers(headers: "aiohttp.typedefs.LooseHeaders") -> dict: """Strip hop-by-hop + auth headers from the inbound request.""" out = {} for key, value in headers.items(): if key.lower() in _HOP_BY_HOP_HEADERS: continue out[key] = value return out def _filter_response_headers(headers) -> dict: """Strip hop-by-hop headers from the upstream response.""" out = {} for key, value in headers.items(): if key.lower() in _HOP_BY_HOP_HEADERS: continue # aiohttp recomputes Content-Encoding/Content-Length on stream — let it. if key.lower() in ("content-encoding", "content-length"): continue out[key] = value return out def create_app(adapter: UpstreamAdapter) -> "web.Application": """Build the aiohttp application bound to a specific upstream adapter.""" if not AIOHTTP_AVAILABLE: raise RuntimeError( "aiohttp is required for `hermes proxy`. Install with: " "pip install 'hermes-agent[messaging]' or `pip install aiohttp`." ) app = web.Application() # AppKey ensures forward-compat with future aiohttp versions that strip # bare-string keys. _adapter_key = web.AppKey("adapter", UpstreamAdapter) app[_adapter_key] = adapter async def handle_health(request: "web.Request") -> "web.Response": return web.json_response( { "status": "ok", "upstream": adapter.display_name, "authenticated": adapter.is_authenticated(), } ) async def handle_models_fallback(request: "web.Request") -> "web.Response": # Most clients hit /v1/models on startup. If the upstream doesn't # serve /models, synthesize a minimal response so clients don't # crash. The actual forwarding path handles /models when allowed. return web.json_response( { "object": "list", "data": [], } ) async def handle_proxy(request: "web.Request") -> "web.StreamResponse": # Extract the path *after* /v1 rel_path = request.match_info.get("tail", "") rel_path = "/" + rel_path.lstrip("/") if rel_path not in adapter.allowed_paths: allowed = ", ".join(sorted(adapter.allowed_paths)) return _json_error( 404, f"Path /v1{rel_path} is not forwarded by this proxy. " f"Allowed: {allowed}", code="path_not_allowed", ) try: cred = adapter.get_credential() except Exception as exc: logger.warning("proxy: credential resolution failed: %s", exc) return _json_error(401, str(exc), code="upstream_auth_failed") upstream_url = f"{cred.base_url.rstrip('/')}{rel_path}" # Preserve query string verbatim. if request.query_string: upstream_url = f"{upstream_url}?{request.query_string}" # Forward body verbatim. Read into memory once — request bodies for # chat/completions/embeddings are small (<1MB typically). If we ever # need to forward large multipart uploads we'll switch to streaming # the request body too. body = await request.read() fwd_headers = _filter_request_headers(request.headers) fwd_headers["Authorization"] = f"{cred.token_type} {cred.bearer}" logger.debug( "proxy: forwarding %s %s -> %s (body=%d bytes)", request.method, rel_path, upstream_url, len(body), ) # Use a per-request session so connection state doesn't leak between # clients. Could be optimized to a shared session later. timeout = aiohttp.ClientTimeout(total=None, sock_connect=15, sock_read=300) try: session = aiohttp.ClientSession(timeout=timeout) except Exception as exc: # pragma: no cover - aiohttp setup issue return _json_error(500, f"proxy session init failed: {exc}") try: upstream_resp = await session.request( request.method, upstream_url, data=body if body else None, headers=fwd_headers, allow_redirects=False, ) except aiohttp.ClientError as exc: await session.close() logger.warning("proxy: upstream connection failed: %s", exc) return _json_error(502, f"upstream connection failed: {exc}", code="upstream_unreachable") except asyncio.TimeoutError: await session.close() return _json_error(504, "upstream request timed out", code="upstream_timeout") # Stream response back. Headers first, then chunked body. resp = web.StreamResponse( status=upstream_resp.status, headers=_filter_response_headers(upstream_resp.headers), ) await resp.prepare(request) try: async for chunk in upstream_resp.content.iter_any(): if chunk: await resp.write(chunk) except (aiohttp.ClientError, asyncio.CancelledError) as exc: logger.warning("proxy: streaming interrupted: %s", exc) finally: upstream_resp.release() await session.close() await resp.write_eof() return resp # /health doesn't go through the upstream app.router.add_get("/health", handle_health) # Catch-all under /v1 — forwards if the path is allowed. app.router.add_route("*", "/v1/{tail:.*}", handle_proxy) return app async def run_server( adapter: UpstreamAdapter, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT, shutdown_event: Optional[asyncio.Event] = None, ) -> None: """Run the proxy in the current event loop until shutdown_event is set. If shutdown_event is None, runs until cancelled (Ctrl+C or SIGTERM). """ if not AIOHTTP_AVAILABLE: raise RuntimeError( "aiohttp is required for `hermes proxy`. Install with: " "pip install 'hermes-agent[messaging]' or `pip install aiohttp`." ) app = create_app(adapter) runner = web.AppRunner(app, access_log=None) await runner.setup() site = web.TCPSite(runner, host=host, port=port) await site.start() logger.info( "proxy: listening on http://%s:%d/v1 -> %s", host, port, adapter.display_name, ) stop_event = shutdown_event or asyncio.Event() # Wire signal handlers when we own the loop's lifetime. if shutdown_event is None: loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, stop_event.set) # windows-footgun: ok except NotImplementedError: # Windows / restricted environments — Ctrl+C will still # raise KeyboardInterrupt and unwind us. pass try: await stop_event.wait() finally: logger.info("proxy: shutting down") await runner.cleanup() __all__ = [ "create_app", "run_server", "DEFAULT_HOST", "DEFAULT_PORT", "AIOHTTP_AVAILABLE", ]