mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Widen return type annotations to match actual control flow, add unreachable assertions after retry loops ty cannot prove terminate, split ambiguous union returns (auth.py credential pool), and remove the AIOHTTP_AVAILABLE conditional-import guard from api_server.py.
2586 lines · 109 KiB · Python
"""
|
|
OpenAI-compatible API server platform adapter.
|
|
|
|
Exposes an HTTP server with endpoints:
|
|
- POST /v1/chat/completions — OpenAI Chat Completions format (stateless; opt-in session continuity via X-Hermes-Session-Id header)
|
|
- POST /v1/responses — OpenAI Responses API format (stateful via previous_response_id)
|
|
- GET /v1/responses/{response_id} — Retrieve a stored response
|
|
- DELETE /v1/responses/{response_id} — Delete a stored response
|
|
- GET /v1/models — lists hermes-agent as an available model
|
|
- POST /v1/runs — start a run, returns run_id immediately (202)
|
|
- GET /v1/runs/{run_id}/events — SSE stream of structured lifecycle events
|
|
- GET /health — health check
|
|
- GET /health/detailed — rich status for cross-container dashboard probing
|
|
|
|
Any OpenAI-compatible frontend (Open WebUI, LobeChat, LibreChat,
|
|
AnythingLLM, NextChat, ChatBox, etc.) can connect to hermes-agent
|
|
through this adapter by pointing at http://localhost:8642/v1.
|
|
|
|
Requires:
|
|
- aiohttp (already available in the gateway)
|
|
"""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket as _socket
|
|
import re
|
|
import sqlite3
|
|
import time
|
|
import uuid
|
|
from typing import Any, Dict, List, Optional
|
|
from aiohttp import web
|
|
from gateway.config import Platform, PlatformConfig
|
|
from gateway.platforms.base import (
|
|
BasePlatformAdapter,
|
|
SendResult,
|
|
is_network_accessible,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Default settings
|
|
DEFAULT_HOST = "127.0.0.1"
|
|
DEFAULT_PORT = 8642
|
|
MAX_STORED_RESPONSES = 100
|
|
MAX_REQUEST_BYTES = 1_000_000 # 1 MB default limit for POST bodies
|
|
CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS = 30.0
|
|
MAX_NORMALIZED_TEXT_LENGTH = 65_536 # 64 KB cap for normalized content parts
|
|
MAX_CONTENT_LIST_SIZE = 1_000 # Max items when content is an array
|
|
|
|
|
|
def _normalize_chat_content(
|
|
content: Any, *, _max_depth: int = 10, _depth: int = 0,
|
|
) -> str:
|
|
"""Normalize OpenAI chat message content into a plain text string.
|
|
|
|
Some clients (Open WebUI, LobeChat, etc.) send content as an array of
|
|
typed parts instead of a plain string::
|
|
|
|
[{"type": "text", "text": "hello"}, {"type": "input_text", "text": "..."}]
|
|
|
|
This function flattens those into a single string so the agent pipeline
|
|
(which expects strings) doesn't choke.
|
|
|
|
Defensive limits prevent abuse: recursion depth, list size, and output
|
|
length are all bounded.
|
|
"""
|
|
if _depth > _max_depth:
|
|
return ""
|
|
if content is None:
|
|
return ""
|
|
if isinstance(content, str):
|
|
return content[:MAX_NORMALIZED_TEXT_LENGTH] if len(content) > MAX_NORMALIZED_TEXT_LENGTH else content
|
|
|
|
if isinstance(content, list):
|
|
parts: List[str] = []
|
|
items = content[:MAX_CONTENT_LIST_SIZE] if len(content) > MAX_CONTENT_LIST_SIZE else content
|
|
for item in items:
|
|
if isinstance(item, str):
|
|
if item:
|
|
parts.append(item[:MAX_NORMALIZED_TEXT_LENGTH])
|
|
elif isinstance(item, dict):
|
|
item_type = str(item.get("type") or "").strip().lower()
|
|
if item_type in {"text", "input_text", "output_text"}:
|
|
text = item.get("text", "")
|
|
if text:
|
|
try:
|
|
parts.append(str(text)[:MAX_NORMALIZED_TEXT_LENGTH])
|
|
except Exception:
|
|
pass
|
|
# Silently skip image_url / other non-text parts
|
|
elif isinstance(item, list):
|
|
nested = _normalize_chat_content(item, _max_depth=_max_depth, _depth=_depth + 1)
|
|
if nested:
|
|
parts.append(nested)
|
|
# Check accumulated size
|
|
if sum(len(p) for p in parts) >= MAX_NORMALIZED_TEXT_LENGTH:
|
|
break
|
|
result = "\n".join(parts)
|
|
return result[:MAX_NORMALIZED_TEXT_LENGTH] if len(result) > MAX_NORMALIZED_TEXT_LENGTH else result
|
|
|
|
# Fallback for unexpected types (int, float, bool, etc.)
|
|
try:
|
|
result = str(content)
|
|
return result[:MAX_NORMALIZED_TEXT_LENGTH] if len(result) > MAX_NORMALIZED_TEXT_LENGTH else result
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
# Content part type aliases used by the OpenAI Chat Completions and Responses
|
|
# APIs. We accept both spellings on input and emit a single canonical internal
|
|
# shape (``{"type": "text", ...}`` / ``{"type": "image_url", ...}``) that the
|
|
# rest of the agent pipeline already understands.
|
|
_TEXT_PART_TYPES = frozenset({"text", "input_text", "output_text"})
|
|
_IMAGE_PART_TYPES = frozenset({"image_url", "input_image"})
|
|
_FILE_PART_TYPES = frozenset({"file", "input_file"})
|
|
|
|
|
|
def _normalize_multimodal_content(content: Any) -> Any:
    """Validate and normalize multimodal content for the API server.

    Returns a plain string when the content is text-only, or a list of
    ``{"type": "text"|"image_url", ...}`` parts when images are present.
    The output shape is the native OpenAI Chat Completions vision format,
    which the agent pipeline accepts verbatim (OpenAI-wire providers) or
    converts (``_preprocess_anthropic_content`` for Anthropic).

    Text is bounded the same way ``_normalize_chat_content`` bounds it. At
    most ``MAX_CONTENT_LIST_SIZE`` parts are inspected, and the combined text
    of all parts is capped at ``MAX_NORMALIZED_TEXT_LENGTH`` characters.
    Once that text budget is spent, later text parts are dropped. Every part
    is still validated, so later invalid parts still raise, and image parts
    are still kept.

    Raises ``ValueError`` with an OpenAI-style code on invalid input:

    * ``unsupported_content_type`` — file/input_file/file_id parts, or
      non-image ``data:`` URLs.
    * ``invalid_image_url`` — missing URL or unsupported scheme.
    * ``invalid_content_part`` — malformed text/image objects.

    Callers translate the ValueError into a 400 response.
    """
    # Scalar passthrough mirrors ``_normalize_chat_content``.
    if content is None:
        return ""
    if isinstance(content, str):
        return content[:MAX_NORMALIZED_TEXT_LENGTH]
    if not isinstance(content, list):
        # Mirror the legacy text-normalizer's fallback so callers that
        # pre-existed image support still get a string back.
        return _normalize_chat_content(content)

    normalized_parts: List[Dict[str, Any]] = []
    # Remaining characters of text allowed across all text parts combined.
    text_budget = MAX_NORMALIZED_TEXT_LENGTH

    def _append_text(text: str) -> None:
        """Append a text part trimmed to the remaining text budget."""
        nonlocal text_budget
        if not text or text_budget <= 0:
            return
        trimmed = text[:text_budget]
        normalized_parts.append({"type": "text", "text": trimmed})
        text_budget -= len(trimmed)

    for part in content[:MAX_CONTENT_LIST_SIZE]:
        if isinstance(part, str):
            _append_text(part)
            continue

        if not isinstance(part, dict):
            # Ignore unknown scalars for forward compatibility with future
            # Responses API additions (e.g. ``refusal``). The text normalizer
            # applies the same policy.
            continue

        raw_type = part.get("type")
        part_type = str(raw_type or "").strip().lower()

        if part_type in _TEXT_PART_TYPES:
            text = part.get("text")
            if text is not None:
                _append_text(text if isinstance(text, str) else str(text))
            continue

        if part_type in _IMAGE_PART_TYPES:
            detail = part.get("detail")
            image_ref = part.get("image_url")
            # OpenAI Responses sends ``input_image`` with a top-level
            # ``image_url`` string; Chat Completions sends ``image_url`` as
            # ``{"url": "...", "detail": "..."}``. Support both.
            if isinstance(image_ref, dict):
                url_value = image_ref.get("url")
                detail = image_ref.get("detail", detail)
            else:
                url_value = image_ref
            if not isinstance(url_value, str) or not url_value.strip():
                raise ValueError("invalid_image_url:Image parts must include a non-empty image URL.")
            url_value = url_value.strip()
            lowered = url_value.lower()
            if lowered.startswith("data:"):
                if not lowered.startswith("data:image/") or "," not in url_value:
                    raise ValueError(
                        "unsupported_content_type:Only image data URLs are supported. "
                        "Non-image data payloads are not supported."
                    )
            elif not (lowered.startswith("http://") or lowered.startswith("https://")):
                raise ValueError(
                    "invalid_image_url:Image inputs must use http(s) URLs or data:image/... URLs."
                )
            image_part: Dict[str, Any] = {"type": "image_url", "image_url": {"url": url_value}}
            if detail is not None:
                if not isinstance(detail, str) or not detail.strip():
                    raise ValueError("invalid_content_part:Image detail must be a non-empty string when provided.")
                image_part["image_url"]["detail"] = detail.strip()
            normalized_parts.append(image_part)
            continue

        if part_type in _FILE_PART_TYPES:
            raise ValueError(
                "unsupported_content_type:Inline image inputs are supported, "
                "but uploaded files and document inputs are not supported on this endpoint."
            )

        # Unknown part type — reject explicitly so clients get a clear error
        # instead of a silently dropped turn.
        raise ValueError(
            f"unsupported_content_type:Unsupported content part type {raw_type!r}. "
            "Only text and image_url/input_image parts are supported."
        )

    if not normalized_parts:
        return ""

    # Text-only: collapse to a plain string so downstream logging/trajectory
    # code sees the native shape and prompt caching on text-only turns is
    # unaffected. The joining newlines can push past the budget, so cap the
    # final string as well.
    if all(p.get("type") == "text" for p in normalized_parts):
        joined = "\n".join(p["text"] for p in normalized_parts if p.get("text"))
        return joined[:MAX_NORMALIZED_TEXT_LENGTH]

    return normalized_parts
|
|
|
|
|
|
def _content_has_visible_payload(content: Any) -> bool:
|
|
"""True when content has any text or image attachment. Used to reject empty turns."""
|
|
if isinstance(content, str):
|
|
return bool(content.strip())
|
|
if isinstance(content, list):
|
|
for part in content:
|
|
if isinstance(part, dict):
|
|
ptype = str(part.get("type") or "").strip().lower()
|
|
if ptype in _TEXT_PART_TYPES and str(part.get("text") or "").strip():
|
|
return True
|
|
if ptype in _IMAGE_PART_TYPES:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _multimodal_validation_error(exc: ValueError, *, param: str) -> "web.Response":
    """Turn a ``_normalize_multimodal_content`` ValueError into an HTTP 400.

    The exception text is expected as ``"<code>:<message>"``. When there is
    nothing after the first colon, the whole text becomes the message and the
    code defaults to ``invalid_content_part``.
    """
    text = str(exc)
    prefix, _sep, detail = text.partition(":")
    if detail:
        code, message = prefix, detail
    else:
        code, message = "invalid_content_part", text
    payload = _openai_error(message, code=code, param=param)
    return web.json_response(payload, status=400)
|
|
|
|
class ResponseStore:
    """
    SQLite-backed LRU store for Responses API state.

    Each stored response includes the full internal conversation history
    (with tool calls and results) so it can be reconstructed on subsequent
    requests via previous_response_id.

    Persists across gateway restarts. Falls back to in-memory SQLite when the
    on-disk database cannot be opened *or* initialised (for example a missing
    directory, or an unreadable or corrupt file). A bad state file therefore
    never prevents construction.
    """

    def __init__(self, max_size: int = MAX_STORED_RESPONSES, db_path: Optional[str] = None):
        """Open (or create) the response store.

        Args:
            max_size: Maximum number of stored responses. Least-recently
                accessed rows beyond this are evicted on ``put``.
            db_path: SQLite database path. ``None`` resolves to
                ``<hermes home>/response_store.db``, or ``:memory:`` if that
                lookup fails.
        """
        self._max_size = max_size
        if db_path is None:
            try:
                from hermes_cli.config import get_hermes_home
                db_path = str(get_hermes_home() / "response_store.db")
            except Exception:
                db_path = ":memory:"
        self._conn = self._connect(db_path)

    @staticmethod
    def _init_schema(conn: sqlite3.Connection) -> None:
        """Enable WAL journaling and create the store's tables if missing."""
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute(
            """CREATE TABLE IF NOT EXISTS responses (
                response_id TEXT PRIMARY KEY,
                data TEXT NOT NULL,
                accessed_at REAL NOT NULL
            )"""
        )
        conn.execute(
            """CREATE TABLE IF NOT EXISTS conversations (
                name TEXT PRIMARY KEY,
                response_id TEXT NOT NULL
            )"""
        )
        conn.commit()

    @classmethod
    def _connect(cls, db_path: str) -> sqlite3.Connection:
        """Connect to *db_path* with the schema ready; fall back to memory.

        ``sqlite3.connect`` opens lazily, so a corrupt or locked file only
        fails on the first statement. For that reason schema initialisation is
        part of the guarded section, not just the connect call.
        """
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = sqlite3.connect(db_path, check_same_thread=False)
            cls._init_schema(conn)
            return conn
        except Exception as exc:
            logger.warning(
                "Response store at %s unavailable (%s); using in-memory store",
                db_path, exc,
            )
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass
        memory_conn = sqlite3.connect(":memory:", check_same_thread=False)
        cls._init_schema(memory_conn)
        return memory_conn

    def get(self, response_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve a stored response by ID (updates access time for LRU)."""
        row = self._conn.execute(
            "SELECT data FROM responses WHERE response_id = ?", (response_id,)
        ).fetchone()
        if row is None:
            return None
        self._conn.execute(
            "UPDATE responses SET accessed_at = ? WHERE response_id = ?",
            (time.time(), response_id),
        )
        self._conn.commit()
        return json.loads(row[0])

    def put(self, response_id: str, data: Dict[str, Any]) -> None:
        """Store a response, evicting the least-recently-accessed if over capacity."""
        self._conn.execute(
            "INSERT OR REPLACE INTO responses (response_id, data, accessed_at) VALUES (?, ?, ?)",
            (response_id, json.dumps(data, default=str), time.time()),
        )
        # Evict oldest entries beyond max_size
        count = self._conn.execute("SELECT COUNT(*) FROM responses").fetchone()[0]
        if count > self._max_size:
            self._conn.execute(
                "DELETE FROM responses WHERE response_id IN "
                "(SELECT response_id FROM responses ORDER BY accessed_at ASC LIMIT ?)",
                (count - self._max_size,),
            )
        self._conn.commit()

    def delete(self, response_id: str) -> bool:
        """Remove a response from the store. Returns True if found and deleted."""
        cursor = self._conn.execute(
            "DELETE FROM responses WHERE response_id = ?", (response_id,)
        )
        self._conn.commit()
        return cursor.rowcount > 0

    def get_conversation(self, name: str) -> Optional[str]:
        """Get the latest response_id for a conversation name."""
        row = self._conn.execute(
            "SELECT response_id FROM conversations WHERE name = ?", (name,)
        ).fetchone()
        return row[0] if row else None

    def set_conversation(self, name: str, response_id: str) -> None:
        """Map a conversation name to its latest response_id."""
        self._conn.execute(
            "INSERT OR REPLACE INTO conversations (name, response_id) VALUES (?, ?)",
            (name, response_id),
        )
        self._conn.commit()

    def close(self) -> None:
        """Close the database connection (best-effort; errors are ignored)."""
        try:
            self._conn.close()
        except Exception:
            pass

    def __len__(self) -> int:
        """Return the number of stored responses."""
        row = self._conn.execute("SELECT COUNT(*) FROM responses").fetchone()
        return row[0] if row else 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CORS middleware
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_CORS_HEADERS = {
|
|
"Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS",
|
|
"Access-Control-Allow-Headers": "Authorization, Content-Type, Idempotency-Key",
|
|
}
|
|
|
|
|
|
@web.middleware
async def cors_middleware(request, handler):
    """Apply the adapter's CORS policy; answer OPTIONS preflights directly.

    Requests whose Origin is not allowed get an empty 403. A preflight gets
    200 with CORS headers when headers exist for the origin, otherwise 403.
    Other requests are passed to *handler*, and any CORS headers are merged
    into its response.
    """
    adapter = request.app.get("api_server_adapter")
    origin = request.headers.get("Origin", "")

    if adapter is not None and not adapter._origin_allowed(origin):
        return web.Response(status=403)
    cors_headers = None if adapter is None else adapter._cors_headers_for_origin(origin)

    if request.method == "OPTIONS":
        # No headers means the preflight cannot be honoured.
        return web.Response(
            status=403 if cors_headers is None else 200,
            headers=cors_headers,
        )

    response = await handler(request)
    if cors_headers is not None:
        response.headers.update(cors_headers)
    return response
|
|
|
|
def _openai_error(message: str, err_type: str = "invalid_request_error", param: str = None, code: str = None) -> Dict[str, Any]:
|
|
"""OpenAI-style error envelope."""
|
|
return {
|
|
"error": {
|
|
"message": message,
|
|
"type": err_type,
|
|
"param": param,
|
|
"code": code,
|
|
}
|
|
}
|
|
|
|
|
|
@web.middleware
async def body_limit_middleware(request, handler):
    """Reject oversized or malformed-length request bodies before handling.

    Only POST/PUT/PATCH requests are inspected, and only via their declared
    Content-Length header. A non-integer value gets a 400 and a value above
    ``MAX_REQUEST_BYTES`` gets a 413; everything else reaches *handler*.
    """
    declared = None
    if request.method in ("POST", "PUT", "PATCH"):
        declared = request.headers.get("Content-Length")

    if declared is not None:
        try:
            too_large = int(declared) > MAX_REQUEST_BYTES
        except ValueError:
            return web.json_response(_openai_error("Invalid Content-Length header.", code="invalid_content_length"), status=400)
        if too_large:
            return web.json_response(_openai_error("Request body too large.", code="body_too_large"), status=413)

    return await handler(request)
|
|
|
|
_SECURITY_HEADERS = {
|
|
"X-Content-Type-Options": "nosniff",
|
|
"Referrer-Policy": "no-referrer",
|
|
}
|
|
|
|
|
|
@web.middleware
async def security_headers_middleware(request, handler):
    """Stamp ``_SECURITY_HEADERS`` onto the handler's response.

    Headers that the handler already set are left untouched.
    """
    response = await handler(request)
    for header_name in _SECURITY_HEADERS:
        response.headers.setdefault(header_name, _SECURITY_HEADERS[header_name])
    return response
|
|
|
|
|
|
class _IdempotencyCache:
|
|
"""In-memory idempotency cache with TTL and basic LRU semantics."""
|
|
def __init__(self, max_items: int = 1000, ttl_seconds: int = 300):
|
|
from collections import OrderedDict
|
|
self._store = OrderedDict()
|
|
self._inflight: Dict[tuple[str, str], "asyncio.Task[Any]"] = {}
|
|
self._ttl = ttl_seconds
|
|
self._max = max_items
|
|
|
|
def _purge(self):
|
|
now = time.time()
|
|
expired = [k for k, v in self._store.items() if now - v["ts"] > self._ttl]
|
|
for k in expired:
|
|
self._store.pop(k, None)
|
|
while len(self._store) > self._max:
|
|
self._store.popitem(last=False)
|
|
|
|
async def get_or_set(self, key: str, fingerprint: str, compute_coro):
|
|
self._purge()
|
|
item = self._store.get(key)
|
|
if item and item["fp"] == fingerprint:
|
|
return item["resp"]
|
|
|
|
inflight_key = (key, fingerprint)
|
|
task = self._inflight.get(inflight_key)
|
|
if task is None:
|
|
async def _compute_and_store():
|
|
resp = await compute_coro()
|
|
import time as _t
|
|
self._store[key] = {"resp": resp, "fp": fingerprint, "ts": _t.time()}
|
|
self._purge()
|
|
return resp
|
|
|
|
task = asyncio.create_task(_compute_and_store())
|
|
self._inflight[inflight_key] = task
|
|
|
|
def _clear_inflight(done_task: "asyncio.Task[Any]") -> None:
|
|
if self._inflight.get(inflight_key) is done_task:
|
|
self._inflight.pop(inflight_key, None)
|
|
|
|
task.add_done_callback(_clear_inflight)
|
|
|
|
return await asyncio.shield(task)
|
|
|
|
|
|
# Module-wide idempotency cache with default limits (1000 items, 300 s TTL).
_idem_cache: _IdempotencyCache = _IdempotencyCache()
|
|
|
|
|
|
def _make_request_fingerprint(body: Dict[str, Any], keys: List[str]) -> str:
|
|
from hashlib import sha256
|
|
subset = {k: body.get(k) for k in keys}
|
|
return sha256(repr(subset).encode("utf-8")).hexdigest()
|
|
|
|
|
|
def _derive_chat_session_id(
|
|
system_prompt: Optional[str],
|
|
first_user_message: str,
|
|
) -> str:
|
|
"""Derive a stable session ID from the conversation's first user message.
|
|
|
|
OpenAI-compatible frontends (Open WebUI, LibreChat, etc.) send the full
|
|
conversation history with every request. The system prompt and first user
|
|
message are constant across all turns of the same conversation, so hashing
|
|
them produces a deterministic session ID that lets the API server reuse
|
|
the same Hermes session (and therefore the same Docker container sandbox
|
|
directory) across turns.
|
|
"""
|
|
seed = f"{system_prompt or ''}\n{first_user_message}"
|
|
digest = hashlib.sha256(seed.encode("utf-8")).hexdigest()[:16]
|
|
return f"api-{digest}"
|
|
|
|
|
|
_CRON_AVAILABLE = False
|
|
try:
|
|
from cron.jobs import (
|
|
list_jobs as _cron_list,
|
|
get_job as _cron_get,
|
|
create_job as _cron_create,
|
|
update_job as _cron_update,
|
|
remove_job as _cron_remove,
|
|
pause_job as _cron_pause,
|
|
resume_job as _cron_resume,
|
|
trigger_job as _cron_trigger,
|
|
)
|
|
_CRON_AVAILABLE = True
|
|
except ImportError:
|
|
_cron_list = None
|
|
_cron_get = None
|
|
_cron_create = None
|
|
_cron_update = None
|
|
_cron_remove = None
|
|
_cron_pause = None
|
|
_cron_resume = None
|
|
_cron_trigger = None
|
|
|
|
|
|
class APIServerAdapter(BasePlatformAdapter):
|
|
"""
|
|
OpenAI-compatible HTTP API server adapter.
|
|
|
|
Runs an aiohttp web server that accepts OpenAI-format requests
|
|
and routes them through hermes-agent's AIAgent.
|
|
"""
|
|
|
|
def __init__(self, config: PlatformConfig):
    """Read adapter settings from ``config.extra``, falling back to env vars.

    Each setting uses ``config.extra[<key>]`` when present, otherwise the
    matching ``API_SERVER_*`` environment variable, otherwise a default.
    """
    super().__init__(config, Platform.API_SERVER)
    settings = config.extra or {}

    def _setting(key: str, env_var: str, fallback: str) -> Any:
        # The env lookup is evaluated eagerly, as a dict.get default.
        return settings.get(key, os.getenv(env_var, fallback))

    self._host: str = _setting("host", "API_SERVER_HOST", DEFAULT_HOST)
    self._port: int = int(_setting("port", "API_SERVER_PORT", str(DEFAULT_PORT)))
    self._api_key: str = _setting("key", "API_SERVER_KEY", "")
    self._cors_origins: tuple[str, ...] = self._parse_cors_origins(
        _setting("cors_origins", "API_SERVER_CORS_ORIGINS", ""),
    )
    self._model_name: str = self._resolve_model_name(
        _setting("model_name", "API_SERVER_MODEL_NAME", ""),
    )
    # aiohttp server objects; None here and presumably set when the server starts.
    self._app: Optional["web.Application"] = None
    self._runner: Optional["web.AppRunner"] = None
    self._site: Optional["web.TCPSite"] = None
    self._response_store = ResponseStore()
    # Active run streams: run_id -> asyncio.Queue of SSE event dicts
    self._run_streams: Dict[str, "asyncio.Queue[Optional[Dict]]"] = {}
    # Creation timestamps for orphaned-run TTL sweep
    self._run_streams_created: Dict[str, float] = {}
    # Lazily created SessionDB for session continuity (see _ensure_session_db)
    self._session_db: Optional[Any] = None
|
|
|
|
@staticmethod
def _parse_cors_origins(value: Any) -> tuple[str, ...]:
    """Turn the configured CORS origins into a tuple of non-empty strings.

    Accepts a comma-separated string, a list/tuple/set, or any other value
    (stringified as a single origin). Entries are stripped and blanks dropped.
    """
    if not value:
        return ()
    if isinstance(value, str):
        candidates = value.split(",")
    elif isinstance(value, (list, tuple, set)):
        candidates = list(value)
    else:
        candidates = [str(value)]

    cleaned = []
    for candidate in candidates:
        entry = str(candidate).strip()
        if entry:
            cleaned.append(entry)
    return tuple(cleaned)
|
|
|
|
@staticmethod
def _resolve_model_name(explicit: str) -> str:
    """Choose the model name advertised by /v1/models.

    Resolution order:
      1. A non-blank explicit override (config extra or API_SERVER_MODEL_NAME).
      2. The active profile name, unless it is "default" or "custom", so each
         profile advertises a distinct model.
      3. "hermes-agent".
    Any error while looking up the profile falls through to step 3.
    """
    override = explicit.strip() if explicit else ""
    if override:
        return override

    try:
        from hermes_cli.profiles import get_active_profile_name
        active = get_active_profile_name()
        if active and active not in ("default", "custom"):
            return active
    except Exception:
        pass

    return "hermes-agent"
|
|
|
|
def _cors_headers_for_origin(self, origin: str) -> Optional[Dict[str, str]]:
    """Build CORS response headers for *origin*, or None if it is not allowed.

    A configured ``"*"`` yields a wildcard Allow-Origin. An explicitly listed
    origin is echoed back with ``Vary: Origin``. Both get a 600 s Max-Age.
    """
    if not origin or not self._cors_origins:
        return None

    wildcard = "*" in self._cors_origins
    if not wildcard and origin not in self._cors_origins:
        return None

    headers = dict(_CORS_HEADERS)
    if wildcard:
        headers["Access-Control-Allow-Origin"] = "*"
    else:
        headers["Access-Control-Allow-Origin"] = origin
        headers["Vary"] = "Origin"
    headers["Access-Control-Max-Age"] = "600"
    return headers
|
|
|
|
def _origin_allowed(self, origin: str) -> bool:
    """Decide whether a request with this Origin header may proceed.

    An empty Origin (non-browser client) is always allowed. A browser origin
    is allowed only when CORS origins are configured and either contain "*"
    or list it explicitly.
    """
    if not origin:
        return True
    allowed = self._cors_origins
    return bool(allowed) and ("*" in allowed or origin in allowed)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Auth helper
|
|
# ------------------------------------------------------------------
|
|
|
|
def _check_auth(self, request: "web.Request") -> Optional["web.Response"]:
    """
    Validate the Bearer token from the Authorization header.

    Returns None if auth is OK, or a 401 web.Response on failure.
    If no API key is configured, all requests are allowed (only when API
    server is local).

    The ``Bearer`` scheme is matched case-insensitively. The token is compared
    in constant time as UTF-8 bytes. ``hmac.compare_digest`` raises TypeError
    for ``str`` arguments containing non-ASCII characters, so comparing bytes
    keeps a malformed token a 401 instead of an unhandled 500.
    """
    if not self._api_key:
        return None  # No key configured — allow all (local-only use)

    auth_header = request.headers.get("Authorization", "")
    scheme, _, credentials = auth_header.partition(" ")
    if scheme.lower() == "bearer":
        token = credentials.strip()
        # surrogatepass: header text decoded with surrogate escapes must not
        # raise while encoding.
        supplied = token.encode("utf-8", "surrogatepass")
        expected = str(self._api_key).encode("utf-8", "surrogatepass")
        if hmac.compare_digest(supplied, expected):
            return None  # Auth OK

    return web.json_response(
        {"error": {"message": "Invalid API key", "type": "invalid_request_error", "code": "invalid_api_key"}},
        status=401,
    )
|
|
|
|
# ------------------------------------------------------------------
|
|
# Session DB helper
|
|
# ------------------------------------------------------------------
|
|
|
|
def _ensure_session_db(self):
    """Return the shared SessionDB, creating it on first use.

    Sessions are persisted to ``state.db`` so that ``hermes sessions list``
    shows API-server conversations alongside CLI and gateway ones. If
    creation fails, the error is logged at debug level and None is returned.
    Creation is retried on the next call.
    """
    if self._session_db is not None:
        return self._session_db
    try:
        from hermes_state import SessionDB
        self._session_db = SessionDB()
    except Exception as e:
        logger.debug("SessionDB unavailable for API server: %s", e)
    return self._session_db
|
|
|
|
# ------------------------------------------------------------------
|
|
# Agent creation helper
|
|
# ------------------------------------------------------------------
|
|
|
|
def _create_agent(
    self,
    ephemeral_system_prompt: Optional[str] = None,
    session_id: Optional[str] = None,
    stream_delta_callback=None,
    tool_progress_callback=None,
    tool_start_callback=None,
    tool_complete_callback=None,
) -> Any:
    """
    Build an AIAgent configured from the gateway's runtime settings.

    Model, credentials, base URL, etc. come from
    ``_resolve_runtime_agent_kwargs()``. Toolsets come from config.yaml's
    ``platform_toolsets.api_server`` (same as all other gateway platforms),
    falling back to the hermes-api-server default. The iteration cap is read
    from ``HERMES_MAX_ITERATIONS`` (default 90).
    """
    from run_agent import AIAgent
    from gateway.run import _resolve_runtime_agent_kwargs, _resolve_gateway_model, _load_gateway_config
    from hermes_cli.tools_config import _get_platform_tools

    agent_kwargs = _resolve_runtime_agent_kwargs()
    resolved_model = _resolve_gateway_model()
    toolsets = sorted(_get_platform_tools(_load_gateway_config(), "api_server"))
    iteration_cap = int(os.getenv("HERMES_MAX_ITERATIONS", "90"))

    # Fallback provider chain gives the API server the same fallback
    # behaviour as Telegram/Discord/Slack (fixes #4954).
    from gateway.run import GatewayRunner
    fallback = GatewayRunner._load_fallback_model()

    return AIAgent(
        model=resolved_model,
        **agent_kwargs,
        max_iterations=iteration_cap,
        quiet_mode=True,
        verbose_logging=False,
        ephemeral_system_prompt=ephemeral_system_prompt or None,
        enabled_toolsets=toolsets,
        session_id=session_id,
        platform="api_server",
        stream_delta_callback=stream_delta_callback,
        tool_progress_callback=tool_progress_callback,
        tool_start_callback=tool_start_callback,
        tool_complete_callback=tool_complete_callback,
        session_db=self._ensure_session_db(),
        fallback_model=fallback,
    )
|
|
|
|
# ------------------------------------------------------------------
|
|
# HTTP Handlers
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _handle_health(self, request: "web.Request") -> "web.Response":
    """GET /health — liveness probe; always reports ok."""
    payload = {"status": "ok", "platform": "hermes-agent"}
    return web.json_response(payload)
|
|
|
|
async def _handle_health_detailed(self, request: "web.Request") -> "web.Response":
    """GET /health/detailed — rich status for cross-container dashboard probing.

    Reports gateway state, connected platforms, active agent count, exit
    reason, last update time and this process's PID. The dashboard can then
    show full status without a shared PID file or /proc access. No
    authentication is required.
    """
    from gateway.status import read_runtime_status

    status = read_runtime_status() or {}
    body = {
        "status": "ok",
        "platform": "hermes-agent",
        "gateway_state": status.get("gateway_state"),
        "platforms": status.get("platforms", {}),
        "active_agents": status.get("active_agents", 0),
        "exit_reason": status.get("exit_reason"),
        "updated_at": status.get("updated_at"),
        "pid": os.getpid(),
    }
    return web.json_response(body)
|
|
|
|
async def _handle_models(self, request: "web.Request") -> "web.Response":
    """GET /v1/models — list the single advertised model (requires auth)."""
    denied = self._check_auth(request)
    if denied:
        return denied

    model_card = {
        "id": self._model_name,
        "object": "model",
        "created": int(time.time()),
        "owned_by": "hermes",
        "permission": [],
        "root": self._model_name,
        "parent": None,
    }
    return web.json_response({"object": "list", "data": [model_card]})
|
|
|
|
async def _handle_chat_completions(self, request: "web.Request") -> "web.StreamResponse":
|
|
"""POST /v1/chat/completions — OpenAI Chat Completions format."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
|
|
# Parse request body
|
|
try:
|
|
body = await request.json()
|
|
except (json.JSONDecodeError, Exception):
|
|
return web.json_response(_openai_error("Invalid JSON in request body"), status=400)
|
|
|
|
messages = body.get("messages")
|
|
if not messages or not isinstance(messages, list):
|
|
return web.json_response(
|
|
{"error": {"message": "Missing or invalid 'messages' field", "type": "invalid_request_error"}},
|
|
status=400,
|
|
)
|
|
|
|
stream = body.get("stream", False)
|
|
|
|
# Extract system message (becomes ephemeral system prompt layered ON TOP of core)
|
|
system_prompt = None
|
|
conversation_messages: List[Dict[str, str]] = []
|
|
|
|
for idx, msg in enumerate(messages):
|
|
role = msg.get("role", "")
|
|
raw_content = msg.get("content", "")
|
|
if role == "system":
|
|
# System messages don't support images (Anthropic rejects, OpenAI
|
|
# text-model systems don't render them). Flatten to text.
|
|
content = _normalize_chat_content(raw_content)
|
|
if system_prompt is None:
|
|
system_prompt = content
|
|
else:
|
|
system_prompt = system_prompt + "\n" + content
|
|
elif role in ("user", "assistant"):
|
|
try:
|
|
content = _normalize_multimodal_content(raw_content)
|
|
except ValueError as exc:
|
|
return _multimodal_validation_error(exc, param=f"messages[{idx}].content")
|
|
conversation_messages.append({"role": role, "content": content})
|
|
|
|
# Extract the last user message as the primary input
|
|
user_message: Any = ""
|
|
history = []
|
|
if conversation_messages:
|
|
user_message = conversation_messages[-1].get("content", "")
|
|
history = conversation_messages[:-1]
|
|
|
|
if not _content_has_visible_payload(user_message):
|
|
return web.json_response(
|
|
{"error": {"message": "No user message found in messages", "type": "invalid_request_error"}},
|
|
status=400,
|
|
)
|
|
|
|
# Allow caller to continue an existing session by passing X-Hermes-Session-Id.
|
|
# When provided, history is loaded from state.db instead of from the request body.
|
|
#
|
|
# Security: session continuation exposes conversation history, so it is
|
|
# only allowed when the API key is configured and the request is
|
|
# authenticated. Without this gate, any unauthenticated client could
|
|
# read arbitrary session history by guessing/enumerating session IDs.
|
|
provided_session_id = request.headers.get("X-Hermes-Session-Id", "").strip()
|
|
if provided_session_id:
|
|
if not self._api_key:
|
|
logger.warning(
|
|
"Session continuation via X-Hermes-Session-Id rejected: "
|
|
"no API key configured. Set API_SERVER_KEY to enable "
|
|
"session continuity."
|
|
)
|
|
return web.json_response(
|
|
_openai_error(
|
|
"Session continuation requires API key authentication. "
|
|
"Configure API_SERVER_KEY to enable this feature."
|
|
),
|
|
status=403,
|
|
)
|
|
# Sanitize: reject control characters that could enable header injection.
|
|
if re.search(r'[\r\n\x00]', provided_session_id):
|
|
return web.json_response(
|
|
{"error": {"message": "Invalid session ID", "type": "invalid_request_error"}},
|
|
status=400,
|
|
)
|
|
session_id = provided_session_id
|
|
try:
|
|
db = self._ensure_session_db()
|
|
if db is not None:
|
|
history = db.get_messages_as_conversation(session_id)
|
|
except Exception as e:
|
|
logger.warning("Failed to load session history for %s: %s", session_id, e)
|
|
history = []
|
|
else:
|
|
# Derive a stable session ID from the conversation fingerprint so
|
|
# that consecutive messages from the same Open WebUI (or similar)
|
|
# conversation map to the same Hermes session. The first user
|
|
# message + system prompt are constant across all turns.
|
|
first_user = ""
|
|
for cm in conversation_messages:
|
|
if cm.get("role") == "user":
|
|
first_user = cm.get("content", "")
|
|
break
|
|
session_id = _derive_chat_session_id(system_prompt, first_user)
|
|
# history already set from request body above
|
|
|
|
completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
|
|
model_name = body.get("model", self._model_name)
|
|
created = int(time.time())
|
|
|
|
if stream:
|
|
import queue as _q
|
|
_stream_q: _q.Queue = _q.Queue()
|
|
|
|
def _on_delta(delta):
|
|
# Filter out None — the agent fires stream_delta_callback(None)
|
|
# to signal the CLI display to close its response box before
|
|
# tool execution, but the SSE writer uses None as end-of-stream
|
|
# sentinel. Forwarding it would prematurely close the HTTP
|
|
# response, causing Open WebUI (and similar frontends) to miss
|
|
# the final answer after tool calls. The SSE loop detects
|
|
# completion via agent_task.done() instead.
|
|
if delta is not None:
|
|
_stream_q.put(delta)
|
|
|
|
def _on_tool_progress(event_type, name, preview, args, **kwargs):
|
|
"""Send tool progress as a separate SSE event.
|
|
|
|
Previously, progress markers like ``⏰ list`` were injected
|
|
directly into ``delta.content``. OpenAI-compatible frontends
|
|
(Open WebUI, LobeChat, …) store ``delta.content`` verbatim as
|
|
the assistant message and send it back on subsequent requests.
|
|
After enough turns the model learns to *emit* the markers as
|
|
plain text instead of issuing real tool calls — silently
|
|
hallucinating tool results. See #6972.
|
|
|
|
The fix: push a tagged tuple ``("__tool_progress__", payload)``
|
|
onto the stream queue. The SSE writer emits it as a custom
|
|
``event: hermes.tool.progress`` line that compliant frontends
|
|
can render for UX but will *not* persist into conversation
|
|
history. Clients that don't understand the custom event type
|
|
silently ignore it per the SSE specification.
|
|
"""
|
|
if event_type != "tool.started":
|
|
return
|
|
if name.startswith("_"):
|
|
return
|
|
from agent.display import get_tool_emoji
|
|
emoji = get_tool_emoji(name)
|
|
label = preview or name
|
|
_stream_q.put(("__tool_progress__", {
|
|
"tool": name,
|
|
"emoji": emoji,
|
|
"label": label,
|
|
}))
|
|
|
|
# Start agent in background. agent_ref is a mutable container
|
|
# so the SSE writer can interrupt the agent on client disconnect.
|
|
agent_ref = [None]
|
|
agent_task = asyncio.ensure_future(self._run_agent(
|
|
user_message=user_message,
|
|
conversation_history=history,
|
|
ephemeral_system_prompt=system_prompt,
|
|
session_id=session_id,
|
|
stream_delta_callback=_on_delta,
|
|
tool_progress_callback=_on_tool_progress,
|
|
agent_ref=agent_ref,
|
|
))
|
|
|
|
return await self._write_sse_chat_completion(
|
|
request, completion_id, model_name, created, _stream_q,
|
|
agent_task, agent_ref, session_id=session_id,
|
|
)
|
|
|
|
# Non-streaming: run the agent (with optional Idempotency-Key)
|
|
async def _compute_completion():
|
|
return await self._run_agent(
|
|
user_message=user_message,
|
|
conversation_history=history,
|
|
ephemeral_system_prompt=system_prompt,
|
|
session_id=session_id,
|
|
)
|
|
|
|
idempotency_key = request.headers.get("Idempotency-Key")
|
|
if idempotency_key:
|
|
fp = _make_request_fingerprint(body, keys=["model", "messages", "tools", "tool_choice", "stream"])
|
|
try:
|
|
result, usage = await _idem_cache.get_or_set(idempotency_key, fp, _compute_completion)
|
|
except Exception as e:
|
|
logger.error("Error running agent for chat completions: %s", e, exc_info=True)
|
|
return web.json_response(
|
|
_openai_error(f"Internal server error: {e}", err_type="server_error"),
|
|
status=500,
|
|
)
|
|
else:
|
|
try:
|
|
result, usage = await _compute_completion()
|
|
except Exception as e:
|
|
logger.error("Error running agent for chat completions: %s", e, exc_info=True)
|
|
return web.json_response(
|
|
_openai_error(f"Internal server error: {e}", err_type="server_error"),
|
|
status=500,
|
|
)
|
|
|
|
final_response = result.get("final_response", "")
|
|
if not final_response:
|
|
final_response = result.get("error", "(No response generated)")
|
|
|
|
response_data = {
|
|
"id": completion_id,
|
|
"object": "chat.completion",
|
|
"created": created,
|
|
"model": model_name,
|
|
"choices": [
|
|
{
|
|
"index": 0,
|
|
"message": {
|
|
"role": "assistant",
|
|
"content": final_response,
|
|
},
|
|
"finish_reason": "stop",
|
|
}
|
|
],
|
|
"usage": {
|
|
"prompt_tokens": usage.get("input_tokens", 0),
|
|
"completion_tokens": usage.get("output_tokens", 0),
|
|
"total_tokens": usage.get("total_tokens", 0),
|
|
},
|
|
}
|
|
|
|
return web.json_response(response_data, headers={"X-Hermes-Session-Id": session_id})
|
|
|
|
async def _write_sse_chat_completion(
    self, request: "web.Request", completion_id: str, model: str,
    created: int, stream_q, agent_task, agent_ref=None, session_id: Optional[str] = None,
) -> "web.StreamResponse":
    """Write real streaming SSE from agent's stream_delta_callback queue.

    Emits OpenAI ``chat.completion.chunk`` events in this order:

    - a role chunk,
    - one content chunk per text item pulled from ``stream_q``,
    - a finish chunk carrying usage,
    - a terminal ``data: [DONE]`` line.

    Tagged ``("__tool_progress__", payload)`` items are written as a custom
    ``event: hermes.tool.progress`` SSE event instead of content (see
    #6972). A ``: keepalive`` SSE comment is written whenever the stream
    has been idle for ``CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS``.

    If no text was streamed but the agent result carries a
    ``final_response`` (or, failing that, an ``error``), that text is sent
    as one content chunk before the finish chunk. Clients of providers
    that only return the full answer at the end therefore still see the
    same text the non-streaming path would return.

    If the client disconnects mid-stream (network drop, browser tab close)
    or the handler task is cancelled, the agent is interrupted via
    ``agent.interrupt()`` so it stops making LLM API calls, and the
    asyncio task wrapper is cancelled.

    Args:
        request: Incoming request; used for CORS resolution and ``prepare()``.
        completion_id: Id echoed in every chunk.
        model: Model name echoed in every chunk.
        created: Unix timestamp echoed in every chunk.
        stream_q: ``queue.Queue``-style queue fed by agent callbacks;
            ``None`` is treated as the end-of-stream sentinel.
        agent_task: Awaitable resolving to ``(result, usage)``.
        agent_ref: Optional one-element list holding the running agent.
        session_id: If set, returned in the ``X-Hermes-Session-Id`` header.

    Returns:
        The prepared ``web.StreamResponse``.
    """
    import queue as _q

    sse_headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    # CORS middleware can't inject headers into StreamResponse after
    # prepare() flushes them, so resolve CORS headers up front.
    origin = request.headers.get("Origin", "")
    cors = self._cors_headers_for_origin(origin) if origin else None
    if cors:
        sse_headers.update(cors)
    if session_id:
        sse_headers["X-Hermes-Session-Id"] = session_id
    response = web.StreamResponse(status=200, headers=sse_headers)
    await response.prepare(request)

    def _stop_agent() -> None:
        """Interrupt the running agent (if any) and cancel its task wrapper."""
        agent = agent_ref[0] if agent_ref else None
        if agent is not None:
            try:
                agent.interrupt("SSE client disconnected")
            except Exception:
                # Best effort: the client is already gone either way.
                pass
        if not agent_task.done():
            agent_task.cancel()

    # Set by _emit once a non-empty content chunk has been written; decides
    # whether the final_response fallback after the agent finishes is needed.
    streamed_text = False

    try:
        # Role chunk
        role_chunk = {
            "id": completion_id, "object": "chat.completion.chunk",
            "created": created, "model": model,
            "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
        }
        await response.write(f"data: {json.dumps(role_chunk)}\n\n".encode())
        last_activity = time.monotonic()

        # Helper — route a queue item to the correct SSE event.
        async def _emit(item):
            """Write a single queue item to the SSE stream.

            Tagged tuples ``("__tool_progress__", payload)`` are sent as a
            custom ``event: hermes.tool.progress`` SSE event so frontends
            can display them without storing the markers in conversation
            history (see #6972). Every other item is sent as a normal
            ``delta.content`` chunk. Returns the write timestamp
            (``time.monotonic()``) for keepalive bookkeeping.
            """
            nonlocal streamed_text
            if isinstance(item, tuple) and len(item) == 2 and item[0] == "__tool_progress__":
                event_data = json.dumps(item[1])
                await response.write(
                    f"event: hermes.tool.progress\ndata: {event_data}\n\n".encode()
                )
            else:
                if item:
                    streamed_text = True
                content_chunk = {
                    "id": completion_id, "object": "chat.completion.chunk",
                    "created": created, "model": model,
                    "choices": [{"index": 0, "delta": {"content": item}, "finish_reason": None}],
                }
                await response.write(f"data: {json.dumps(content_chunk)}\n\n".encode())
            return time.monotonic()

        # Stream content chunks as they arrive from the agent. The blocking
        # queue get runs in the default executor so the event loop stays free.
        loop = asyncio.get_running_loop()
        while True:
            try:
                delta = await loop.run_in_executor(None, lambda: stream_q.get(timeout=0.5))
            except _q.Empty:
                if agent_task.done():
                    # Agent finished: drain whatever is still queued, then stop.
                    while True:
                        try:
                            delta = stream_q.get_nowait()
                        except _q.Empty:
                            break
                        if delta is None:
                            break
                        last_activity = await _emit(delta)
                    break
                if time.monotonic() - last_activity >= CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS:
                    await response.write(b": keepalive\n\n")
                    last_activity = time.monotonic()
                continue

            if delta is None:  # End of stream sentinel
                break

            last_activity = await _emit(delta)

        # Get usage (and the result, for the fallback below) from the
        # completed agent.
        usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
        result = None
        try:
            result, agent_usage = await agent_task
            usage = agent_usage or usage
        except Exception as e:  # noqa: BLE001
            # HTTP 200 is already committed, so log the failure and still
            # close the stream cleanly with a finish chunk.
            logger.warning(
                "Agent task failed during streaming chat completion %s: %s",
                completion_id, e, exc_info=True,
            )

        # Some providers never stream deltas and only return the full answer
        # at the end; surface it (or the error) the way the batch path does.
        if not streamed_text and isinstance(result, dict):
            fallback_text = result.get("final_response") or result.get("error")
            if isinstance(fallback_text, str) and fallback_text:
                await _emit(fallback_text)

        # Finish chunk
        finish_chunk = {
            "id": completion_id, "object": "chat.completion.chunk",
            "created": created, "model": model,
            "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
            "usage": {
                "prompt_tokens": usage.get("input_tokens", 0),
                "completion_tokens": usage.get("output_tokens", 0),
                "total_tokens": usage.get("total_tokens", 0),
            },
        }
        await response.write(f"data: {json.dumps(finish_chunk)}\n\n".encode())
        await response.write(b"data: [DONE]\n\n")
    except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, OSError):
        # Client disconnected mid-stream. Interrupt the agent so it
        # stops making LLM API calls at the next loop iteration, then
        # cancel the asyncio task wrapper and wait for it to unwind.
        _stop_agent()
        try:
            await agent_task
        except (asyncio.CancelledError, Exception):
            pass
        logger.info("SSE client disconnected; interrupted agent task %s", completion_id)
    except asyncio.CancelledError:
        # The handler itself was cancelled (aiohttp can do this when the
        # peer goes away). Stop the agent too, then let cancellation propagate.
        _stop_agent()
        raise

    return response
|
|
|
|
async def _write_sse_responses(
    self,
    request: "web.Request",
    response_id: str,
    model: str,
    created_at: int,
    stream_q,
    agent_task,
    agent_ref,
    conversation_history: List[Dict[str, str]],
    user_message: str,
    instructions: Optional[str],
    conversation: Optional[str],
    store: bool,
    session_id: str,
) -> "web.StreamResponse":
    """Write an SSE stream for POST /v1/responses (OpenAI Responses API).

    Emits spec-compliant event types as the agent runs:

    - ``response.created`` — initial envelope (status=in_progress)
    - ``response.output_item.added`` for the assistant message the first
      time text arrives, ``response.output_text.delta`` per text delta,
      then ``response.output_text.done`` / ``response.output_item.done``
      once the agent finishes
    - ``response.output_item.added`` / ``response.output_item.done``
      with ``item.type == "function_call"`` — when the agent invokes a
      tool (both events fire; the ``done`` event carries the finalized
      ``arguments`` string)
    - ``response.output_item.added`` / ``response.output_item.done`` with
      ``item.type == "function_call_output"`` — tool result with
      ``{call_id, output, status}``
    - ``response.completed`` — terminal event carrying the full
      response object with all output items + usage (same payload
      shape as the non-streaming path for parity)
    - ``response.failed`` — terminal event on agent error

    Every event carries a monotonically increasing ``sequence_number``.

    If the client disconnects mid-stream, or the handler task is
    cancelled, ``agent.interrupt()`` is called so the agent stops issuing
    upstream LLM calls, and the asyncio task is cancelled.

    When ``store=True`` and the stream reaches its terminal event, two
    things are persisted to the ResponseStore: the terminal envelope
    (``completed`` or ``failed``) and the conversation history. This lets
    GET /v1/responses/{id} and ``previous_response_id`` chaining work the
    same as the batch path. A stream cut off by a disconnect is not
    persisted.
    """
    import queue as _q

    sse_headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    # CORS middleware can't inject headers into StreamResponse after
    # prepare() flushes them, so resolve CORS headers up front.
    origin = request.headers.get("Origin", "")
    cors = self._cors_headers_for_origin(origin) if origin else None
    if cors:
        sse_headers.update(cors)
    if session_id:
        sse_headers["X-Hermes-Session-Id"] = session_id
    response = web.StreamResponse(status=200, headers=sse_headers)
    await response.prepare(request)

    # State accumulated during the stream
    final_text_parts: List[str] = []
    # Track open function_call items by call_id so we can emit a matching
    # ``done`` event when the tool completes. Order preserved.
    pending_tool_calls: List[Dict[str, Any]] = []
    # Output items we've emitted so far (used to build the terminal
    # response.completed payload). Kept in the order they appeared.
    emitted_items: List[Dict[str, Any]] = []
    # Monotonic counter for output_index (spec requires it).
    output_index = 0
    # Monotonic counter for call_id generation if the agent doesn't
    # provide one.
    call_counter = 0
    # Canonical Responses SSE events include a monotonically increasing
    # sequence_number. Add it server-side for every emitted event so
    # clients that validate the OpenAI event schema can parse our stream.
    sequence_number = 0
    # Track the assistant message item id + content index for text
    # delta events — the spec ties deltas to a specific item.
    message_item_id = f"msg_{uuid.uuid4().hex[:24]}"
    message_output_index: Optional[int] = None
    message_opened = False

    async def _write_event(event_type: str, data: Dict[str, Any]) -> None:
        """Write one named SSE event, stamping the next sequence_number."""
        nonlocal sequence_number
        if "sequence_number" not in data:
            data["sequence_number"] = sequence_number
            sequence_number += 1
        payload = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
        await response.write(payload.encode())

    def _envelope(status: str) -> Dict[str, Any]:
        """Build the base response object shared by all envelope events."""
        env: Dict[str, Any] = {
            "id": response_id,
            "object": "response",
            "status": status,
            "created_at": created_at,
            "model": model,
        }
        return env

    def _stop_agent() -> None:
        """Interrupt the running agent (if any) and cancel its task wrapper."""
        agent = agent_ref[0] if agent_ref else None
        if agent is not None:
            try:
                agent.interrupt("SSE client disconnected")
            except Exception:
                # Best effort: the client is already gone either way.
                pass
        if not agent_task.done():
            agent_task.cancel()

    final_response_text = ""
    agent_error: Optional[str] = None
    usage: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
    # Agent result; stays None when awaiting the agent task raises, so the
    # persistence step never touches an unbound name.
    result: Any = None

    try:
        # response.created — initial envelope, status=in_progress
        created_env = _envelope("in_progress")
        created_env["output"] = []
        await _write_event("response.created", {
            "type": "response.created",
            "response": created_env,
        })
        last_activity = time.monotonic()

        async def _open_message_item() -> None:
            """Emit response.output_item.added for the assistant message
            the first time any text delta arrives."""
            nonlocal message_opened, message_output_index, output_index
            if message_opened:
                return
            message_opened = True
            message_output_index = output_index
            output_index += 1
            item = {
                "id": message_item_id,
                "type": "message",
                "status": "in_progress",
                "role": "assistant",
                "content": [],
            }
            await _write_event("response.output_item.added", {
                "type": "response.output_item.added",
                "output_index": message_output_index,
                "item": item,
            })

        async def _emit_text_delta(delta_text: str) -> None:
            """Record a text delta and emit response.output_text.delta."""
            await _open_message_item()
            final_text_parts.append(delta_text)
            await _write_event("response.output_text.delta", {
                "type": "response.output_text.delta",
                "item_id": message_item_id,
                "output_index": message_output_index,
                "content_index": 0,
                "delta": delta_text,
                "logprobs": [],
            })

        async def _emit_tool_started(payload: Dict[str, Any]) -> str:
            """Emit response.output_item.added for a function_call.

            Returns the call_id so the matching completion event can
            reference it. Prefer the real ``tool_call_id`` from the
            agent when available; fall back to a generated call id for
            safety in tests or older code paths.
            """
            nonlocal output_index, call_counter
            call_counter += 1
            call_id = payload.get("tool_call_id") or f"call_{response_id[5:]}_{call_counter}"
            args = payload.get("arguments", {})
            if isinstance(args, dict):
                arguments_str = json.dumps(args)
            else:
                arguments_str = str(args)
            item = {
                "id": f"fc_{uuid.uuid4().hex[:24]}",
                "type": "function_call",
                "status": "in_progress",
                "name": payload.get("name", ""),
                "call_id": call_id,
                "arguments": arguments_str,
            }
            idx = output_index
            output_index += 1
            pending_tool_calls.append({
                "call_id": call_id,
                "name": payload.get("name", ""),
                "arguments": arguments_str,
                "item_id": item["id"],
                "output_index": idx,
            })
            emitted_items.append({
                "type": "function_call",
                "name": payload.get("name", ""),
                "arguments": arguments_str,
                "call_id": call_id,
            })
            await _write_event("response.output_item.added", {
                "type": "response.output_item.added",
                "output_index": idx,
                "item": item,
            })
            return call_id

        async def _emit_tool_completed(payload: Dict[str, Any]) -> None:
            """Emit response.output_item.done (function_call) followed
            by response.output_item.added/done (function_call_output)."""
            nonlocal output_index
            call_id = payload.get("tool_call_id")
            tool_result = payload.get("result", "")
            pending = None
            if call_id:
                for i, p in enumerate(pending_tool_calls):
                    if p["call_id"] == call_id:
                        pending = pending_tool_calls.pop(i)
                        break
            if pending is None:
                # Completion without a matching start — skip to avoid
                # emitting orphaned done events.
                return

            # function_call done
            done_item = {
                "id": pending["item_id"],
                "type": "function_call",
                "status": "completed",
                "name": pending["name"],
                "call_id": pending["call_id"],
                "arguments": pending["arguments"],
            }
            await _write_event("response.output_item.done", {
                "type": "response.output_item.done",
                "output_index": pending["output_index"],
                "item": done_item,
            })

            # function_call_output added (result)
            result_str = tool_result if isinstance(tool_result, str) else json.dumps(tool_result)
            output_parts = [{"type": "input_text", "text": result_str}]
            output_item = {
                "id": f"fco_{uuid.uuid4().hex[:24]}",
                "type": "function_call_output",
                "call_id": pending["call_id"],
                "output": output_parts,
                "status": "completed",
            }
            idx = output_index
            output_index += 1
            emitted_items.append({
                "type": "function_call_output",
                "call_id": pending["call_id"],
                "output": output_parts,
            })
            await _write_event("response.output_item.added", {
                "type": "response.output_item.added",
                "output_index": idx,
                "item": output_item,
            })
            await _write_event("response.output_item.done", {
                "type": "response.output_item.done",
                "output_index": idx,
                "item": output_item,
            })

        # Main drain loop — thread-safe queue fed by agent callbacks.
        async def _dispatch(it) -> None:
            """Route a queue item to the correct SSE emitter.

            Plain strings are text deltas. Tagged tuples with
            ``__tool_started__`` / ``__tool_completed__`` prefixes
            are tool lifecycle events.
            """
            if isinstance(it, tuple) and len(it) == 2 and isinstance(it[0], str):
                tag, payload = it
                if tag == "__tool_started__":
                    await _emit_tool_started(payload)
                elif tag == "__tool_completed__":
                    await _emit_tool_completed(payload)
                # Unknown tags are silently ignored (forward-compat).
            elif isinstance(it, str):
                await _emit_text_delta(it)
            # Other types (non-string, non-tuple) are silently dropped.

        loop = asyncio.get_running_loop()
        while True:
            try:
                item = await loop.run_in_executor(None, lambda: stream_q.get(timeout=0.5))
            except _q.Empty:
                if agent_task.done():
                    # Drain remaining
                    while True:
                        try:
                            item = stream_q.get_nowait()
                        except _q.Empty:
                            break
                        if item is None:
                            break
                        await _dispatch(item)
                        last_activity = time.monotonic()
                    break
                if time.monotonic() - last_activity >= CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS:
                    await response.write(b": keepalive\n\n")
                    last_activity = time.monotonic()
                continue

            if item is None:  # EOS sentinel
                break

            await _dispatch(item)
            last_activity = time.monotonic()

        # Pick up agent result + usage from the completed task. Only the
        # await sits inside the try, so a client disconnect while writing
        # the fallback delta below reaches the disconnect handler instead
        # of being misreported as an agent error.
        agent_final = ""
        try:
            result, agent_usage = await agent_task
            usage = agent_usage or usage
        except Exception as e:  # noqa: BLE001
            logger.error("Error running agent for streaming responses: %s", e, exc_info=True)
            agent_error = str(e)
        else:
            agent_final = result.get("final_response", "") if isinstance(result, dict) else ""
            if isinstance(result, dict) and result.get("error") and not agent_final:
                agent_error = result["error"]

        # If the agent produced a final_response but no text deltas were
        # streamed (e.g. some providers only emit the full response at the
        # end), emit a single fallback delta so Responses clients still
        # receive a live text part.
        if agent_final and not final_text_parts:
            await _emit_text_delta(agent_final)

        # Close the message item if it was opened
        final_response_text = "".join(final_text_parts) or agent_final
        if message_opened:
            await _write_event("response.output_text.done", {
                "type": "response.output_text.done",
                "item_id": message_item_id,
                "output_index": message_output_index,
                "content_index": 0,
                "text": final_response_text,
                "logprobs": [],
            })
            msg_done_item = {
                "id": message_item_id,
                "type": "message",
                "status": "completed",
                "role": "assistant",
                "content": [
                    {"type": "output_text", "text": final_response_text}
                ],
            }
            await _write_event("response.output_item.done", {
                "type": "response.output_item.done",
                "output_index": message_output_index,
                "item": msg_done_item,
            })

        # Always append a final message item in the terminal response
        # envelope so clients that only parse the terminal payload still
        # see the assistant text (or the error). This mirrors the shape
        # produced by _extract_output_items in the batch path.
        assistant_text = final_response_text or (agent_error or "")
        final_items: List[Dict[str, Any]] = list(emitted_items)
        final_items.append({
            "type": "message",
            "role": "assistant",
            "content": [
                {"type": "output_text", "text": assistant_text}
            ],
        })
        usage_payload = {
            "input_tokens": usage.get("input_tokens", 0),
            "output_tokens": usage.get("output_tokens", 0),
            "total_tokens": usage.get("total_tokens", 0),
        }

        # Build exactly one terminal envelope; it is also what gets stored.
        if agent_error:
            terminal_env = _envelope("failed")
            terminal_env["output"] = final_items
            terminal_env["error"] = {"message": agent_error, "type": "server_error"}
            terminal_env["usage"] = usage_payload
            terminal_event = "response.failed"
        else:
            terminal_env = _envelope("completed")
            terminal_env["output"] = final_items
            terminal_env["usage"] = usage_payload
            terminal_event = "response.completed"
        await _write_event(terminal_event, {
            "type": terminal_event,
            "response": terminal_env,
        })

        # Persist for future chaining / GET retrieval, mirroring
        # the batch path behavior.
        if store:
            full_history: List[Dict[str, Any]] = list(conversation_history)
            full_history.append({"role": "user", "content": user_message})
            if isinstance(result, dict) and result.get("messages"):
                full_history.extend(result["messages"])
            else:
                full_history.append({"role": "assistant", "content": assistant_text})
            self._response_store.put(response_id, {
                "response": terminal_env,
                "conversation_history": full_history,
                "instructions": instructions,
                "session_id": session_id,
            })
            if conversation:
                self._response_store.set_conversation(conversation, response_id)

    except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, OSError):
        # Client disconnected — interrupt the agent so it stops
        # making upstream LLM calls, then cancel the task and let it unwind.
        _stop_agent()
        try:
            await agent_task
        except (asyncio.CancelledError, Exception):
            pass
        logger.info("SSE client disconnected; interrupted agent task %s", response_id)
    except asyncio.CancelledError:
        # The handler itself was cancelled (aiohttp can do this when the
        # peer goes away). Stop the agent too, then let cancellation propagate.
        _stop_agent()
        raise

    return response
