diff --git a/gateway/config.py b/gateway/config.py
index ba0840bfc0..6ee65bf3b3 100644
--- a/gateway/config.py
+++ b/gateway/config.py
@@ -29,6 +29,7 @@ class Platform(Enum):
     SIGNAL = "signal"
     HOMEASSISTANT = "homeassistant"
     EMAIL = "email"
+    API_SERVER = "api_server"
 
 
 @dataclass
@@ -171,6 +172,9 @@ class GatewayConfig:
             # Email uses extra dict for config (address + imap_host + smtp_host)
             elif platform == Platform.EMAIL and config.extra.get("address"):
                 connected.append(platform)
+            # API Server uses enabled flag only (no token needed)
+            elif platform == Platform.API_SERVER:
+                connected.append(platform)
         return connected
 
     def get_home_channel(self, platform: Platform) -> Optional[HomeChannel]:
@@ -446,6 +450,25 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
                 name=os.getenv("EMAIL_HOME_ADDRESS_NAME", "Home"),
             )
 
+    # API Server
+    api_server_enabled = os.getenv("API_SERVER_ENABLED", "").lower() in ("true", "1", "yes")
+    api_server_key = os.getenv("API_SERVER_KEY", "")
+    api_server_port = os.getenv("API_SERVER_PORT")
+    api_server_host = os.getenv("API_SERVER_HOST")
+    if api_server_enabled or api_server_key:
+        if Platform.API_SERVER not in config.platforms:
+            config.platforms[Platform.API_SERVER] = PlatformConfig()
+        config.platforms[Platform.API_SERVER].enabled = True
+        if api_server_key:
+            config.platforms[Platform.API_SERVER].extra["key"] = api_server_key
+        if api_server_port:
+            try:
+                config.platforms[Platform.API_SERVER].extra["port"] = int(api_server_port)
+            except ValueError:
+                pass
+        if api_server_host:
+            config.platforms[Platform.API_SERVER].extra["host"] = api_server_host
+
     # Session settings
     idle_minutes = os.getenv("SESSION_IDLE_MINUTES")
     if idle_minutes:
diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py
new file mode 100644
index 0000000000..25e6db950c
--- /dev/null
+++ b/gateway/platforms/api_server.py
@@ -0,0 +1,603 @@
+"""
+OpenAI-compatible API server platform adapter.
+
+Exposes an HTTP server with endpoints:
+- POST /v1/chat/completions — OpenAI Chat Completions format (stateless)
+- POST /v1/responses — OpenAI Responses API format (stateful via previous_response_id)
+- GET /v1/models — lists hermes-agent as an available model
+- GET /health — health check
+
+Any OpenAI-compatible frontend (Open WebUI, LobeChat, etc.) can connect
+to hermes-agent through this adapter.
+
+Requires:
+- aiohttp (already available in the gateway)
+"""
+
+import asyncio
+import collections
+import hmac
+import json
+import logging
+import os
+import time
+import uuid
+from functools import wraps
+from typing import Any, Dict, List, Optional
+
+try:
+    import aiohttp
+    from aiohttp import web
+    AIOHTTP_AVAILABLE = True
+except ImportError:
+    AIOHTTP_AVAILABLE = False
+    aiohttp = None  # type: ignore[assignment]
+    web = None  # type: ignore[assignment]
+
+from gateway.config import Platform, PlatformConfig
+from gateway.platforms.base import (
+    BasePlatformAdapter,
+    MessageEvent,
+    MessageType,
+    SendResult,
+)
+
+logger = logging.getLogger(__name__)
+
+# Default settings
+DEFAULT_HOST = "127.0.0.1"
+DEFAULT_PORT = 8642
+MAX_STORED_RESPONSES = 100
+# Content-part types whose "text" field holds plain text.
+_TEXT_PART_TYPES = ("text", "input_text", "output_text")
+
+
+def check_api_server_requirements() -> bool:
+    """Check if API server dependencies are available."""
+    return AIOHTTP_AVAILABLE
+
+
+def _content_to_text(content: Any) -> str:
+    """
+    Flatten OpenAI-style message content into plain text.
+
+    Clients may send ``content`` as a string, as ``None``, or as a list of
+    content parts (bare strings or dicts such as
+    ``{"type": "text", "text": "..."}``). Text-bearing parts are joined with
+    newlines; any other part (e.g. images) is dropped.
+    """
+    if content is None:
+        return ""
+    if isinstance(content, str):
+        return content
+    if isinstance(content, list):
+        text_parts = []
+        for part in content:
+            if isinstance(part, str):
+                text_parts.append(part)
+            elif isinstance(part, dict) and part.get("type") in _TEXT_PART_TYPES:
+                text_parts.append(str(part.get("text") or ""))
+        return "\n".join(text_parts)
+    return str(content)
+
+
+class ResponseStore:
+    """
+    In-memory LRU store for Responses API state.
+
+    Each stored response includes the full internal conversation history
+    (with tool calls and results) so it can be reconstructed on subsequent
+    requests via previous_response_id.
+    """
+
+    def __init__(self, max_size: int = MAX_STORED_RESPONSES):
+        self._store: collections.OrderedDict[str, Dict[str, Any]] = collections.OrderedDict()
+        self._max_size = max_size
+
+    def get(self, response_id: str) -> Optional[Dict[str, Any]]:
+        """Retrieve a stored response by ID (moves to end for LRU)."""
+        if response_id in self._store:
+            self._store.move_to_end(response_id)
+            return self._store[response_id]
+        return None
+
+    def put(self, response_id: str, data: Dict[str, Any]) -> None:
+        """Store a response, evicting the oldest if at capacity."""
+        if response_id in self._store:
+            self._store.move_to_end(response_id)
+        self._store[response_id] = data
+        while len(self._store) > self._max_size:
+            self._store.popitem(last=False)
+
+    def __len__(self) -> int:
+        return len(self._store)
+
+
+class APIServerAdapter(BasePlatformAdapter):
+    """
+    OpenAI-compatible HTTP API server adapter.
+
+    Runs an aiohttp web server that accepts OpenAI-format requests
+    and routes them through hermes-agent's AIAgent.
+    """
+
+    def __init__(self, config: PlatformConfig):
+        super().__init__(config, Platform.API_SERVER)
+        extra = config.extra or {}
+        self._host: str = extra.get("host", os.getenv("API_SERVER_HOST", DEFAULT_HOST))
+        raw_port = extra.get("port", os.getenv("API_SERVER_PORT", str(DEFAULT_PORT)))
+        try:
+            self._port: int = int(raw_port)
+        except (TypeError, ValueError):
+            # A malformed port must not abort gateway startup; fall back loudly.
+            logger.warning("Invalid API server port %r; using default %d", raw_port, DEFAULT_PORT)
+            self._port = DEFAULT_PORT
+        self._api_key: str = extra.get("key", os.getenv("API_SERVER_KEY", ""))
+        self._app: Optional["web.Application"] = None
+        self._runner: Optional["web.AppRunner"] = None
+        self._site: Optional["web.TCPSite"] = None
+        self._response_store = ResponseStore()
+
+    # ------------------------------------------------------------------
+    # Auth helper
+    # ------------------------------------------------------------------
+
+    def _check_auth(self, request: "web.Request") -> Optional["web.Response"]:
+        """
+        Validate Bearer token from Authorization header.
+
+        Returns None if auth is OK, or a 401 web.Response on failure.
+        If no API key is configured, all requests are allowed.
+        """
+        if not self._api_key:
+            return None  # No key configured — allow all (local-only use)
+
+        auth_header = request.headers.get("Authorization", "")
+        if auth_header.startswith("Bearer "):
+            token = auth_header[7:].strip()
+            # Constant-time comparison so response timing cannot leak the key.
+            if hmac.compare_digest(token.encode("utf-8"), str(self._api_key).encode("utf-8")):
+                return None  # Auth OK
+
+        return web.json_response(
+            {"error": {"message": "Invalid API key", "type": "invalid_request_error", "code": "invalid_api_key"}},
+            status=401,
+        )
+
+    # ------------------------------------------------------------------
+    # Agent creation helper
+    # ------------------------------------------------------------------
+
+    def _create_agent(
+        self,
+        ephemeral_system_prompt: Optional[str] = None,
+        session_id: Optional[str] = None,
+    ) -> Any:
+        """
+        Create an AIAgent instance using the gateway's runtime config.
+
+        Uses _resolve_runtime_agent_kwargs() to pick up model, api_key,
+        base_url, etc. from config.yaml / env vars.
+        """
+        from run_agent import AIAgent
+        from gateway.run import _resolve_runtime_agent_kwargs
+
+        runtime_kwargs = _resolve_runtime_agent_kwargs()
+
+        # Read model from env/config (same as gateway run.py)
+        model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
+        try:
+            import yaml
+            from pathlib import Path
+            config_yaml_path = Path.home() / ".hermes" / "config.yaml"
+            if config_yaml_path.exists():
+                with open(config_yaml_path, encoding="utf-8") as f:
+                    cfg = yaml.safe_load(f) or {}
+                model_cfg = cfg.get("model", {})
+                if isinstance(model_cfg, str):
+                    model = model_cfg
+                elif isinstance(model_cfg, dict):
+                    model = model_cfg.get("default", model)
+        except Exception:
+            logger.debug("Could not read model from config.yaml; using %s", model, exc_info=True)
+
+        max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "90"))
+
+        agent = AIAgent(
+            model=model,
+            **runtime_kwargs,
+            max_iterations=max_iterations,
+            quiet_mode=True,
+            verbose_logging=False,
+            ephemeral_system_prompt=ephemeral_system_prompt or None,
+            session_id=session_id,
+            platform="api_server",
+        )
+        return agent
+
+    # ------------------------------------------------------------------
+    # HTTP Handlers
+    # ------------------------------------------------------------------
+
+    async def _handle_health(self, request: "web.Request") -> "web.Response":
+        """GET /health — simple health check."""
+        return web.json_response({"status": "ok", "platform": "hermes-agent"})
+
+    async def _handle_models(self, request: "web.Request") -> "web.Response":
+        """GET /v1/models — return hermes-agent as an available model."""
+        auth_err = self._check_auth(request)
+        if auth_err:
+            return auth_err
+
+        return web.json_response({
+            "object": "list",
+            "data": [
+                {
+                    "id": "hermes-agent",
+                    "object": "model",
+                    "created": int(time.time()),
+                    "owned_by": "hermes",
+                    "permission": [],
+                    "root": "hermes-agent",
+                    "parent": None,
+                }
+            ],
+        })
+
+    async def _handle_chat_completions(self, request: "web.Request") -> "web.Response":
+        """POST /v1/chat/completions — OpenAI Chat Completions format."""
+        auth_err = self._check_auth(request)
+        if auth_err:
+            return auth_err
+
+        # Parse request body
+        try:
+            body = await request.json()
+        except Exception:
+            return web.json_response(
+                {"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
+                status=400,
+            )
+        if not isinstance(body, dict):
+            return web.json_response(
+                {"error": {"message": "Request body must be a JSON object", "type": "invalid_request_error"}},
+                status=400,
+            )
+
+        messages = body.get("messages")
+        if not messages or not isinstance(messages, list):
+            return web.json_response(
+                {"error": {"message": "Missing or invalid 'messages' field", "type": "invalid_request_error"}},
+                status=400,
+            )
+
+        stream = body.get("stream", False)
+        if stream:
+            return web.json_response(
+                {"error": {"message": "Streaming is not yet supported. Set stream=false.", "type": "invalid_request_error"}},
+                status=501,
+            )
+
+        # Extract system message (becomes ephemeral system prompt layered ON TOP of core)
+        system_prompt = None
+        conversation_messages: List[Dict[str, str]] = []
+
+        for msg in messages:
+            if not isinstance(msg, dict):
+                continue  # skip malformed entries rather than crash with a 500
+            role = msg.get("role", "")
+            content = _content_to_text(msg.get("content", ""))
+            if role == "system":
+                # Accumulate system messages
+                if system_prompt is None:
+                    system_prompt = content
+                else:
+                    system_prompt = system_prompt + "\n" + content
+            elif role in ("user", "assistant"):
+                conversation_messages.append({"role": role, "content": content})
+
+        # Extract the last user message as the primary input
+        user_message = ""
+        history = []
+        if conversation_messages:
+            user_message = conversation_messages[-1].get("content", "")
+            history = conversation_messages[:-1]
+
+        if not user_message:
+            return web.json_response(
+                {"error": {"message": "No user message found in messages", "type": "invalid_request_error"}},
+                status=400,
+            )
+
+        # Run the agent in an executor (run_conversation is synchronous)
+        session_id = str(uuid.uuid4())
+        try:
+            result = await self._run_agent(
+                user_message=user_message,
+                conversation_history=history,
+                ephemeral_system_prompt=system_prompt,
+                session_id=session_id,
+            )
+        except Exception as e:
+            logger.error("Error running agent for chat completions: %s", e, exc_info=True)
+            return web.json_response(
+                {"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
+                status=500,
+            )
+
+        final_response = result.get("final_response", "")
+        if not final_response:
+            final_response = result.get("error", "(No response generated)")
+
+        completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
+        response_data = {
+            "id": completion_id,
+            "object": "chat.completion",
+            "created": int(time.time()),
+            "model": body.get("model", "hermes-agent"),
+            "choices": [
+                {
+                    "index": 0,
+                    "message": {
+                        "role": "assistant",
+                        "content": final_response,
+                    },
+                    "finish_reason": "stop",
+                }
+            ],
+            "usage": {
+                "prompt_tokens": 0,
+                "completion_tokens": 0,
+                "total_tokens": 0,
+            },
+        }
+
+        return web.json_response(response_data)
+
+    async def _handle_responses(self, request: "web.Request") -> "web.Response":
+        """POST /v1/responses — OpenAI Responses API format."""
+        auth_err = self._check_auth(request)
+        if auth_err:
+            return auth_err
+
+        # Parse request body
+        try:
+            body = await request.json()
+        except Exception:
+            return web.json_response(
+                {"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
+                status=400,
+            )
+        if not isinstance(body, dict):
+            return web.json_response(
+                {"error": {"message": "Request body must be a JSON object", "type": "invalid_request_error"}},
+                status=400,
+            )
+
+        raw_input = body.get("input")
+        if raw_input is None:
+            return web.json_response(
+                {"error": {"message": "Missing 'input' field", "type": "invalid_request_error"}},
+                status=400,
+            )
+
+        instructions = body.get("instructions")
+        previous_response_id = body.get("previous_response_id")
+        store = body.get("store", True)
+
+        # Normalize input to message list
+        input_messages: List[Dict[str, str]] = []
+        if isinstance(raw_input, str):
+            input_messages = [{"role": "user", "content": raw_input}]
+        elif isinstance(raw_input, list):
+            for item in raw_input:
+                if isinstance(item, str):
+                    input_messages.append({"role": "user", "content": item})
+                elif isinstance(item, dict):
+                    role = item.get("role", "user")
+                    # Content may be a plain string or a list of content parts
+                    content = _content_to_text(item.get("content", ""))
+                    input_messages.append({"role": role, "content": content})
+        else:
+            return web.json_response(
+                {"error": {"message": "'input' must be a string or array", "type": "invalid_request_error"}},
+                status=400,
+            )
+
+        # Reconstruct conversation history from previous_response_id
+        conversation_history: List[Dict[str, str]] = []
+        if previous_response_id:
+            stored = self._response_store.get(previous_response_id)
+            if stored is None:
+                return web.json_response(
+                    {"error": {"message": f"Previous response not found: {previous_response_id}", "type": "invalid_request_error"}},
+                    status=404,
+                )
+            conversation_history = list(stored.get("conversation_history", []))
+            # If no instructions provided, carry forward from previous
+            if instructions is None:
+                instructions = stored.get("instructions")
+
+        # Append new input messages to history (all but the last become history)
+        conversation_history.extend(input_messages[:-1])
+
+        # Last input message is the user_message
+        user_message = input_messages[-1].get("content", "") if input_messages else ""
+        if not user_message:
+            return web.json_response(
+                {"error": {"message": "No user message found in input", "type": "invalid_request_error"}},
+                status=400,
+            )
+
+        # Run the agent
+        session_id = str(uuid.uuid4())
+        try:
+            result = await self._run_agent(
+                user_message=user_message,
+                conversation_history=conversation_history,
+                ephemeral_system_prompt=instructions,
+                session_id=session_id,
+            )
+        except Exception as e:
+            logger.error("Error running agent for responses: %s", e, exc_info=True)
+            return web.json_response(
+                {"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
+                status=500,
+            )
+
+        final_response = result.get("final_response", "")
+        if not final_response:
+            final_response = result.get("error", "(No response generated)")
+
+        response_id = f"resp_{uuid.uuid4().hex[:28]}"
+        created_at = int(time.time())
+
+        # Build the full conversation history for storage
+        # (includes tool calls from the agent run)
+        full_history = list(conversation_history)
+        full_history.append({"role": "user", "content": user_message})
+        # Add agent's internal messages if available
+        agent_messages = result.get("messages", [])
+        if agent_messages:
+            full_history.extend(agent_messages)
+        else:
+            full_history.append({"role": "assistant", "content": final_response})
+
+        # Store response for future chaining
+        if store:
+            self._response_store.put(response_id, {
+                "input": raw_input,
+                "output": final_response,
+                "conversation_history": full_history,
+                "instructions": instructions,
+                "created_at": created_at,
+            })
+
+        response_data = {
+            "id": response_id,
+            "object": "response",
+            "status": "completed",
+            "created_at": created_at,
+            "model": body.get("model", "hermes-agent"),
+            "output": [
+                {
+                    "type": "message",
+                    "role": "assistant",
+                    "content": [
+                        {
+                            "type": "output_text",
+                            "text": final_response,
+                        }
+                    ],
+                }
+            ],
+            "usage": {
+                "input_tokens": 0,
+                "output_tokens": 0,
+                "total_tokens": 0,
+            },
+        }
+
+        return web.json_response(response_data)
+
+    # ------------------------------------------------------------------
+    # Agent execution
+    # ------------------------------------------------------------------
+
+    async def _run_agent(
+        self,
+        user_message: str,
+        conversation_history: List[Dict[str, str]],
+        ephemeral_system_prompt: Optional[str] = None,
+        session_id: Optional[str] = None,
+    ) -> Dict[str, Any]:
+        """
+        Create an agent and run a conversation in a thread executor.
+
+        run_conversation() is synchronous, so we run it off the event loop.
+        """
+        loop = asyncio.get_running_loop()
+
+        def _run():
+            agent = self._create_agent(
+                ephemeral_system_prompt=ephemeral_system_prompt,
+                session_id=session_id,
+            )
+            result = agent.run_conversation(
+                user_message=user_message,
+                conversation_history=conversation_history,
+            )
+            return result
+
+        return await loop.run_in_executor(None, _run)
+
+    # ------------------------------------------------------------------
+    # BasePlatformAdapter interface
+    # ------------------------------------------------------------------
+
+    async def connect(self) -> bool:
+        """Start the aiohttp web server."""
+        if not AIOHTTP_AVAILABLE:
+            logger.warning("[%s] aiohttp not installed", self.name)
+            return False
+
+        try:
+            self._app = web.Application()
+            self._app.router.add_get("/health", self._handle_health)
+            self._app.router.add_get("/v1/models", self._handle_models)
+            self._app.router.add_post("/v1/chat/completions", self._handle_chat_completions)
+            self._app.router.add_post("/v1/responses", self._handle_responses)
+
+            self._runner = web.AppRunner(self._app)
+            await self._runner.setup()
+            self._site = web.TCPSite(self._runner, self._host, self._port)
+            await self._site.start()
+
+            self._running = True
+            logger.info(
+                "[%s] API server listening on http://%s:%d",
+                self.name, self._host, self._port,
+            )
+            return True
+
+        except Exception as e:
+            logger.error("[%s] Failed to start API server: %s", self.name, e)
+            # Release a half-started runner (e.g. port already in use) so its
+            # resources are not leaked and a later connect() starts clean.
+            if self._runner is not None:
+                await self._runner.cleanup()
+            self._site = self._runner = self._app = None
+            return False
+
+    async def disconnect(self) -> None:
+        """Stop the aiohttp web server."""
+        self._running = False
+        if self._site:
+            await self._site.stop()
+            self._site = None
+        if self._runner:
+            await self._runner.cleanup()
+            self._runner = None
+        self._app = None
+        logger.info("[%s] API server stopped", self.name)
+
+    async def send(
+        self,
+        chat_id: str,
+        content: str,
+        reply_to: Optional[str] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> SendResult:
+        """
+        Not used — HTTP request/response cycle handles delivery directly.
+        """
+        return SendResult(success=False, error="API server uses HTTP request/response, not send()")
+
+    async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
+        """Return basic info about the API server."""
+        return {
+            "name": "API Server",
+            "type": "api",
+            "host": self._host,
+            "port": self._port,
+        }
diff --git a/gateway/run.py b/gateway/run.py
index 96d43672ff..178e102915 100644
--- a/gateway/run.py
+++ b/gateway/run.py
@@ -679,6 +679,13 @@ class GatewayRunner:
                 return None
             return EmailAdapter(config)
 
+        elif platform == Platform.API_SERVER:
+            from gateway.platforms.api_server import APIServerAdapter, check_api_server_requirements
+            if not check_api_server_requirements():
+                logger.warning("API Server: aiohttp not installed")
+                return None
+            return APIServerAdapter(config)
+
         return None
 
     def _is_user_authorized(self, source: SessionSource) -> bool:
@@ -698,6 +705,11 @@ class GatewayRunner:
         if source.platform == Platform.HOMEASSISTANT:
             return True
 
+        # API Server handles auth at the HTTP layer (Bearer token),
+        # so requests that reach the gateway runner are already authorized.
+ if source.platform == Platform.API_SERVER: + return True + user_id = source.user_id if not user_id: return False @@ -709,6 +721,7 @@ class GatewayRunner: Platform.SLACK: "SLACK_ALLOWED_USERS", Platform.SIGNAL: "SIGNAL_ALLOWED_USERS", Platform.EMAIL: "EMAIL_ALLOWED_USERS", + Platform.API_SERVER: "API_SERVER_ALLOWED_KEYS", } platform_allow_all_map = { Platform.TELEGRAM: "TELEGRAM_ALLOW_ALL_USERS", @@ -717,6 +730,7 @@ class GatewayRunner: Platform.SLACK: "SLACK_ALLOW_ALL_USERS", Platform.SIGNAL: "SIGNAL_ALLOW_ALL_USERS", Platform.EMAIL: "EMAIL_ALLOW_ALL_USERS", + Platform.API_SERVER: "API_SERVER_ALLOW_ALL", } # Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true) diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py new file mode 100644 index 0000000000..2e72f0b893 --- /dev/null +++ b/tests/gateway/test_api_server.py @@ -0,0 +1,816 @@ +""" +Tests for the OpenAI-compatible API server gateway adapter. + +Tests cover: +- Chat Completions endpoint (request parsing, response format) +- Responses API endpoint (request parsing, response format) +- previous_response_id chaining (store/retrieve) +- Auth (valid key, invalid key, no key configured) +- /v1/models endpoint +- /health endpoint +- System prompt extraction +- Error handling (invalid JSON, missing fields) +""" + +import json +import time +import uuid +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from aiohttp import web +from aiohttp.test_utils import AioHTTPTestCase, TestClient, TestServer + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.platforms.api_server import ( + APIServerAdapter, + ResponseStore, + check_api_server_requirements, +) + + +# --------------------------------------------------------------------------- +# check_api_server_requirements +# --------------------------------------------------------------------------- + + +class TestCheckRequirements: + def test_returns_true_when_aiohttp_available(self): 
+ assert check_api_server_requirements() is True + + @patch("gateway.platforms.api_server.AIOHTTP_AVAILABLE", False) + def test_returns_false_without_aiohttp(self): + assert check_api_server_requirements() is False + + +# --------------------------------------------------------------------------- +# ResponseStore +# --------------------------------------------------------------------------- + + +class TestResponseStore: + def test_put_and_get(self): + store = ResponseStore(max_size=10) + store.put("resp_1", {"output": "hello"}) + assert store.get("resp_1") == {"output": "hello"} + + def test_get_missing_returns_none(self): + store = ResponseStore(max_size=10) + assert store.get("resp_missing") is None + + def test_lru_eviction(self): + store = ResponseStore(max_size=3) + store.put("resp_1", {"output": "one"}) + store.put("resp_2", {"output": "two"}) + store.put("resp_3", {"output": "three"}) + # Adding a 4th should evict resp_1 + store.put("resp_4", {"output": "four"}) + assert store.get("resp_1") is None + assert store.get("resp_2") is not None + assert len(store) == 3 + + def test_access_refreshes_lru(self): + store = ResponseStore(max_size=3) + store.put("resp_1", {"output": "one"}) + store.put("resp_2", {"output": "two"}) + store.put("resp_3", {"output": "three"}) + # Access resp_1 to move it to end + store.get("resp_1") + # Now resp_2 is the oldest — adding a 4th should evict resp_2 + store.put("resp_4", {"output": "four"}) + assert store.get("resp_2") is None + assert store.get("resp_1") is not None + + def test_update_existing_key(self): + store = ResponseStore(max_size=10) + store.put("resp_1", {"output": "v1"}) + store.put("resp_1", {"output": "v2"}) + assert store.get("resp_1") == {"output": "v2"} + assert len(store) == 1 + + +# --------------------------------------------------------------------------- +# Adapter initialization +# --------------------------------------------------------------------------- + + +class TestAdapterInit: + def 
test_default_config(self): + config = PlatformConfig(enabled=True) + adapter = APIServerAdapter(config) + assert adapter._host == "127.0.0.1" + assert adapter._port == 8642 + assert adapter._api_key == "" + assert adapter.platform == Platform.API_SERVER + + def test_custom_config_from_extra(self): + config = PlatformConfig( + enabled=True, + extra={"host": "0.0.0.0", "port": 9999, "key": "sk-test"}, + ) + adapter = APIServerAdapter(config) + assert adapter._host == "0.0.0.0" + assert adapter._port == 9999 + assert adapter._api_key == "sk-test" + + def test_config_from_env(self, monkeypatch): + monkeypatch.setenv("API_SERVER_HOST", "10.0.0.1") + monkeypatch.setenv("API_SERVER_PORT", "7777") + monkeypatch.setenv("API_SERVER_KEY", "sk-env") + config = PlatformConfig(enabled=True) + adapter = APIServerAdapter(config) + assert adapter._host == "10.0.0.1" + assert adapter._port == 7777 + assert adapter._api_key == "sk-env" + + +# --------------------------------------------------------------------------- +# Auth checking +# --------------------------------------------------------------------------- + + +class TestAuth: + def test_no_key_configured_allows_all(self): + config = PlatformConfig(enabled=True) + adapter = APIServerAdapter(config) + mock_request = MagicMock() + mock_request.headers = {} + assert adapter._check_auth(mock_request) is None + + def test_valid_key_passes(self): + config = PlatformConfig(enabled=True, extra={"key": "sk-test123"}) + adapter = APIServerAdapter(config) + mock_request = MagicMock() + mock_request.headers = {"Authorization": "Bearer sk-test123"} + assert adapter._check_auth(mock_request) is None + + def test_invalid_key_returns_401(self): + config = PlatformConfig(enabled=True, extra={"key": "sk-test123"}) + adapter = APIServerAdapter(config) + mock_request = MagicMock() + mock_request.headers = {"Authorization": "Bearer wrong-key"} + result = adapter._check_auth(mock_request) + assert result is not None + assert result.status == 401 + + 
def test_missing_auth_header_returns_401(self): + config = PlatformConfig(enabled=True, extra={"key": "sk-test123"}) + adapter = APIServerAdapter(config) + mock_request = MagicMock() + mock_request.headers = {} + result = adapter._check_auth(mock_request) + assert result is not None + assert result.status == 401 + + def test_malformed_auth_header_returns_401(self): + config = PlatformConfig(enabled=True, extra={"key": "sk-test123"}) + adapter = APIServerAdapter(config) + mock_request = MagicMock() + mock_request.headers = {"Authorization": "Basic dXNlcjpwYXNz"} + result = adapter._check_auth(mock_request) + assert result is not None + assert result.status == 401 + + +# --------------------------------------------------------------------------- +# Helpers for HTTP tests +# --------------------------------------------------------------------------- + + +def _make_adapter(api_key: str = "") -> APIServerAdapter: + """Create an adapter with optional API key.""" + extra = {} + if api_key: + extra["key"] = api_key + config = PlatformConfig(enabled=True, extra=extra) + return APIServerAdapter(config) + + +def _create_app(adapter: APIServerAdapter) -> web.Application: + """Create the aiohttp app from the adapter (without starting the full server).""" + app = web.Application() + app.router.add_get("/health", adapter._handle_health) + app.router.add_get("/v1/models", adapter._handle_models) + app.router.add_post("/v1/chat/completions", adapter._handle_chat_completions) + app.router.add_post("/v1/responses", adapter._handle_responses) + return app + + +@pytest.fixture +def adapter(): + return _make_adapter() + + +@pytest.fixture +def auth_adapter(): + return _make_adapter(api_key="sk-secret") + + +# --------------------------------------------------------------------------- +# /health endpoint +# --------------------------------------------------------------------------- + + +class TestHealthEndpoint: + @pytest.mark.asyncio + async def test_health_returns_ok(self, adapter): + 
app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/health") + assert resp.status == 200 + data = await resp.json() + assert data["status"] == "ok" + assert data["platform"] == "hermes-agent" + + +# --------------------------------------------------------------------------- +# /v1/models endpoint +# --------------------------------------------------------------------------- + + +class TestModelsEndpoint: + @pytest.mark.asyncio + async def test_models_returns_hermes_agent(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/v1/models") + assert resp.status == 200 + data = await resp.json() + assert data["object"] == "list" + assert len(data["data"]) == 1 + assert data["data"][0]["id"] == "hermes-agent" + assert data["data"][0]["owned_by"] == "hermes" + + @pytest.mark.asyncio + async def test_models_requires_auth(self, auth_adapter): + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/v1/models") + assert resp.status == 401 + + @pytest.mark.asyncio + async def test_models_with_valid_auth(self, auth_adapter): + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get( + "/v1/models", + headers={"Authorization": "Bearer sk-secret"}, + ) + assert resp.status == 200 + + +# --------------------------------------------------------------------------- +# /v1/chat/completions endpoint +# --------------------------------------------------------------------------- + + +class TestChatCompletionsEndpoint: + @pytest.mark.asyncio + async def test_invalid_json_returns_400(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + "/v1/chat/completions", + data="not json", + headers={"Content-Type": "application/json"}, + ) + assert resp.status == 400 + data = await resp.json() + assert "Invalid JSON" in 
data["error"]["message"] + + @pytest.mark.asyncio + async def test_missing_messages_returns_400(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post("/v1/chat/completions", json={"model": "test"}) + assert resp.status == 400 + data = await resp.json() + assert "messages" in data["error"]["message"] + + @pytest.mark.asyncio + async def test_empty_messages_returns_400(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post("/v1/chat/completions", json={"model": "test", "messages": []}) + assert resp.status == 400 + + @pytest.mark.asyncio + async def test_stream_true_returns_501(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + "/v1/chat/completions", + json={ + "model": "test", + "messages": [{"role": "user", "content": "hi"}], + "stream": True, + }, + ) + assert resp.status == 501 + + @pytest.mark.asyncio + async def test_no_user_message_returns_400(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + "/v1/chat/completions", + json={ + "model": "test", + "messages": [{"role": "system", "content": "You are helpful."}], + }, + ) + assert resp.status == 400 + + @pytest.mark.asyncio + async def test_successful_completion(self, adapter): + """Test a successful chat completion with mocked agent.""" + mock_result = { + "final_response": "Hello! 
How can I help you today?", + "messages": [], + "api_calls": 1, + } + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result + resp = await cli.post( + "/v1/chat/completions", + json={ + "model": "hermes-agent", + "messages": [{"role": "user", "content": "Hello"}], + }, + ) + + assert resp.status == 200 + data = await resp.json() + assert data["object"] == "chat.completion" + assert data["id"].startswith("chatcmpl-") + assert data["model"] == "hermes-agent" + assert len(data["choices"]) == 1 + assert data["choices"][0]["message"]["role"] == "assistant" + assert data["choices"][0]["message"]["content"] == "Hello! How can I help you today?" + assert data["choices"][0]["finish_reason"] == "stop" + assert "usage" in data + + @pytest.mark.asyncio + async def test_system_prompt_extracted(self, adapter): + """System messages from the client are passed as ephemeral_system_prompt.""" + mock_result = { + "final_response": "I am a pirate! Arrr!", + "messages": [], + "api_calls": 1, + } + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result + resp = await cli.post( + "/v1/chat/completions", + json={ + "model": "hermes-agent", + "messages": [ + {"role": "system", "content": "You are a pirate."}, + {"role": "user", "content": "Hello"}, + ], + }, + ) + + assert resp.status == 200 + # Check that _run_agent was called with the system prompt + call_kwargs = mock_run.call_args + assert call_kwargs.kwargs.get("ephemeral_system_prompt") == "You are a pirate." 
+ assert call_kwargs.kwargs.get("user_message") == "Hello" + + @pytest.mark.asyncio + async def test_conversation_history_passed(self, adapter): + """Previous user/assistant messages become conversation_history.""" + mock_result = {"final_response": "3", "messages": [], "api_calls": 1} + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result + resp = await cli.post( + "/v1/chat/completions", + json={ + "model": "hermes-agent", + "messages": [ + {"role": "user", "content": "1+1=?"}, + {"role": "assistant", "content": "2"}, + {"role": "user", "content": "Now add 1 more"}, + ], + }, + ) + + assert resp.status == 200 + call_kwargs = mock_run.call_args.kwargs + assert call_kwargs["user_message"] == "Now add 1 more" + assert len(call_kwargs["conversation_history"]) == 2 + assert call_kwargs["conversation_history"][0] == {"role": "user", "content": "1+1=?"} + assert call_kwargs["conversation_history"][1] == {"role": "assistant", "content": "2"} + + @pytest.mark.asyncio + async def test_agent_error_returns_500(self, adapter): + """Agent exception returns 500.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.side_effect = RuntimeError("Provider failed") + resp = await cli.post( + "/v1/chat/completions", + json={ + "model": "hermes-agent", + "messages": [{"role": "user", "content": "Hello"}], + }, + ) + + assert resp.status == 500 + data = await resp.json() + assert "Provider failed" in data["error"]["message"] + + +# --------------------------------------------------------------------------- +# /v1/responses endpoint +# --------------------------------------------------------------------------- + + +class TestResponsesEndpoint: + @pytest.mark.asyncio + async def test_missing_input_returns_400(self, adapter): + app 
= _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post("/v1/responses", json={"model": "test"}) + assert resp.status == 400 + data = await resp.json() + assert "input" in data["error"]["message"] + + @pytest.mark.asyncio + async def test_invalid_json_returns_400(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + "/v1/responses", + data="not json", + headers={"Content-Type": "application/json"}, + ) + assert resp.status == 400 + + @pytest.mark.asyncio + async def test_successful_response_with_string_input(self, adapter): + """String input is wrapped in a user message.""" + mock_result = { + "final_response": "Paris is the capital of France.", + "messages": [], + "api_calls": 1, + } + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result + resp = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "What is the capital of France?", + }, + ) + + assert resp.status == 200 + data = await resp.json() + assert data["object"] == "response" + assert data["id"].startswith("resp_") + assert data["status"] == "completed" + assert len(data["output"]) == 1 + assert data["output"][0]["type"] == "message" + assert data["output"][0]["content"][0]["type"] == "output_text" + assert data["output"][0]["content"][0]["text"] == "Paris is the capital of France." 
+ + @pytest.mark.asyncio + async def test_successful_response_with_array_input(self, adapter): + """Array input with role/content objects.""" + mock_result = {"final_response": "Done", "messages": [], "api_calls": 1} + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result + resp = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": [ + {"role": "user", "content": "Hello"}, + {"role": "user", "content": "What is 2+2?"}, + ], + }, + ) + + assert resp.status == 200 + call_kwargs = mock_run.call_args.kwargs + # Last message is user_message, rest are history + assert call_kwargs["user_message"] == "What is 2+2?" + assert len(call_kwargs["conversation_history"]) == 1 + + @pytest.mark.asyncio + async def test_instructions_as_ephemeral_prompt(self, adapter): + """The instructions field maps to ephemeral_system_prompt.""" + mock_result = {"final_response": "Ahoy!", "messages": [], "api_calls": 1} + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result + resp = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "Hello", + "instructions": "Talk like a pirate.", + }, + ) + + assert resp.status == 200 + call_kwargs = mock_run.call_args.kwargs + assert call_kwargs["ephemeral_system_prompt"] == "Talk like a pirate." 
+ + @pytest.mark.asyncio + async def test_previous_response_id_chaining(self, adapter): + """Test that responses can be chained via previous_response_id.""" + mock_result_1 = { + "final_response": "2", + "messages": [{"role": "assistant", "content": "2"}], + "api_calls": 1, + } + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + # First request + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result_1 + resp1 = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": "What is 1+1?"}, + ) + + assert resp1.status == 200 + data1 = await resp1.json() + response_id = data1["id"] + + # Second request chaining from the first + mock_result_2 = { + "final_response": "3", + "messages": [{"role": "assistant", "content": "3"}], + "api_calls": 1, + } + + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result_2 + resp2 = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "Now add 1 more", + "previous_response_id": response_id, + }, + ) + + assert resp2.status == 200 + # The conversation_history should contain the full history from the first response + call_kwargs = mock_run.call_args.kwargs + assert len(call_kwargs["conversation_history"]) > 0 + assert call_kwargs["user_message"] == "Now add 1 more" + + @pytest.mark.asyncio + async def test_invalid_previous_response_id_returns_404(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "follow up", + "previous_response_id": "resp_nonexistent", + }, + ) + assert resp.status == 404 + + @pytest.mark.asyncio + async def test_store_false_does_not_store(self, adapter): + """When store=false, the response is NOT stored.""" + mock_result = {"final_response": "OK", "messages": [], "api_calls": 1} + + app = 
_create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result + resp = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "Hello", + "store": False, + }, + ) + + assert resp.status == 200 + data = await resp.json() + # The response has an ID but it shouldn't be retrievable + assert adapter._response_store.get(data["id"]) is None + + @pytest.mark.asyncio + async def test_instructions_inherited_from_previous(self, adapter): + """If no instructions provided, carry forward from previous response.""" + mock_result = {"final_response": "Ahoy!", "messages": [], "api_calls": 1} + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + # First request with instructions + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result + resp1 = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "Hello", + "instructions": "Be a pirate", + }, + ) + + data1 = await resp1.json() + resp_id = data1["id"] + + # Second request without instructions + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result + resp2 = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "Tell me more", + "previous_response_id": resp_id, + }, + ) + + assert resp2.status == 200 + call_kwargs = mock_run.call_args.kwargs + assert call_kwargs["ephemeral_system_prompt"] == "Be a pirate" + + @pytest.mark.asyncio + async def test_agent_error_returns_500(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.side_effect = RuntimeError("Boom") + resp = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": "Hello"}, + 
) + + assert resp.status == 500 + + @pytest.mark.asyncio + async def test_invalid_input_type_returns_400(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": 42}, + ) + assert resp.status == 400 + + +# --------------------------------------------------------------------------- +# Auth on endpoints +# --------------------------------------------------------------------------- + + +class TestEndpointAuth: + @pytest.mark.asyncio + async def test_chat_completions_requires_auth(self, auth_adapter): + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + "/v1/chat/completions", + json={"model": "test", "messages": [{"role": "user", "content": "hi"}]}, + ) + assert resp.status == 401 + + @pytest.mark.asyncio + async def test_responses_requires_auth(self, auth_adapter): + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + "/v1/responses", + json={"model": "test", "input": "hi"}, + ) + assert resp.status == 401 + + @pytest.mark.asyncio + async def test_models_requires_auth(self, auth_adapter): + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/v1/models") + assert resp.status == 401 + + @pytest.mark.asyncio + async def test_health_does_not_require_auth(self, auth_adapter): + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/health") + assert resp.status == 200 + + +# --------------------------------------------------------------------------- +# Config integration +# --------------------------------------------------------------------------- + + +class TestConfigIntegration: + def test_platform_enum_has_api_server(self): + assert Platform.API_SERVER.value == "api_server" + + def test_env_override_enables_api_server(self, 
monkeypatch): + monkeypatch.setenv("API_SERVER_ENABLED", "true") + from gateway.config import load_gateway_config + config = load_gateway_config() + assert Platform.API_SERVER in config.platforms + assert config.platforms[Platform.API_SERVER].enabled is True + + def test_env_override_with_key(self, monkeypatch): + monkeypatch.setenv("API_SERVER_KEY", "sk-mykey") + from gateway.config import load_gateway_config + config = load_gateway_config() + assert Platform.API_SERVER in config.platforms + assert config.platforms[Platform.API_SERVER].extra.get("key") == "sk-mykey" + + def test_env_override_port_and_host(self, monkeypatch): + monkeypatch.setenv("API_SERVER_ENABLED", "true") + monkeypatch.setenv("API_SERVER_PORT", "9999") + monkeypatch.setenv("API_SERVER_HOST", "0.0.0.0") + from gateway.config import load_gateway_config + config = load_gateway_config() + assert config.platforms[Platform.API_SERVER].extra.get("port") == 9999 + assert config.platforms[Platform.API_SERVER].extra.get("host") == "0.0.0.0" + + def test_api_server_in_connected_platforms(self): + config = GatewayConfig() + config.platforms[Platform.API_SERVER] = PlatformConfig(enabled=True) + connected = config.get_connected_platforms() + assert Platform.API_SERVER in connected + + def test_api_server_not_in_connected_when_disabled(self): + config = GatewayConfig() + config.platforms[Platform.API_SERVER] = PlatformConfig(enabled=False) + connected = config.get_connected_platforms() + assert Platform.API_SERVER not in connected + + +# --------------------------------------------------------------------------- +# Multiple system messages +# --------------------------------------------------------------------------- + + +class TestMultipleSystemMessages: + @pytest.mark.asyncio + async def test_multiple_system_messages_concatenated(self, adapter): + mock_result = {"final_response": "OK", "messages": [], "api_calls": 1} + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with 
patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = mock_result + resp = await cli.post( + "/v1/chat/completions", + json={ + "model": "hermes-agent", + "messages": [ + {"role": "system", "content": "You are helpful."}, + {"role": "system", "content": "Be concise."}, + {"role": "user", "content": "Hello"}, + ], + }, + ) + + assert resp.status == 200 + call_kwargs = mock_run.call_args.kwargs + prompt = call_kwargs["ephemeral_system_prompt"] + assert "You are helpful." in prompt + assert "Be concise." in prompt + + +# --------------------------------------------------------------------------- +# send() method (not used but required by base) +# --------------------------------------------------------------------------- + + +class TestSendMethod: + @pytest.mark.asyncio + async def test_send_returns_not_supported(self): + config = PlatformConfig(enabled=True) + adapter = APIServerAdapter(config) + result = await adapter.send("chat1", "hello") + assert result.success is False + assert "HTTP request/response" in result.error