feat(api-server): Idempotency-Key support, body size limit, OpenAI error envelope (#2903)

* feat(api-server): add Idempotency-Key support and request size limit; unify OpenAI error envelope

* fix(api-server): include provider error message in 500 OpenAI error body

---------

Co-authored-by: aydnOktay <xaydinoktay@gmail.com>
This commit is contained in:
Teknium 2026-03-24 19:31:08 -07:00 committed by GitHub
parent 1b24a226ea
commit 80cc27eb9d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@@ -45,6 +45,7 @@ logger = logging.getLogger(__name__)
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 8642
MAX_STORED_RESPONSES = 100
MAX_REQUEST_BYTES = 1_000_000 # 1 MB default limit for POST bodies
def check_api_server_requirements() -> bool:
@@ -194,6 +195,73 @@ else:
cors_middleware = None # type: ignore[assignment]
def _openai_error(message: str, err_type: str = "invalid_request_error", param: str = None, code: str = None) -> Dict[str, Any]:
"""OpenAI-style error envelope."""
return {
"error": {
"message": message,
"type": err_type,
"param": param,
"code": code,
}
}
if AIOHTTP_AVAILABLE:
    @web.middleware
    async def body_limit_middleware(request, handler):
        """Short-circuit write requests whose declared Content-Length is too large.

        Only the Content-Length header is inspected; requests without one
        (e.g. chunked transfer) pass straight through to the handler.
        """
        declared = None
        if request.method in ("POST", "PUT", "PATCH"):
            declared = request.headers.get("Content-Length")
        if declared is None:
            return await handler(request)
        try:
            size = int(declared)
        except ValueError:
            # Malformed header: reject rather than guess at the body size.
            return web.json_response(
                _openai_error("Invalid Content-Length header.", code="invalid_content_length"),
                status=400,
            )
        if size > MAX_REQUEST_BYTES:
            return web.json_response(
                _openai_error("Request body too large.", code="body_too_large"),
                status=413,
            )
        return await handler(request)
else:
    body_limit_middleware = None  # type: ignore[assignment]
class _IdempotencyCache:
"""In-memory idempotency cache with TTL and basic LRU semantics."""
def __init__(self, max_items: int = 1000, ttl_seconds: int = 300):
from collections import OrderedDict
self._store = OrderedDict()
self._ttl = ttl_seconds
self._max = max_items
def _purge(self):
import time as _t
now = _t.time()
expired = [k for k, v in self._store.items() if now - v["ts"] > self._ttl]
for k in expired:
self._store.pop(k, None)
while len(self._store) > self._max:
self._store.popitem(last=False)
async def get_or_set(self, key: str, fingerprint: str, compute_coro):
self._purge()
item = self._store.get(key)
if item and item["fp"] == fingerprint:
return item["resp"]
resp = await compute_coro()
import time as _t
self._store[key] = {"resp": resp, "fp": fingerprint, "ts": _t.time()}
self._purge()
return resp
_idem_cache = _IdempotencyCache()
def _make_request_fingerprint(body: Dict[str, Any], keys: List[str]) -> str:
from hashlib import sha256
subset = {k: body.get(k) for k in keys}
return sha256(repr(subset).encode("utf-8")).hexdigest()
class APIServerAdapter(BasePlatformAdapter):
"""
OpenAI-compatible HTTP API server adapter.
@@ -360,10 +428,7 @@ class APIServerAdapter(BasePlatformAdapter):
try:
body = await request.json()
except (json.JSONDecodeError, Exception):
return web.json_response(
{"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
status=400,
)
return web.json_response(_openai_error("Invalid JSON in request body"), status=400)
messages = body.get("messages")
if not messages or not isinstance(messages, list):
@@ -428,20 +493,35 @@ class APIServerAdapter(BasePlatformAdapter):
request, completion_id, model_name, created, _stream_q, agent_task
)
# Non-streaming: run the agent and return full response
try:
result, usage = await self._run_agent(
# Non-streaming: run the agent (with optional Idempotency-Key)
async def _compute_completion():
return await self._run_agent(
user_message=user_message,
conversation_history=history,
ephemeral_system_prompt=system_prompt,
session_id=session_id,
)
except Exception as e:
logger.error("Error running agent for chat completions: %s", e, exc_info=True)
return web.json_response(
{"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
status=500,
)
idempotency_key = request.headers.get("Idempotency-Key")
if idempotency_key:
fp = _make_request_fingerprint(body, keys=["model", "messages", "tools", "tool_choice", "stream"])
try:
result, usage = await _idem_cache.get_or_set(idempotency_key, fp, _compute_completion)
except Exception as e:
logger.error("Error running agent for chat completions: %s", e, exc_info=True)
return web.json_response(
_openai_error(f"Internal server error: {e}", err_type="server_error"),
status=500,
)
else:
try:
result, usage = await _compute_completion()
except Exception as e:
logger.error("Error running agent for chat completions: %s", e, exc_info=True)
return web.json_response(
_openai_error(f"Internal server error: {e}", err_type="server_error"),
status=500,
)
final_response = result.get("final_response", "")
if not final_response:
@@ -567,10 +647,7 @@ class APIServerAdapter(BasePlatformAdapter):
raw_input = body.get("input")
if raw_input is None:
return web.json_response(
{"error": {"message": "Missing 'input' field", "type": "invalid_request_error"}},
status=400,
)
return web.json_response(_openai_error("Missing 'input' field"), status=400)
instructions = body.get("instructions")
previous_response_id = body.get("previous_response_id")
@@ -579,10 +656,7 @@ class APIServerAdapter(BasePlatformAdapter):
# conversation and previous_response_id are mutually exclusive
if conversation and previous_response_id:
return web.json_response(
{"error": {"message": "Cannot use both 'conversation' and 'previous_response_id'", "type": "invalid_request_error"}},
status=400,
)
return web.json_response(_openai_error("Cannot use both 'conversation' and 'previous_response_id'"), status=400)
# Resolve conversation name to latest response_id
if conversation:
@@ -613,20 +687,14 @@ class APIServerAdapter(BasePlatformAdapter):
content = "\n".join(text_parts)
input_messages.append({"role": role, "content": content})
else:
return web.json_response(
{"error": {"message": "'input' must be a string or array", "type": "invalid_request_error"}},
status=400,
)
return web.json_response(_openai_error("'input' must be a string or array"), status=400)
# Reconstruct conversation history from previous_response_id
conversation_history: List[Dict[str, str]] = []
if previous_response_id:
stored = self._response_store.get(previous_response_id)
if stored is None:
return web.json_response(
{"error": {"message": f"Previous response not found: {previous_response_id}", "type": "invalid_request_error"}},
status=404,
)
return web.json_response(_openai_error(f"Previous response not found: {previous_response_id}"), status=404)
conversation_history = list(stored.get("conversation_history", []))
# If no instructions provided, carry forward from previous
if instructions is None:
@@ -639,30 +707,46 @@ class APIServerAdapter(BasePlatformAdapter):
# Last input message is the user_message
user_message = input_messages[-1].get("content", "") if input_messages else ""
if not user_message:
return web.json_response(
{"error": {"message": "No user message found in input", "type": "invalid_request_error"}},
status=400,
)
return web.json_response(_openai_error("No user message found in input"), status=400)
# Truncation support
if body.get("truncation") == "auto" and len(conversation_history) > 100:
conversation_history = conversation_history[-100:]
# Run the agent
# Run the agent (with Idempotency-Key support)
session_id = str(uuid.uuid4())
try:
result, usage = await self._run_agent(
async def _compute_response():
return await self._run_agent(
user_message=user_message,
conversation_history=conversation_history,
ephemeral_system_prompt=instructions,
session_id=session_id,
)
except Exception as e:
logger.error("Error running agent for responses: %s", e, exc_info=True)
return web.json_response(
{"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
status=500,
idempotency_key = request.headers.get("Idempotency-Key")
if idempotency_key:
fp = _make_request_fingerprint(
body,
keys=["input", "instructions", "previous_response_id", "conversation", "model", "tools"],
)
try:
result, usage = await _idem_cache.get_or_set(idempotency_key, fp, _compute_response)
except Exception as e:
logger.error("Error running agent for responses: %s", e, exc_info=True)
return web.json_response(
_openai_error(f"Internal server error: {e}", err_type="server_error"),
status=500,
)
else:
try:
result, usage = await _compute_response()
except Exception as e:
logger.error("Error running agent for responses: %s", e, exc_info=True)
return web.json_response(
_openai_error(f"Internal server error: {e}", err_type="server_error"),
status=500,
)
final_response = result.get("final_response", "")
if not final_response:
@@ -726,10 +810,7 @@ class APIServerAdapter(BasePlatformAdapter):
response_id = request.match_info["response_id"]
stored = self._response_store.get(response_id)
if stored is None:
return web.json_response(
{"error": {"message": f"Response not found: {response_id}", "type": "invalid_request_error"}},
status=404,
)
return web.json_response(_openai_error(f"Response not found: {response_id}"), status=404)
return web.json_response(stored["response"])
@@ -742,10 +823,7 @@ class APIServerAdapter(BasePlatformAdapter):
response_id = request.match_info["response_id"]
deleted = self._response_store.delete(response_id)
if not deleted:
return web.json_response(
{"error": {"message": f"Response not found: {response_id}", "type": "invalid_request_error"}},
status=404,
)
return web.json_response(_openai_error(f"Response not found: {response_id}"), status=404)
return web.json_response({
"id": response_id,
@@ -1090,7 +1168,8 @@ class APIServerAdapter(BasePlatformAdapter):
return False
try:
self._app = web.Application(middlewares=[cors_middleware])
mws = [mw for mw in (cors_middleware, body_limit_middleware) if mw is not None]
self._app = web.Application(middlewares=mws)
self._app["api_server_adapter"] = self
self._app.router.add_get("/health", self._handle_health)
self._app.router.add_get("/v1/models", self._handle_models)