|
|
|
|
async def _handle_responses(self, request: "web.Request") -> "web.StreamResponse":
|
|
"""POST /v1/responses — OpenAI Responses API format."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
|
|
# Parse request body
|
|
try:
|
|
body = await request.json()
|
|
except (json.JSONDecodeError, Exception):
|
|
return web.json_response(
|
|
{"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
|
|
status=400,
|
|
)
|
|
|
|
raw_input = body.get("input")
|
|
if raw_input is None:
|
|
return web.json_response(_openai_error("Missing 'input' field"), status=400)
|
|
|
|
instructions = body.get("instructions")
|
|
previous_response_id = body.get("previous_response_id")
|
|
conversation = body.get("conversation")
|
|
store = body.get("store", True)
|
|
|
|
# conversation and previous_response_id are mutually exclusive
|
|
if conversation and previous_response_id:
|
|
return web.json_response(_openai_error("Cannot use both 'conversation' and 'previous_response_id'"), status=400)
|
|
|
|
# Resolve conversation name to latest response_id
|
|
if conversation:
|
|
previous_response_id = self._response_store.get_conversation(conversation)
|
|
# No error if conversation doesn't exist yet — it's a new conversation
|
|
|
|
# Normalize input to message list
|
|
input_messages: List[Dict[str, Any]] = []
|
|
if isinstance(raw_input, str):
|
|
input_messages = [{"role": "user", "content": raw_input}]
|
|
elif isinstance(raw_input, list):
|
|
for idx, item in enumerate(raw_input):
|
|
if isinstance(item, str):
|
|
input_messages.append({"role": "user", "content": item})
|
|
elif isinstance(item, dict):
|
|
role = item.get("role", "user")
|
|
try:
|
|
content = _normalize_multimodal_content(item.get("content", ""))
|
|
except ValueError as exc:
|
|
return _multimodal_validation_error(exc, param=f"input[{idx}].content")
|
|
input_messages.append({"role": role, "content": content})
|
|
else:
|
|
return web.json_response(_openai_error("'input' must be a string or array"), status=400)
|
|
|
|
# Accept explicit conversation_history from the request body.
|
|
# This lets stateless clients supply their own history instead of
|
|
# relying on server-side response chaining via previous_response_id.
|
|
# Precedence: explicit conversation_history > previous_response_id.
|
|
conversation_history: List[Dict[str, Any]] = []
|
|
raw_history = body.get("conversation_history")
|
|
if raw_history:
|
|
if not isinstance(raw_history, list):
|
|
return web.json_response(
|
|
_openai_error("'conversation_history' must be an array of message objects"),
|
|
status=400,
|
|
)
|
|
for i, entry in enumerate(raw_history):
|
|
if not isinstance(entry, dict) or "role" not in entry or "content" not in entry:
|
|
return web.json_response(
|
|
_openai_error(f"conversation_history[{i}] must have 'role' and 'content' fields"),
|
|
status=400,
|
|
)
|
|
try:
|
|
entry_content = _normalize_multimodal_content(entry["content"])
|
|
except ValueError as exc:
|
|
return _multimodal_validation_error(exc, param=f"conversation_history[{i}].content")
|
|
conversation_history.append({"role": str(entry["role"]), "content": entry_content})
|
|
if previous_response_id:
|
|
logger.debug("Both conversation_history and previous_response_id provided; using conversation_history")
|
|
|
|
stored_session_id = None
|
|
if not conversation_history and previous_response_id:
|
|
stored = self._response_store.get(previous_response_id)
|
|
if stored is None:
|
|
return web.json_response(_openai_error(f"Previous response not found: {previous_response_id}"), status=404)
|
|
conversation_history = list(stored.get("conversation_history", []))
|
|
stored_session_id = stored.get("session_id")
|
|
# If no instructions provided, carry forward from previous
|
|
if instructions is None:
|
|
instructions = stored.get("instructions")
|
|
|
|
# Append new input messages to history (all but the last become history)
|
|
for msg in input_messages[:-1]:
|
|
conversation_history.append(msg)
|
|
|
|
# Last input message is the user_message
|
|
user_message: Any = input_messages[-1].get("content", "") if input_messages else ""
|
|
if not _content_has_visible_payload(user_message):
|
|
return web.json_response(_openai_error("No user message found in input"), status=400)
|
|
|
|
# Truncation support
|
|
if body.get("truncation") == "auto" and len(conversation_history) > 100:
|
|
conversation_history = conversation_history[-100:]
|
|
|
|
# Reuse session from previous_response_id chain so the dashboard
|
|
# groups the entire conversation under one session entry.
|
|
session_id = stored_session_id or str(uuid.uuid4())
|
|
|
|
stream = bool(body.get("stream", False))
|
|
if stream:
|
|
# Streaming branch — emit OpenAI Responses SSE events as the
|
|
# agent runs so frontends can render text deltas and tool
|
|
# calls in real time. See _write_sse_responses for details.
|
|
import queue as _q
|
|
_stream_q: _q.Queue = _q.Queue()
|
|
|
|
def _on_delta(delta):
|
|
# None from the agent is a CLI box-close signal, not EOS.
|
|
# Forwarding would kill the SSE stream prematurely; the
|
|
# SSE writer detects completion via agent_task.done().
|
|
if delta is not None:
|
|
_stream_q.put(delta)
|
|
|
|
def _on_tool_progress(event_type, name, preview, args, **kwargs):
|
|
"""Queue non-start tool progress events if needed in future.
|
|
|
|
The structured Responses stream uses ``tool_start_callback``
|
|
and ``tool_complete_callback`` for exact call-id correlation,
|
|
so progress events are currently ignored here.
|
|
"""
|
|
return
|
|
|
|
def _on_tool_start(tool_call_id, function_name, function_args):
|
|
"""Queue a started tool for live function_call streaming."""
|
|
_stream_q.put(("__tool_started__", {
|
|
"tool_call_id": tool_call_id,
|
|
"name": function_name,
|
|
"arguments": function_args or {},
|
|
}))
|
|
|
|
def _on_tool_complete(tool_call_id, function_name, function_args, function_result):
|
|
"""Queue a completed tool result for live function_call_output streaming."""
|
|
_stream_q.put(("__tool_completed__", {
|
|
"tool_call_id": tool_call_id,
|
|
"name": function_name,
|
|
"arguments": function_args or {},
|
|
"result": function_result,
|
|
}))
|
|
|
|
agent_ref = [None]
|
|
agent_task = asyncio.ensure_future(self._run_agent(
|
|
user_message=user_message,
|
|
conversation_history=conversation_history,
|
|
ephemeral_system_prompt=instructions,
|
|
session_id=session_id,
|
|
stream_delta_callback=_on_delta,
|
|
tool_progress_callback=_on_tool_progress,
|
|
tool_start_callback=_on_tool_start,
|
|
tool_complete_callback=_on_tool_complete,
|
|
agent_ref=agent_ref,
|
|
))
|
|
|
|
response_id = f"resp_{uuid.uuid4().hex[:28]}"
|
|
model_name = body.get("model", self._model_name)
|
|
created_at = int(time.time())
|
|
|
|
return await self._write_sse_responses(
|
|
request=request,
|
|
response_id=response_id,
|
|
model=model_name,
|
|
created_at=created_at,
|
|
stream_q=_stream_q,
|
|
agent_task=agent_task,
|
|
agent_ref=agent_ref,
|
|
conversation_history=conversation_history,
|
|
user_message=user_message,
|
|
instructions=instructions,
|
|
conversation=conversation,
|
|
store=store,
|
|
session_id=session_id,
|
|
)
|
|
|
|
async def _compute_response():
|
|
return await self._run_agent(
|
|
user_message=user_message,
|
|
conversation_history=conversation_history,
|
|
ephemeral_system_prompt=instructions,
|
|
session_id=session_id,
|
|
)
|
|
|
|
idempotency_key = request.headers.get("Idempotency-Key")
|
|
if idempotency_key:
|
|
fp = _make_request_fingerprint(
|
|
body,
|
|
keys=["input", "instructions", "previous_response_id", "conversation", "model", "tools"],
|
|
)
|
|
try:
|
|
result, usage = await _idem_cache.get_or_set(idempotency_key, fp, _compute_response)
|
|
except Exception as e:
|
|
logger.error("Error running agent for responses: %s", e, exc_info=True)
|
|
return web.json_response(
|
|
_openai_error(f"Internal server error: {e}", err_type="server_error"),
|
|
status=500,
|
|
)
|
|
else:
|
|
try:
|
|
result, usage = await _compute_response()
|
|
except Exception as e:
|
|
logger.error("Error running agent for responses: %s", e, exc_info=True)
|
|
return web.json_response(
|
|
_openai_error(f"Internal server error: {e}", err_type="server_error"),
|
|
status=500,
|
|
)
|
|
|
|
final_response = result.get("final_response", "")
|
|
if not final_response:
|
|
final_response = result.get("error", "(No response generated)")
|
|
|
|
response_id = f"resp_{uuid.uuid4().hex[:28]}"
|
|
created_at = int(time.time())
|
|
|
|
# Build the full conversation history for storage
|
|
# (includes tool calls from the agent run)
|
|
full_history = list(conversation_history)
|
|
full_history.append({"role": "user", "content": user_message})
|
|
# Add agent's internal messages if available
|
|
agent_messages = result.get("messages", [])
|
|
if agent_messages:
|
|
full_history.extend(agent_messages)
|
|
else:
|
|
full_history.append({"role": "assistant", "content": final_response})
|
|
|
|
# Build output items (includes tool calls + final message)
|
|
output_items = self._extract_output_items(result)
|
|
|
|
response_data = {
|
|
"id": response_id,
|
|
"object": "response",
|
|
"status": "completed",
|
|
"created_at": created_at,
|
|
"model": body.get("model", self._model_name),
|
|
"output": output_items,
|
|
"usage": {
|
|
"input_tokens": usage.get("input_tokens", 0),
|
|
"output_tokens": usage.get("output_tokens", 0),
|
|
"total_tokens": usage.get("total_tokens", 0),
|
|
},
|
|
}
|
|
|
|
# Store the complete response object for future chaining / GET retrieval
|
|
if store:
|
|
self._response_store.put(response_id, {
|
|
"response": response_data,
|
|
"conversation_history": full_history,
|
|
"instructions": instructions,
|
|
"session_id": session_id,
|
|
})
|
|
# Update conversation mapping so the next request with the same
|
|
# conversation name automatically chains to this response
|
|
if conversation:
|
|
self._response_store.set_conversation(conversation, response_id)
|
|
|
|
return web.json_response(response_data)
|
|
|
|
# ------------------------------------------------------------------
|
|
# GET / DELETE response endpoints
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _handle_get_response(self, request: "web.Request") -> "web.Response":
|
|
"""GET /v1/responses/{response_id} — retrieve a stored response."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
|
|
response_id = request.match_info["response_id"]
|
|
stored = self._response_store.get(response_id)
|
|
if stored is None:
|
|
return web.json_response(_openai_error(f"Response not found: {response_id}"), status=404)
|
|
|
|
return web.json_response(stored["response"])
|
|
|
|
async def _handle_delete_response(self, request: "web.Request") -> "web.Response":
|
|
"""DELETE /v1/responses/{response_id} — delete a stored response."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
|
|
response_id = request.match_info["response_id"]
|
|
deleted = self._response_store.delete(response_id)
|
|
if not deleted:
|
|
return web.json_response(_openai_error(f"Response not found: {response_id}"), status=404)
|
|
|
|
return web.json_response({
|
|
"id": response_id,
|
|
"object": "response",
|
|
"deleted": True,
|
|
})
|
|
|
|
# ------------------------------------------------------------------
|
|
# Cron jobs API
|
|
# ------------------------------------------------------------------
|
|
|
|
_JOB_ID_RE = __import__("re").compile(r"[a-f0-9]{12}")
|
|
# Allowed fields for update — prevents clients injecting arbitrary keys
|
|
_UPDATE_ALLOWED_FIELDS = {"name", "schedule", "prompt", "deliver", "skills", "skill", "repeat", "enabled"}
|
|
_MAX_NAME_LENGTH = 200
|
|
_MAX_PROMPT_LENGTH = 5000
|
|
|
|
@staticmethod
def _check_jobs_available() -> Optional["web.Response"]:
    """Gate the cron endpoints on the module-level ``_CRON_AVAILABLE`` flag.

    Returns ``None`` when the flag is truthy; otherwise a 501 JSON error
    response that the caller should send back as-is.
    """
    if _CRON_AVAILABLE:
        return None
    return web.json_response(
        {"error": "Cron module not available"}, status=501,
    )
|
|
|
|
def _check_job_id(self, request: "web.Request") -> tuple:
|
|
"""Validate and extract job_id. Returns (job_id, error_response)."""
|
|
job_id = request.match_info["job_id"]
|
|
if not self._JOB_ID_RE.fullmatch(job_id):
|
|
return job_id, web.json_response(
|
|
{"error": "Invalid job ID format"}, status=400,
|
|
)
|
|
return job_id, None
|
|
|
|
async def _handle_list_jobs(self, request: "web.Request") -> "web.Response":
|
|
"""GET /api/jobs — list all cron jobs."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
cron_err = self._check_jobs_available()
|
|
if cron_err:
|
|
return cron_err
|
|
try:
|
|
include_disabled = request.query.get("include_disabled", "").lower() in ("true", "1")
|
|
jobs = _cron_list(include_disabled=include_disabled)
|
|
return web.json_response({"jobs": jobs})
|
|
except Exception as e:
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_create_job(self, request: "web.Request") -> "web.Response":
|
|
"""POST /api/jobs — create a new cron job."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
cron_err = self._check_jobs_available()
|
|
if cron_err:
|
|
return cron_err
|
|
try:
|
|
body = await request.json()
|
|
name = (body.get("name") or "").strip()
|
|
schedule = (body.get("schedule") or "").strip()
|
|
prompt = body.get("prompt", "")
|
|
deliver = body.get("deliver", "local")
|
|
skills = body.get("skills")
|
|
repeat = body.get("repeat")
|
|
|
|
if not name:
|
|
return web.json_response({"error": "Name is required"}, status=400)
|
|
if len(name) > self._MAX_NAME_LENGTH:
|
|
return web.json_response(
|
|
{"error": f"Name must be ≤ {self._MAX_NAME_LENGTH} characters"}, status=400,
|
|
)
|
|
if not schedule:
|
|
return web.json_response({"error": "Schedule is required"}, status=400)
|
|
if len(prompt) > self._MAX_PROMPT_LENGTH:
|
|
return web.json_response(
|
|
{"error": f"Prompt must be ≤ {self._MAX_PROMPT_LENGTH} characters"}, status=400,
|
|
)
|
|
if repeat is not None and (not isinstance(repeat, int) or repeat < 1):
|
|
return web.json_response({"error": "Repeat must be a positive integer"}, status=400)
|
|
|
|
kwargs = {
|
|
"prompt": prompt,
|
|
"schedule": schedule,
|
|
"name": name,
|
|
"deliver": deliver,
|
|
}
|
|
if skills:
|
|
kwargs["skills"] = skills
|
|
if repeat is not None:
|
|
kwargs["repeat"] = repeat
|
|
|
|
job = _cron_create(**kwargs)
|
|
return web.json_response({"job": job})
|
|
except Exception as e:
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_get_job(self, request: "web.Request") -> "web.Response":
|
|
"""GET /api/jobs/{job_id} — get a single cron job."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
cron_err = self._check_jobs_available()
|
|
if cron_err:
|
|
return cron_err
|
|
job_id, id_err = self._check_job_id(request)
|
|
if id_err:
|
|
return id_err
|
|
try:
|
|
job = _cron_get(job_id)
|
|
if not job:
|
|
return web.json_response({"error": "Job not found"}, status=404)
|
|
return web.json_response({"job": job})
|
|
except Exception as e:
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_update_job(self, request: "web.Request") -> "web.Response":
|
|
"""PATCH /api/jobs/{job_id} — update a cron job."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
cron_err = self._check_jobs_available()
|
|
if cron_err:
|
|
return cron_err
|
|
job_id, id_err = self._check_job_id(request)
|
|
if id_err:
|
|
return id_err
|
|
try:
|
|
body = await request.json()
|
|
# Whitelist allowed fields to prevent arbitrary key injection
|
|
sanitized = {k: v for k, v in body.items() if k in self._UPDATE_ALLOWED_FIELDS}
|
|
if not sanitized:
|
|
return web.json_response({"error": "No valid fields to update"}, status=400)
|
|
# Validate lengths if present
|
|
if "name" in sanitized and len(sanitized["name"]) > self._MAX_NAME_LENGTH:
|
|
return web.json_response(
|
|
{"error": f"Name must be ≤ {self._MAX_NAME_LENGTH} characters"}, status=400,
|
|
)
|
|
if "prompt" in sanitized and len(sanitized["prompt"]) > self._MAX_PROMPT_LENGTH:
|
|
return web.json_response(
|
|
{"error": f"Prompt must be ≤ {self._MAX_PROMPT_LENGTH} characters"}, status=400,
|
|
)
|
|
job = _cron_update(job_id, sanitized)
|
|
if not job:
|
|
return web.json_response({"error": "Job not found"}, status=404)
|
|
return web.json_response({"job": job})
|
|
except Exception as e:
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_delete_job(self, request: "web.Request") -> "web.Response":
|
|
"""DELETE /api/jobs/{job_id} — delete a cron job."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
cron_err = self._check_jobs_available()
|
|
if cron_err:
|
|
return cron_err
|
|
job_id, id_err = self._check_job_id(request)
|
|
if id_err:
|
|
return id_err
|
|
try:
|
|
success = _cron_remove(job_id)
|
|
if not success:
|
|
return web.json_response({"error": "Job not found"}, status=404)
|
|
return web.json_response({"ok": True})
|
|
except Exception as e:
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_pause_job(self, request: "web.Request") -> "web.Response":
|
|
"""POST /api/jobs/{job_id}/pause — pause a cron job."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
cron_err = self._check_jobs_available()
|
|
if cron_err:
|
|
return cron_err
|
|
job_id, id_err = self._check_job_id(request)
|
|
if id_err:
|
|
return id_err
|
|
try:
|
|
job = _cron_pause(job_id)
|
|
if not job:
|
|
return web.json_response({"error": "Job not found"}, status=404)
|
|
return web.json_response({"job": job})
|
|
except Exception as e:
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_resume_job(self, request: "web.Request") -> "web.Response":
|
|
"""POST /api/jobs/{job_id}/resume — resume a paused cron job."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
cron_err = self._check_jobs_available()
|
|
if cron_err:
|
|
return cron_err
|
|
job_id, id_err = self._check_job_id(request)
|
|
if id_err:
|
|
return id_err
|
|
try:
|
|
job = _cron_resume(job_id)
|
|
if not job:
|
|
return web.json_response({"error": "Job not found"}, status=404)
|
|
return web.json_response({"job": job})
|
|
except Exception as e:
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_run_job(self, request: "web.Request") -> "web.Response":
|
|
"""POST /api/jobs/{job_id}/run — trigger immediate execution."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
cron_err = self._check_jobs_available()
|
|
if cron_err:
|
|
return cron_err
|
|
job_id, id_err = self._check_job_id(request)
|
|
if id_err:
|
|
return id_err
|
|
try:
|
|
job = _cron_trigger(job_id)
|
|
if not job:
|
|
return web.json_response({"error": "Job not found"}, status=404)
|
|
return web.json_response({"job": job})
|
|
except Exception as e:
|
|
return web.json_response({"error": str(e)}, status=500)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Output extraction helper
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _extract_output_items(result: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Build the full output item array from the agent's messages.
|
|
|
|
Walks *result["messages"]* and emits:
|
|
- ``function_call`` items for each tool_call on assistant messages
|
|
- ``function_call_output`` items for each tool-role message
|
|
- a final ``message`` item with the assistant's text reply
|
|
"""
|
|
items: List[Dict[str, Any]] = []
|
|
messages = result.get("messages", [])
|
|
|
|
for msg in messages:
|
|
role = msg.get("role")
|
|
if role == "assistant" and msg.get("tool_calls"):
|
|
for tc in msg["tool_calls"]:
|
|
func = tc.get("function", {})
|
|
items.append({
|
|
"type": "function_call",
|
|
"name": func.get("name", ""),
|
|
"arguments": func.get("arguments", ""),
|
|
"call_id": tc.get("id", ""),
|
|
})
|
|
elif role == "tool":
|
|
items.append({
|
|
"type": "function_call_output",
|
|
"call_id": msg.get("tool_call_id", ""),
|
|
"output": msg.get("content", ""),
|
|
})
|
|
|
|
# Final assistant message
|
|
final = result.get("final_response", "")
|
|
if not final:
|
|
final = result.get("error", "(No response generated)")
|
|
|
|
items.append({
|
|
"type": "message",
|
|
"role": "assistant",
|
|
"content": [
|
|
{
|
|
"type": "output_text",
|
|
"text": final,
|
|
}
|
|
],
|
|
})
|
|
return items
|
|
|
|
# ------------------------------------------------------------------
|
|
# Agent execution
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _run_agent(
|
|
self,
|
|
user_message: str,
|
|
conversation_history: List[Dict[str, str]],
|
|
ephemeral_system_prompt: Optional[str] = None,
|
|
session_id: Optional[str] = None,
|
|
stream_delta_callback=None,
|
|
tool_progress_callback=None,
|
|
tool_start_callback=None,
|
|
tool_complete_callback=None,
|
|
agent_ref: Optional[list] = None,
|
|
) -> tuple:
|
|
"""
|
|
Create an agent and run a conversation in a thread executor.
|
|
|
|
Returns ``(result_dict, usage_dict)`` where *usage_dict* contains
|
|
``input_tokens``, ``output_tokens`` and ``total_tokens``.
|
|
|
|
If *agent_ref* is a one-element list, the AIAgent instance is stored
|
|
at ``agent_ref[0]`` before ``run_conversation`` begins. This allows
|
|
callers (e.g. the SSE writer) to call ``agent.interrupt()`` from
|
|
another thread to stop in-progress LLM calls.
|
|
"""
|
|
loop = asyncio.get_running_loop()
|
|
|
|
def _run():
|
|
agent = self._create_agent(
|
|
ephemeral_system_prompt=ephemeral_system_prompt,
|
|
session_id=session_id,
|
|
stream_delta_callback=stream_delta_callback,
|
|
tool_progress_callback=tool_progress_callback,
|
|
tool_start_callback=tool_start_callback,
|
|
tool_complete_callback=tool_complete_callback,
|
|
)
|
|
if agent_ref is not None:
|
|
agent_ref[0] = agent
|
|
result = agent.run_conversation(
|
|
user_message=user_message,
|
|
conversation_history=conversation_history,
|
|
task_id="default",
|
|
)
|
|
usage = {
|
|
"input_tokens": getattr(agent, "session_prompt_tokens", 0) or 0,
|
|
"output_tokens": getattr(agent, "session_completion_tokens", 0) or 0,
|
|
"total_tokens": getattr(agent, "session_total_tokens", 0) or 0,
|
|
}
|
|
return result, usage
|
|
|
|
return await loop.run_in_executor(None, _run)
|
|
|
|
# ------------------------------------------------------------------
|
|
# /v1/runs — structured event streaming
|
|
# ------------------------------------------------------------------
|
|
|
|
_MAX_CONCURRENT_RUNS = 10 # Prevent unbounded resource allocation
|
|
_RUN_STREAM_TTL = 300 # seconds before orphaned runs are swept
|
|
|
|
def _make_run_event_callback(self, run_id: str, loop: "asyncio.AbstractEventLoop"):
|
|
"""Return a tool_progress_callback that pushes structured events to the run's SSE queue."""
|
|
def _push(event: Dict[str, Any]) -> None:
|
|
q = self._run_streams.get(run_id)
|
|
if q is None:
|
|
return
|
|
try:
|
|
loop.call_soon_threadsafe(q.put_nowait, event)
|
|
except Exception:
|
|
pass
|
|
|
|
def _callback(event_type: str, tool_name: str = None, preview: str = None, args=None, **kwargs):
|
|
ts = time.time()
|
|
if event_type == "tool.started":
|
|
_push({
|
|
"event": "tool.started",
|
|
"run_id": run_id,
|
|
"timestamp": ts,
|
|
"tool": tool_name,
|
|
"preview": preview,
|
|
})
|
|
elif event_type == "tool.completed":
|
|
_push({
|
|
"event": "tool.completed",
|
|
"run_id": run_id,
|
|
"timestamp": ts,
|
|
"tool": tool_name,
|
|
"duration": round(kwargs.get("duration", 0), 3),
|
|
"error": kwargs.get("is_error", False),
|
|
})
|
|
elif event_type == "reasoning.available":
|
|
_push({
|
|
"event": "reasoning.available",
|
|
"run_id": run_id,
|
|
"timestamp": ts,
|
|
"text": preview or "",
|
|
})
|
|
# _thinking and subagent_progress are intentionally not forwarded
|
|
|
|
return _callback
|
|
|
|
async def _handle_runs(self, request: "web.Request") -> "web.Response":
|
|
"""POST /v1/runs — start an agent run, return run_id immediately."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
|
|
# Enforce concurrency limit
|
|
if len(self._run_streams) >= self._MAX_CONCURRENT_RUNS:
|
|
return web.json_response(
|
|
_openai_error(f"Too many concurrent runs (max {self._MAX_CONCURRENT_RUNS})", code="rate_limit_exceeded"),
|
|
status=429,
|
|
)
|
|
|
|
try:
|
|
body = await request.json()
|
|
except Exception:
|
|
return web.json_response(_openai_error("Invalid JSON"), status=400)
|
|
|
|
raw_input = body.get("input")
|
|
if not raw_input:
|
|
return web.json_response(_openai_error("Missing 'input' field"), status=400)
|
|
|
|
user_message = raw_input if isinstance(raw_input, str) else (raw_input[-1].get("content", "") if isinstance(raw_input, list) else "")
|
|
if not user_message:
|
|
return web.json_response(_openai_error("No user message found in input"), status=400)
|
|
|
|
run_id = f"run_{uuid.uuid4().hex}"
|
|
loop = asyncio.get_running_loop()
|
|
q: "asyncio.Queue[Optional[Dict]]" = asyncio.Queue()
|
|
self._run_streams[run_id] = q
|
|
self._run_streams_created[run_id] = time.time()
|
|
|
|
event_cb = self._make_run_event_callback(run_id, loop)
|
|
|
|
# Also wire stream_delta_callback so message.delta events flow through
|
|
def _text_cb(delta: Optional[str]) -> None:
|
|
if delta is None:
|
|
return
|
|
try:
|
|
loop.call_soon_threadsafe(q.put_nowait, {
|
|
"event": "message.delta",
|
|
"run_id": run_id,
|
|
"timestamp": time.time(),
|
|
"delta": delta,
|
|
})
|
|
except Exception:
|
|
pass
|
|
|
|
instructions = body.get("instructions")
|
|
previous_response_id = body.get("previous_response_id")
|
|
|
|
# Accept explicit conversation_history from the request body.
|
|
# Precedence: explicit conversation_history > previous_response_id.
|
|
conversation_history: List[Dict[str, str]] = []
|
|
raw_history = body.get("conversation_history")
|
|
if raw_history:
|
|
if not isinstance(raw_history, list):
|
|
return web.json_response(
|
|
_openai_error("'conversation_history' must be an array of message objects"),
|
|
status=400,
|
|
)
|
|
for i, entry in enumerate(raw_history):
|
|
if not isinstance(entry, dict) or "role" not in entry or "content" not in entry:
|
|
return web.json_response(
|
|
_openai_error(f"conversation_history[{i}] must have 'role' and 'content' fields"),
|
|
status=400,
|
|
)
|
|
conversation_history.append({"role": str(entry["role"]), "content": str(entry["content"])})
|
|
if previous_response_id:
|
|
logger.debug("Both conversation_history and previous_response_id provided; using conversation_history")
|
|
|
|
stored_session_id = None
|
|
if not conversation_history and previous_response_id:
|
|
stored = self._response_store.get(previous_response_id)
|
|
if stored:
|
|
conversation_history = list(stored.get("conversation_history", []))
|
|
stored_session_id = stored.get("session_id")
|
|
if instructions is None:
|
|
instructions = stored.get("instructions")
|
|
|
|
# When input is a multi-message array, extract all but the last
|
|
# message as conversation history (the last becomes user_message).
|
|
# Only fires when no explicit history was provided.
|
|
if not conversation_history and isinstance(raw_input, list) and len(raw_input) > 1:
|
|
for msg in raw_input[:-1]:
|
|
if isinstance(msg, dict) and msg.get("role") and msg.get("content"):
|
|
content = msg["content"]
|
|
if isinstance(content, list):
|
|
# Flatten multi-part content blocks to text
|
|
content = " ".join(
|
|
part.get("text", "") for part in content
|
|
if isinstance(part, dict) and part.get("type") == "text"
|
|
)
|
|
conversation_history.append({"role": msg["role"], "content": str(content)})
|
|
|
|
session_id = body.get("session_id") or stored_session_id or run_id
|
|
ephemeral_system_prompt = instructions
|
|
|
|
async def _run_and_close():
|
|
try:
|
|
agent = self._create_agent(
|
|
ephemeral_system_prompt=ephemeral_system_prompt,
|
|
session_id=session_id,
|
|
stream_delta_callback=_text_cb,
|
|
tool_progress_callback=event_cb,
|
|
)
|
|
def _run_sync():
|
|
r = agent.run_conversation(
|
|
user_message=user_message,
|
|
conversation_history=conversation_history,
|
|
task_id="default",
|
|
)
|
|
u = {
|
|
"input_tokens": getattr(agent, "session_prompt_tokens", 0) or 0,
|
|
"output_tokens": getattr(agent, "session_completion_tokens", 0) or 0,
|
|
"total_tokens": getattr(agent, "session_total_tokens", 0) or 0,
|
|
}
|
|
return r, u
|
|
|
|
result, usage = await asyncio.get_running_loop().run_in_executor(None, _run_sync)
|
|
final_response = result.get("final_response", "") if isinstance(result, dict) else ""
|
|
q.put_nowait({
|
|
"event": "run.completed",
|
|
"run_id": run_id,
|
|
"timestamp": time.time(),
|
|
"output": final_response,
|
|
"usage": usage,
|
|
})
|
|
except Exception as exc:
|
|
logger.exception("[api_server] run %s failed", run_id)
|
|
try:
|
|
q.put_nowait({
|
|
"event": "run.failed",
|
|
"run_id": run_id,
|
|
"timestamp": time.time(),
|
|
"error": str(exc),
|
|
})
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
# Sentinel: signal SSE stream to close
|
|
try:
|
|
q.put_nowait(None)
|
|
except Exception:
|
|
pass
|
|
|
|
task = asyncio.create_task(_run_and_close())
|
|
try:
|
|
self._background_tasks.add(task)
|
|
except TypeError:
|
|
pass
|
|
if hasattr(task, "add_done_callback"):
|
|
task.add_done_callback(self._background_tasks.discard)
|
|
|
|
return web.json_response({"run_id": run_id, "status": "started"}, status=202)
|
|
|
|
async def _handle_run_events(self, request: "web.Request") -> "web.StreamResponse":
|
|
"""GET /v1/runs/{run_id}/events — SSE stream of structured agent lifecycle events."""
|
|
auth_err = self._check_auth(request)
|
|
if auth_err:
|
|
return auth_err
|
|
|
|
run_id = request.match_info["run_id"]
|
|
|
|
# Allow subscribing slightly before the run is registered (race condition window)
|
|
for _ in range(20):
|
|
if run_id in self._run_streams:
|
|
break
|
|
await asyncio.sleep(0.05)
|
|
else:
|
|
return web.json_response(_openai_error(f"Run not found: {run_id}", code="run_not_found"), status=404)
|
|
|
|
q = self._run_streams[run_id]
|
|
|
|
response = web.StreamResponse(
|
|
status=200,
|
|
headers={
|
|
"Content-Type": "text/event-stream",
|
|
"Cache-Control": "no-cache",
|
|
"X-Accel-Buffering": "no",
|
|
},
|
|
)
|
|
await response.prepare(request)
|
|
|
|
try:
|
|
while True:
|
|
try:
|
|
event = await asyncio.wait_for(q.get(), timeout=30.0)
|
|
except asyncio.TimeoutError:
|
|
await response.write(b": keepalive\n\n")
|
|
continue
|
|
if event is None:
|
|
# Run finished — send final SSE comment and close
|
|
await response.write(b": stream closed\n\n")
|
|
break
|
|
payload = f"data: {json.dumps(event)}\n\n"
|
|
await response.write(payload.encode())
|
|
except Exception as exc:
|
|
logger.debug("[api_server] SSE stream error for run %s: %s", run_id, exc)
|
|
finally:
|
|
self._run_streams.pop(run_id, None)
|
|
self._run_streams_created.pop(run_id, None)
|
|
|
|
return response
|
|
|
|
async def _sweep_orphaned_runs(self) -> None:
    """Background loop: every 60 s, unregister run streams older than the TTL.

    A run is swept once more than ``_RUN_STREAM_TTL`` seconds have passed
    since it was recorded in ``_run_streams_created``. Both its queue
    registration and its timestamp are removed. The loop never returns on
    its own.

    NOTE(review): age is measured from registration, not from consumption.
    A run that is still being streamed past the TTL is unregistered too. Its
    SSE reader keeps its own queue reference, but tool events routed through
    ``_run_streams.get`` stop arriving, and the run no longer counts toward
    the concurrency limit. Confirm this is intended.
    """
    while True:
        await asyncio.sleep(60)
        now = time.time()
        ttl = self._RUN_STREAM_TTL
        expired = [
            rid
            for rid, born in list(self._run_streams_created.items())
            if now - born > ttl
        ]
        for rid in expired:
            logger.debug("[api_server] sweeping orphaned run %s", rid)
            self._run_streams.pop(rid, None)
            self._run_streams_created.pop(rid, None)
|
|
|
|
# ------------------------------------------------------------------
|
|
# BasePlatformAdapter interface
|
|
# ------------------------------------------------------------------
|
|
|
|
async def connect(self) -> bool:
    """Start the aiohttp web server.

    Registers every route. It then refuses to start (returning ``False``)
    when any of these holds:
      - the bind host is network-accessible and no ``API_SERVER_KEY`` is set;
      - the host is network-accessible and the key is a placeholder,
        checked with ``hermes_cli.auth.has_usable_secret`` when importable;
      - something already accepts TCP connections on ``127.0.0.1:<port>``.

    The orphaned-run sweeper is started only after the site is listening,
    so a refused or failed start leaves no background task running. If the
    TCP site fails to start, the already-set-up runner is cleaned up. Any
    exception is logged and reported as ``False``.
    """
    try:
        mws = [mw for mw in (cors_middleware, body_limit_middleware, security_headers_middleware) if mw is not None]
        self._app = web.Application(middlewares=mws)
        self._app["api_server_adapter"] = self
        self._app.router.add_get("/health", self._handle_health)
        self._app.router.add_get("/health/detailed", self._handle_health_detailed)
        self._app.router.add_get("/v1/health", self._handle_health)
        self._app.router.add_get("/v1/models", self._handle_models)
        self._app.router.add_post("/v1/chat/completions", self._handle_chat_completions)
        self._app.router.add_post("/v1/responses", self._handle_responses)
        self._app.router.add_get("/v1/responses/{response_id}", self._handle_get_response)
        self._app.router.add_delete("/v1/responses/{response_id}", self._handle_delete_response)
        # Cron jobs management API
        self._app.router.add_get("/api/jobs", self._handle_list_jobs)
        self._app.router.add_post("/api/jobs", self._handle_create_job)
        self._app.router.add_get("/api/jobs/{job_id}", self._handle_get_job)
        self._app.router.add_patch("/api/jobs/{job_id}", self._handle_update_job)
        self._app.router.add_delete("/api/jobs/{job_id}", self._handle_delete_job)
        self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job)
        self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job)
        self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job)
        # Structured event streaming
        self._app.router.add_post("/v1/runs", self._handle_runs)
        self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events)

        # Refuse to start network-accessible without authentication
        if is_network_accessible(self._host) and not self._api_key:
            logger.error(
                "[%s] Refusing to start: binding to %s requires API_SERVER_KEY. "
                "Set API_SERVER_KEY or use the default 127.0.0.1.",
                self.name, self._host,
            )
            return False

        # Refuse to start network-accessible with a placeholder key.
        # Ported from openclaw/openclaw#64586.
        if is_network_accessible(self._host) and self._api_key:
            try:
                from hermes_cli.auth import has_usable_secret
                if not has_usable_secret(self._api_key, min_length=8):
                    logger.error(
                        "[%s] Refusing to start: API_SERVER_KEY is set to a "
                        "placeholder value. Generate a real secret "
                        "(e.g. `openssl rand -hex 32`) and set API_SERVER_KEY "
                        "before exposing the API server on %s.",
                        self.name, self._host,
                    )
                    return False
            except ImportError:
                pass

        # Port conflict detection — fail fast if port is already in use.
        # A refused connection or a timeout (both OSError subclasses) means
        # nothing is listening there.
        try:
            with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
                _s.settimeout(1)
                _s.connect(('127.0.0.1', self._port))
                logger.error('[%s] Port %d already in use. Set a different port in config.yaml: platforms.api_server.port', self.name, self._port)
                return False
        except OSError:
            pass  # port is free

        self._runner = web.AppRunner(self._app)
        await self._runner.setup()
        try:
            self._site = web.TCPSite(self._runner, self._host, self._port)
            await self._site.start()
        except Exception:
            # Don't leave a half-initialised runner behind; the outer
            # handler logs the error and reports failure.
            await self._runner.cleanup()
            self._runner = None
            self._site = None
            raise

        # Start background sweep to clean up orphaned (unconsumed) run
        # streams. Started only now, so a refused or failed start never
        # leaks a sweeper task.
        sweep_task = asyncio.create_task(self._sweep_orphaned_runs())
        try:
            self._background_tasks.add(sweep_task)
        except TypeError:
            pass
        if hasattr(sweep_task, "add_done_callback"):
            sweep_task.add_done_callback(self._background_tasks.discard)

        self._mark_connected()
        if not self._api_key:
            logger.warning(
                "[%s] ⚠️ No API key configured (API_SERVER_KEY / platforms.api_server.key). "
                "All requests will be accepted without authentication. "
                "Set an API key for production deployments to prevent "
                "unauthorized access to sessions, responses, and cron jobs.",
                self.name,
            )
        logger.info(
            "[%s] API server listening on http://%s:%d (model: %s)",
            self.name, self._host, self._port, self._model_name,
        )
        return True

    except Exception as e:
        logger.error("[%s] Failed to start API server: %s", self.name, e)
        return False
|
|
|
|
async def disconnect(self) -> None:
    """Stop the aiohttp web server.

    Marks the adapter disconnected first. It then stops the TCP site and
    cleans up the app runner, each only if present, clearing each reference
    after its shutdown call. Finally it drops the application.

    NOTE(review): tasks in ``_background_tasks`` (e.g. the orphaned-run
    sweeper started by ``connect``) are not cancelled here; confirm intended.
    """
    self._mark_disconnected()
    # Shut down in dependency order: the site, then the runner it belongs to.
    for attr, shutdown in (("_site", "stop"), ("_runner", "cleanup")):
        component = getattr(self, attr)
        if component:
            await getattr(component, shutdown)()
            setattr(self, attr, None)
    self._app = None
    logger.info("[%s] API server stopped", self.name)
|
|
|
|
async def send(
    self,
    chat_id: str,
    content: str,
    reply_to: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
    """Reject outbound sends; this adapter has no push channel.

    Replies travel back on the HTTP response of the request that produced
    them, so every argument is ignored and a failed ``SendResult`` is
    always returned.
    """
    reason = "API server uses HTTP request/response, not send()"
    return SendResult(success=False, error=reason)
|
|
|
|
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
    """Describe this API server endpoint; *chat_id* is not used."""
    return dict(
        name="API Server",
        type="api",
        host=self._host,
        port=self._port,
    )
|