diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py
index 01339608d4..68bbb4b044 100644
--- a/gateway/platforms/api_server.py
+++ b/gateway/platforms/api_server.py
@@ -45,6 +45,7 @@ logger = logging.getLogger(__name__)
 DEFAULT_HOST = "127.0.0.1"
 DEFAULT_PORT = 8642
 MAX_STORED_RESPONSES = 100
+MAX_REQUEST_BYTES = 1_000_000  # 1 MB default limit for POST bodies
 
 
 def check_api_server_requirements() -> bool:
@@ -194,6 +195,73 @@ else:
     cors_middleware = None  # type: ignore[assignment]
 
 
+def _openai_error(message: str, err_type: str = "invalid_request_error", param: Optional[str] = None, code: Optional[str] = None) -> Dict[str, Any]:
+    """OpenAI-style error envelope."""
+    return {
+        "error": {
+            "message": message,
+            "type": err_type,
+            "param": param,
+            "code": code,
+        }
+    }
+
+
+if AIOHTTP_AVAILABLE:
+    @web.middleware
+    async def body_limit_middleware(request, handler):
+        """Reject overly large request bodies early based on Content-Length."""
+        if request.method in ("POST", "PUT", "PATCH"):
+            cl = request.headers.get("Content-Length")
+            if cl is not None:
+                try:
+                    if int(cl) > MAX_REQUEST_BYTES:
+                        return web.json_response(_openai_error("Request body too large.", code="body_too_large"), status=413)
+                except ValueError:
+                    return web.json_response(_openai_error("Invalid Content-Length header.", code="invalid_content_length"), status=400)
+        return await handler(request)
+else:
+    body_limit_middleware = None  # type: ignore[assignment]
+
+
+class _IdempotencyCache:
+    """In-memory idempotency cache with TTL and FIFO eviction when full."""
+    def __init__(self, max_items: int = 1000, ttl_seconds: int = 300):
+        from collections import OrderedDict
+        self._store = OrderedDict()
+        self._ttl = ttl_seconds
+        self._max = max_items
+
+    def _purge(self):
+        import time as _t
+        now = _t.time()
+        expired = [k for k, v in self._store.items() if now - v["ts"] > self._ttl]
+        for k in expired:
+            self._store.pop(k, None)
+        while len(self._store) > self._max:
+            self._store.popitem(last=False)
+
+    async def get_or_set(self, key: str, fingerprint: str, compute_coro):
+        import time as _t
+        self._purge()
+        item = self._store.get(key)
+        if item and item["fp"] == fingerprint:
+            return item["resp"]
+        resp = await compute_coro()
+        self._store[key] = {"resp": resp, "fp": fingerprint, "ts": _t.time()}
+        self._purge()
+        return resp
+
+
+_idem_cache = _IdempotencyCache()
+
+
+def _make_request_fingerprint(body: Dict[str, Any], keys: List[str]) -> str:
+    from hashlib import sha256
+    subset = {k: body.get(k) for k in keys}
+    return sha256(json.dumps(subset, sort_keys=True).encode("utf-8")).hexdigest()
+
+
 class APIServerAdapter(BasePlatformAdapter):
     """
     OpenAI-compatible HTTP API server adapter.
@@ -360,10 +428,7 @@ class APIServerAdapter(BasePlatformAdapter):
         try:
             body = await request.json()
         except (json.JSONDecodeError, Exception):
-            return web.json_response(
-                {"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
-                status=400,
-            )
+            return web.json_response(_openai_error("Invalid JSON in request body"), status=400)
 
         messages = body.get("messages")
         if not messages or not isinstance(messages, list):
@@ -428,20 +493,35 @@ class APIServerAdapter(BasePlatformAdapter):
                 request, completion_id, model_name, created, _stream_q, agent_task
             )
 
-        # Non-streaming: run the agent and return full response
-        try:
-            result, usage = await self._run_agent(
+        # Non-streaming: run the agent (with optional Idempotency-Key)
+        async def _compute_completion():
+            return await self._run_agent(
                 user_message=user_message,
                 conversation_history=history,
                 ephemeral_system_prompt=system_prompt,
                 session_id=session_id,
             )
-        except Exception as e:
-            logger.error("Error running agent for chat completions: %s", e, exc_info=True)
-            return web.json_response(
-                {"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
-                status=500,
-            )
+
+        idempotency_key = request.headers.get("Idempotency-Key")
+        if idempotency_key:
+            fp = _make_request_fingerprint(body, keys=["model", "messages", "tools", "tool_choice", "stream"])
+            try:
+                result, usage = await _idem_cache.get_or_set(idempotency_key, fp, _compute_completion)
+            except Exception as e:
+                logger.error("Error running agent for chat completions: %s", e, exc_info=True)
+                return web.json_response(
+                    _openai_error(f"Internal server error: {e}", err_type="server_error"),
+                    status=500,
+                )
+        else:
+            try:
+                result, usage = await _compute_completion()
+            except Exception as e:
+                logger.error("Error running agent for chat completions: %s", e, exc_info=True)
+                return web.json_response(
+                    _openai_error(f"Internal server error: {e}", err_type="server_error"),
+                    status=500,
+                )
 
         final_response = result.get("final_response", "")
         if not final_response:
@@ -567,10 +647,7 @@ class APIServerAdapter(BasePlatformAdapter):
 
         raw_input = body.get("input")
         if raw_input is None:
-            return web.json_response(
-                {"error": {"message": "Missing 'input' field", "type": "invalid_request_error"}},
-                status=400,
-            )
+            return web.json_response(_openai_error("Missing 'input' field"), status=400)
 
         instructions = body.get("instructions")
         previous_response_id = body.get("previous_response_id")
@@ -579,10 +656,7 @@
 
         # conversation and previous_response_id are mutually exclusive
         if conversation and previous_response_id:
-            return web.json_response(
-                {"error": {"message": "Cannot use both 'conversation' and 'previous_response_id'", "type": "invalid_request_error"}},
-                status=400,
-            )
+            return web.json_response(_openai_error("Cannot use both 'conversation' and 'previous_response_id'"), status=400)
 
         # Resolve conversation name to latest response_id
         if conversation:
@@ -613,20 +687,14 @@
                 content = "\n".join(text_parts)
                 input_messages.append({"role": role, "content": content})
         else:
-            return web.json_response(
-                {"error": {"message": "'input' must be a string or array", "type": "invalid_request_error"}},
-                status=400,
-            )
+            return web.json_response(_openai_error("'input' must be a string or array"), status=400)
 
         # Reconstruct conversation history from previous_response_id
         conversation_history: List[Dict[str, str]] = []
         if previous_response_id:
             stored = self._response_store.get(previous_response_id)
             if stored is None:
-                return web.json_response(
-                    {"error": {"message": f"Previous response not found: {previous_response_id}", "type": "invalid_request_error"}},
-                    status=404,
-                )
+                return web.json_response(_openai_error(f"Previous response not found: {previous_response_id}"), status=404)
             conversation_history = list(stored.get("conversation_history", []))
             # If no instructions provided, carry forward from previous
             if instructions is None:
@@ -639,30 +707,46 @@
         # Last input message is the user_message
         user_message = input_messages[-1].get("content", "") if input_messages else ""
         if not user_message:
-            return web.json_response(
-                {"error": {"message": "No user message found in input", "type": "invalid_request_error"}},
-                status=400,
-            )
+            return web.json_response(_openai_error("No user message found in input"), status=400)
 
         # Truncation support
         if body.get("truncation") == "auto" and len(conversation_history) > 100:
             conversation_history = conversation_history[-100:]
 
-        # Run the agent
+        # Run the agent (with Idempotency-Key support)
         session_id = str(uuid.uuid4())
-        try:
-            result, usage = await self._run_agent(
+
+        async def _compute_response():
+            return await self._run_agent(
                 user_message=user_message,
                 conversation_history=conversation_history,
                 ephemeral_system_prompt=instructions,
                 session_id=session_id,
             )
-        except Exception as e:
-            logger.error("Error running agent for responses: %s", e, exc_info=True)
-            return web.json_response(
-                {"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
-                status=500,
+
+        idempotency_key = request.headers.get("Idempotency-Key")
+        if idempotency_key:
+            fp = _make_request_fingerprint(
+                body,
+                keys=["input", "instructions", "previous_response_id", "conversation", "model", "tools"],
             )
+            try:
+                result, usage = await _idem_cache.get_or_set(idempotency_key, fp, _compute_response)
+            except Exception as e:
+                logger.error("Error running agent for responses: %s", e, exc_info=True)
+                return web.json_response(
+                    _openai_error(f"Internal server error: {e}", err_type="server_error"),
+                    status=500,
+                )
+        else:
+            try:
+                result, usage = await _compute_response()
+            except Exception as e:
+                logger.error("Error running agent for responses: %s", e, exc_info=True)
+                return web.json_response(
+                    _openai_error(f"Internal server error: {e}", err_type="server_error"),
+                    status=500,
+                )
 
         final_response = result.get("final_response", "")
         if not final_response:
@@ -726,10 +810,7 @@ class APIServerAdapter(BasePlatformAdapter):
         response_id = request.match_info["response_id"]
         stored = self._response_store.get(response_id)
         if stored is None:
-            return web.json_response(
-                {"error": {"message": f"Response not found: {response_id}", "type": "invalid_request_error"}},
-                status=404,
-            )
+            return web.json_response(_openai_error(f"Response not found: {response_id}"), status=404)
 
         return web.json_response(stored["response"])
 
@@ -742,10 +823,7 @@ class APIServerAdapter(BasePlatformAdapter):
         response_id = request.match_info["response_id"]
         deleted = self._response_store.delete(response_id)
         if not deleted:
-            return web.json_response(
-                {"error": {"message": f"Response not found: {response_id}", "type": "invalid_request_error"}},
-                status=404,
-            )
+            return web.json_response(_openai_error(f"Response not found: {response_id}"), status=404)
 
         return web.json_response({
             "id": response_id,
@@ -1090,7 +1168,8 @@ class APIServerAdapter(BasePlatformAdapter):
             return False
 
         try:
-            self._app = web.Application(middlewares=[cors_middleware])
+            mws = [mw for mw in (cors_middleware, body_limit_middleware) if mw is not None]
+            self._app = web.Application(middlewares=mws)
             self._app["api_server_adapter"] = self
             self._app.router.add_get("/health", self._handle_health)
             self._app.router.add_get("/v1/models", self._handle_models)
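
Note (reviewer sketch, not part of the patch): the idempotency behavior can be
exercised directly against _IdempotencyCache, which this patch adds at module
level in gateway/platforms/api_server.py. Assuming the private name is imported
straight from that module, the same key with the same fingerprint replays the
cached (result, usage) tuple without re-running the coroutine, while a changed
fingerprint recomputes:

    import asyncio

    from gateway.platforms.api_server import _IdempotencyCache

    async def main():
        cache = _IdempotencyCache(max_items=10, ttl_seconds=60)
        calls = 0

        async def compute():
            nonlocal calls
            calls += 1
            # Shape mirrors the (result, usage) tuple the handlers cache.
            return {"final_response": f"run #{calls}"}, {"total_tokens": 1}

        a = await cache.get_or_set("key-1", "fp-a", compute)
        b = await cache.get_or_set("key-1", "fp-a", compute)  # cache hit, no recompute
        c = await cache.get_or_set("key-1", "fp-b", compute)  # fingerprint changed, recompute
        assert a is b and calls == 2

    asyncio.run(main())

A related property: _make_request_fingerprint hashes only the whitelisted
top-level keys, serialized with sort_keys=True, so two bodies that differ only
in JSON key order yield the same fingerprint:

    from gateway.platforms.api_server import _make_request_fingerprint

    keys = ["model", "messages", "tools", "tool_choice", "stream"]
    body_a = {"model": "m", "messages": [{"role": "user", "content": "hi"}], "stream": False}
    body_b = {"stream": False, "messages": [{"role": "user", "content": "hi"}], "model": "m"}
    assert _make_request_fingerprint(body_a, keys) == _make_request_fingerprint(body_b, keys)

Because the cache wraps _run_agent rather than the final HTTP envelope,
per-request fields such as the completion id are presumably regenerated on a
cached retry; only the agent result and usage are replayed.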