mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Some OpenAI-compatible clients (Open WebUI, LobeChat, etc.) send
message content as an array of typed parts instead of a plain string:
[{"type": "text", "text": "hello"}]
The agent pipeline expects strings, so these array payloads caused
silent failures or empty messages.
Add _normalize_chat_content() with defensive limits (recursion depth,
list size, output length) and apply it to both the Chat Completions
and Responses API endpoints. The Responses path had inline
normalization that only handled input_text/output_text — the shared
function also handles the standard 'text' type.
Salvaged from PR #7980 (ikelvingo) — only the content normalization;
the SSE and Weixin changes in that PR were regressions and are not
included.
Co-authored-by: ikelvingo <ikelvingo@users.noreply.github.com>
1887 lines
77 KiB
Python
1887 lines
77 KiB
Python
"""
|
|
OpenAI-compatible API server platform adapter.
|
|
|
|
Exposes an HTTP server with endpoints:
|
|
- POST /v1/chat/completions — OpenAI Chat Completions format (stateless; opt-in session continuity via X-Hermes-Session-Id header)
|
|
- POST /v1/responses — OpenAI Responses API format (stateful via previous_response_id)
|
|
- GET /v1/responses/{response_id} — Retrieve a stored response
|
|
- DELETE /v1/responses/{response_id} — Delete a stored response
|
|
- GET /v1/models — lists hermes-agent as an available model
|
|
- POST /v1/runs — start a run, returns run_id immediately (202)
|
|
- GET /v1/runs/{run_id}/events — SSE stream of structured lifecycle events
|
|
- GET /health — health check
|
|
|
|
Any OpenAI-compatible frontend (Open WebUI, LobeChat, LibreChat,
|
|
AnythingLLM, NextChat, ChatBox, etc.) can connect to hermes-agent
|
|
through this adapter by pointing at http://localhost:8642/v1.
|
|
|
|
Requires:
|
|
- aiohttp (already available in the gateway)
|
|
"""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket as _socket
|
|
import re
|
|
import sqlite3
|
|
import time
|
|
import uuid
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
try:
|
|
from aiohttp import web
|
|
AIOHTTP_AVAILABLE = True
|
|
except ImportError:
|
|
AIOHTTP_AVAILABLE = False
|
|
web = None # type: ignore[assignment]
|
|
|
|
from gateway.config import Platform, PlatformConfig
|
|
from gateway.platforms.base import (
|
|
BasePlatformAdapter,
|
|
SendResult,
|
|
is_network_accessible,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
# Module-wide defaults and defensive limits
# ---------------------------------------------------------------------------

# Network defaults (loopback address, fixed port).
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 8642

# Default capacity of the Responses API store.
MAX_STORED_RESPONSES = 100

# 1 MB default limit for POST bodies.
MAX_REQUEST_BYTES = 1_000_000

# Idle interval after which the chat-completions SSE stream emits a keepalive.
CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS = 30.0

# 64 KB cap for normalized content parts.
MAX_NORMALIZED_TEXT_LENGTH = 65_536

# Max items considered when message content is an array.
MAX_CONTENT_LIST_SIZE = 1_000
|
|
|
|
|
|
def _normalize_chat_content(
    content: Any, *, _max_depth: int = 10, _depth: int = 0,
) -> str:
    """Normalize OpenAI chat message content into a plain text string.

    Some clients (Open WebUI, LobeChat, etc.) send content as an array of
    typed parts instead of a plain string::

        [{"type": "text", "text": "hello"}, {"type": "input_text", "text": "..."}]

    This function flattens those into a single newline-joined string so the
    agent pipeline (which expects strings) doesn't choke.

    Args:
        content: The raw ``content`` value: ``None``, a string, a list of
            strings / typed-part dicts / nested lists, or any other value.
        _max_depth: Maximum nesting depth of lists that is followed.
        _depth: Current recursion depth (internal).

    Returns:
        The flattened text, never longer than ``MAX_NORMALIZED_TEXT_LENGTH``.
        ``None`` and over-deep nesting yield ``""``; dict parts whose type is
        not ``text`` / ``input_text`` / ``output_text`` are skipped; values of
        any other type are stringified with ``str()``.

    Defensive limits prevent abuse: recursion depth, list size
    (``MAX_CONTENT_LIST_SIZE``), and output length are all bounded.
    """
    if _depth > _max_depth or content is None:
        return ""

    if isinstance(content, str):
        # Slicing a shorter string is a no-op, so no length check is needed.
        return content[:MAX_NORMALIZED_TEXT_LENGTH]

    if isinstance(content, list):
        parts: List[str] = []
        # Running total of collected characters; avoids re-summing every
        # part on each iteration (which was quadratic in the list size).
        collected = 0
        for item in content[:MAX_CONTENT_LIST_SIZE]:
            piece = ""
            if isinstance(item, str):
                piece = item[:MAX_NORMALIZED_TEXT_LENGTH]
            elif isinstance(item, dict):
                item_type = str(item.get("type") or "").strip().lower()
                if item_type in {"text", "input_text", "output_text"}:
                    text = item.get("text", "")
                    if text:
                        try:
                            piece = str(text)[:MAX_NORMALIZED_TEXT_LENGTH]
                        except Exception:
                            # Best effort: an un-stringifiable part is dropped.
                            piece = ""
                # Silently skip image_url / other non-text parts
            elif isinstance(item, list):
                piece = _normalize_chat_content(
                    item, _max_depth=_max_depth, _depth=_depth + 1,
                )

            if piece:
                parts.append(piece)
                collected += len(piece)
            # Stop early once enough text has been gathered to fill the cap.
            if collected >= MAX_NORMALIZED_TEXT_LENGTH:
                break

        return "\n".join(parts)[:MAX_NORMALIZED_TEXT_LENGTH]

    # Fallback for unexpected types (int, float, bool, etc.)
    try:
        return str(content)[:MAX_NORMALIZED_TEXT_LENGTH]
    except Exception:
        return ""
|
|
|
|
|
|
def check_api_server_requirements() -> bool:
    """Report whether aiohttp, the API server's dependency, could be imported."""
    return bool(AIOHTTP_AVAILABLE)
|
|
|
|
|
|
class ResponseStore:
    """
    SQLite-backed LRU store for Responses API state.

    Each stored response includes the full internal conversation history
    (with tool calls and results) so it can be reconstructed on subsequent
    requests via previous_response_id.

    Persists across gateway restarts.  Falls back to in-memory SQLite
    if the on-disk database cannot be opened *or* initialised.
    """

    # Statements run on every freshly opened connection.
    _SCHEMA_STATEMENTS = (
        "PRAGMA journal_mode=WAL",
        """CREATE TABLE IF NOT EXISTS responses (
            response_id TEXT PRIMARY KEY,
            data TEXT NOT NULL,
            accessed_at REAL NOT NULL
        )""",
        """CREATE TABLE IF NOT EXISTS conversations (
            name TEXT PRIMARY KEY,
            response_id TEXT NOT NULL
        )""",
    )

    def __init__(self, max_size: int = MAX_STORED_RESPONSES, db_path: Optional[str] = None):
        """Open (or create) the store.

        Args:
            max_size: Maximum number of responses kept; older ones are
                evicted by ``put``.
            db_path: SQLite path.  When ``None`` the path is
                ``<hermes home>/response_store.db``, or ``":memory:"`` if the
                hermes home cannot be resolved.
        """
        self._max_size = max_size
        if db_path is None:
            try:
                from hermes_cli.config import get_hermes_home
                db_path = str(get_hermes_home() / "response_store.db")
            except Exception:
                db_path = ":memory:"
        try:
            self._conn = self._open(db_path)
        except Exception as exc:
            if db_path == ":memory:":
                raise
            # sqlite3.connect() is lazy: a corrupt or read-only file often
            # only fails on the first statement, so the fallback must cover
            # schema setup as well as the connect call.
            logger.warning(
                "Response store at %s is unusable (%s); using in-memory storage",
                db_path, exc,
            )
            self._conn = self._open(":memory:")

    @classmethod
    def _open(cls, db_path: str) -> "sqlite3.Connection":
        """Connect to *db_path* and ensure the schema exists; close on failure."""
        conn = sqlite3.connect(db_path, check_same_thread=False)
        try:
            for statement in cls._SCHEMA_STATEMENTS:
                conn.execute(statement)
            conn.commit()
        except Exception:
            conn.close()
            raise
        return conn

    def get(self, response_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve a stored response by ID (updates access time for LRU)."""
        row = self._conn.execute(
            "SELECT data FROM responses WHERE response_id = ?", (response_id,)
        ).fetchone()
        if row is None:
            return None
        self._conn.execute(
            "UPDATE responses SET accessed_at = ? WHERE response_id = ?",
            (time.time(), response_id),
        )
        self._conn.commit()
        return json.loads(row[0])

    def put(self, response_id: str, data: Dict[str, Any]) -> None:
        """Store a response, evicting the least recently accessed if over capacity."""
        self._conn.execute(
            "INSERT OR REPLACE INTO responses (response_id, data, accessed_at) VALUES (?, ?, ?)",
            # default=str: values json cannot encode are stored as their str().
            (response_id, json.dumps(data, default=str), time.time()),
        )
        # Evict oldest entries beyond max_size
        count = self._conn.execute("SELECT COUNT(*) FROM responses").fetchone()[0]
        if count > self._max_size:
            self._conn.execute(
                "DELETE FROM responses WHERE response_id IN "
                "(SELECT response_id FROM responses ORDER BY accessed_at ASC LIMIT ?)",
                (count - self._max_size,),
            )
        self._conn.commit()

    def delete(self, response_id: str) -> bool:
        """Remove a response from the store. Returns True if found and deleted."""
        cursor = self._conn.execute(
            "DELETE FROM responses WHERE response_id = ?", (response_id,)
        )
        self._conn.commit()
        return cursor.rowcount > 0

    def get_conversation(self, name: str) -> Optional[str]:
        """Get the latest response_id for a conversation name."""
        row = self._conn.execute(
            "SELECT response_id FROM conversations WHERE name = ?", (name,)
        ).fetchone()
        return row[0] if row else None

    def set_conversation(self, name: str, response_id: str) -> None:
        """Map a conversation name to its latest response_id."""
        self._conn.execute(
            "INSERT OR REPLACE INTO conversations (name, response_id) VALUES (?, ?)",
            (name, response_id),
        )
        self._conn.commit()

    def close(self) -> None:
        """Close the database connection (errors while closing are ignored)."""
        try:
            self._conn.close()
        except Exception:
            pass

    def __len__(self) -> int:
        """Return the number of stored responses."""
        row = self._conn.execute("SELECT COUNT(*) FROM responses").fetchone()
        return row[0] if row else 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CORS middleware
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Static CORS response headers; the per-origin ones are added by the adapter.
_CORS_HEADERS = {
    "Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS",
    "Access-Control-Allow-Headers": "Authorization, Content-Type, Idempotency-Key",
}


if AIOHTTP_AVAILABLE:
    @web.middleware
    async def cors_middleware(request, handler):
        """Apply the adapter's CORS policy and answer OPTIONS preflights.

        The adapter is looked up under the app key ``"api_server_adapter"``.
        Requests whose origin the adapter rejects get a bare 403.  OPTIONS
        requests are answered here (200 with CORS headers, or 403 when no
        headers apply) and never reach the handler.  All other requests are
        passed on and any applicable CORS headers are merged into the
        response.
        """
        origin = request.headers.get("Origin", "")
        adapter = request.app.get("api_server_adapter")

        allowed_headers = None
        if adapter is not None:
            if not adapter._origin_allowed(origin):
                return web.Response(status=403)
            allowed_headers = adapter._cors_headers_for_origin(origin)

        if request.method == "OPTIONS":
            if allowed_headers is None:
                return web.Response(status=403)
            return web.Response(status=200, headers=allowed_headers)

        response = await handler(request)
        if allowed_headers is not None:
            response.headers.update(allowed_headers)
        return response
else:
    cors_middleware = None  # type: ignore[assignment]
|
|
|
|
|
|
def _openai_error(message: str, err_type: str = "invalid_request_error", param: str = None, code: str = None) -> Dict[str, Any]:
    """Build an OpenAI-style error envelope.

    Returns ``{"error": {"message", "type", "param", "code"}}`` with the
    given values; ``param`` and ``code`` are ``None`` unless supplied.
    """
    detail: Dict[str, Any] = dict(
        message=message,
        type=err_type,
        param=param,
        code=code,
    )
    return {"error": detail}
|
|
|
|
|
|
if AIOHTTP_AVAILABLE:
    @web.middleware
    async def body_limit_middleware(request, handler):
        """Reject oversized POST/PUT/PATCH bodies using the Content-Length header.

        A declared length above ``MAX_REQUEST_BYTES`` yields 413; a
        non-integer header yields 400.  Requests without the header, and all
        other methods, are passed straight to the handler.
        """
        if request.method not in ("POST", "PUT", "PATCH"):
            return await handler(request)

        declared = request.headers.get("Content-Length")
        if declared is None:
            return await handler(request)

        try:
            oversized = int(declared) > MAX_REQUEST_BYTES
        except ValueError:
            return web.json_response(_openai_error("Invalid Content-Length header.", code="invalid_content_length"), status=400)

        if oversized:
            return web.json_response(_openai_error("Request body too large.", code="body_too_large"), status=413)
        return await handler(request)
else:
    body_limit_middleware = None  # type: ignore[assignment]
|
|
|
|
# Headers stamped on every response unless the handler already set them.
_SECURITY_HEADERS = {
    "X-Content-Type-Options": "nosniff",
    "Referrer-Policy": "no-referrer",
}


if AIOHTTP_AVAILABLE:
    @web.middleware
    async def security_headers_middleware(request, handler):
        """Add security headers to all responses (including errors).

        Headers are applied with ``setdefault`` so a value set by the handler
        wins.  Error responses raised as ``web.HTTPException`` never return
        through the normal path, so they are decorated in the ``except``
        branch before being re-raised.
        """
        try:
            response = await handler(request)
        except web.HTTPException as exc:
            for k, v in _SECURITY_HEADERS.items():
                exc.headers.setdefault(k, v)
            raise
        for k, v in _SECURITY_HEADERS.items():
            response.headers.setdefault(k, v)
        return response
else:
    security_headers_middleware = None  # type: ignore[assignment]
|
|
|
|
|
|
class _IdempotencyCache:
    """In-memory idempotency cache with TTL and LRU eviction.

    Entries are keyed by idempotency key and store the computed response,
    the request fingerprint it was computed for, and a timestamp.  Entries
    older than ``ttl_seconds`` are dropped; when more than ``max_items``
    remain, the least recently used ones are evicted first.
    """

    def __init__(self, max_items: int = 1000, ttl_seconds: int = 300):
        from collections import OrderedDict
        # Insertion order doubles as recency order (oldest first).
        self._store = OrderedDict()
        self._ttl = ttl_seconds
        self._max = max_items

    def _purge(self):
        """Drop expired entries, then evict from the old end until within capacity."""
        now = time.time()
        expired = [k for k, v in self._store.items() if now - v["ts"] > self._ttl]
        for k in expired:
            self._store.pop(k, None)
        while len(self._store) > self._max:
            self._store.popitem(last=False)

    async def get_or_set(self, key: str, fingerprint: str, compute_coro):
        """Return the cached response for *key*, computing it on a miss.

        Args:
            key: Client-supplied idempotency key.
            fingerprint: Hash of the request; a cached entry is only reused
                when its fingerprint matches.
            compute_coro: Zero-argument callable returning an awaitable that
                produces the response.

        Returns:
            The cached or freshly computed response.  Exceptions raised by
            ``compute_coro`` propagate and nothing is cached.
        """
        self._purge()
        item = self._store.get(key)
        if item and item["fp"] == fingerprint:
            # Refresh recency so a just-used entry is not the next evicted.
            self._store.move_to_end(key)
            return item["resp"]
        resp = await compute_coro()
        self._store[key] = {"resp": resp, "fp": fingerprint, "ts": time.time()}
        # Overwriting an existing key keeps its old position; move it to the
        # recent end before trimming.
        self._store.move_to_end(key)
        self._purge()
        return resp


# Process-wide cache shared by all request handlers.
_idem_cache = _IdempotencyCache()
|
|
|
|
|
|
def _make_request_fingerprint(body: Dict[str, Any], keys: List[str]) -> str:
    """Return a SHA-256 hex digest identifying the selected request fields.

    Only the entries of *body* named in *keys* contribute (missing keys count
    as ``None``).  The subset is serialised as canonical JSON with sorted
    keys, so two bodies that differ only in the key order of nested objects
    produce the same fingerprint.  Values json cannot encode are stringified.
    """
    subset = {k: body.get(k) for k in keys}
    canonical = json.dumps(subset, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def _derive_chat_session_id(
    system_prompt: Optional[str],
    first_user_message: str,
) -> str:
    """Derive a stable session ID from the conversation's first user message.

    OpenAI-compatible frontends (Open WebUI, LibreChat, etc.) resend the
    whole conversation on every request, so the system prompt and the first
    user message stay the same for every turn of one conversation.  Hashing
    the two therefore gives a deterministic ID that lets the API server map
    all turns onto the same Hermes session.

    Returns:
        ``"api-"`` followed by the first 16 hex characters of the SHA-256 of
        ``"<system prompt or empty>\\n<first user message>"``.
    """
    prompt_part = system_prompt or ""
    hasher = hashlib.sha256()
    hasher.update(f"{prompt_part}\n{first_user_message}".encode("utf-8"))
    short_digest = hasher.hexdigest()[:16]
    return "api-" + short_digest
|
|
|
|
|
|
class APIServerAdapter(BasePlatformAdapter):
|
|
"""
|
|
OpenAI-compatible HTTP API server adapter.
|
|
|
|
Runs an aiohttp web server that accepts OpenAI-format requests
|
|
and routes them through hermes-agent's AIAgent.
|
|
"""
|
|
|
|
def __init__(self, config: PlatformConfig):
    """Read adapter settings and set up empty server state.

    Each setting is taken from ``config.extra`` when present, otherwise from
    its ``API_SERVER_*`` environment variable, otherwise from the built-in
    default.  No sockets are opened here.
    """
    super().__init__(config, Platform.API_SERVER)
    settings = config.extra or {}

    # Settings: config.extra wins over the environment, which wins over defaults.
    self._host: str = settings.get("host", os.getenv("API_SERVER_HOST", DEFAULT_HOST))
    raw_port = settings.get("port", os.getenv("API_SERVER_PORT", str(DEFAULT_PORT)))
    self._port: int = int(raw_port)
    self._api_key: str = settings.get("key", os.getenv("API_SERVER_KEY", ""))

    raw_origins = settings.get("cors_origins", os.getenv("API_SERVER_CORS_ORIGINS", ""))
    self._cors_origins: tuple[str, ...] = self._parse_cors_origins(raw_origins)

    raw_model_name = settings.get("model_name", os.getenv("API_SERVER_MODEL_NAME", ""))
    self._model_name: str = self._resolve_model_name(raw_model_name)

    # aiohttp objects; all start as None.
    self._app: Optional["web.Application"] = None
    self._runner: Optional["web.AppRunner"] = None
    self._site: Optional["web.TCPSite"] = None

    self._response_store = ResponseStore()
    # Active run streams: run_id -> asyncio.Queue of SSE event dicts
    self._run_streams: Dict[str, "asyncio.Queue[Optional[Dict]]"] = {}
    # Creation timestamps for orphaned-run TTL sweep
    self._run_streams_created: Dict[str, float] = {}
    # Lazy-init SessionDB for session continuity
    self._session_db: Optional[Any] = None
|
|
|
|
@staticmethod
|
|
def _parse_cors_origins(value: Any) -> tuple[str, ...]:
|
|
"""Normalize configured CORS origins into a stable tuple."""
|
|
if not value:
|
|
return ()
|
|
|
|
if isinstance(value, str):
|
|
items = value.split(",")
|
|
elif isinstance(value, (list, tuple, set)):
|
|
items = value
|
|
else:
|
|
items = [str(value)]
|
|
|
|
return tuple(str(item).strip() for item in items if str(item).strip())
|
|
|
|
@staticmethod
|
|
def _resolve_model_name(explicit: str) -> str:
|
|
"""Derive the advertised model name for /v1/models.
|
|
|
|
Priority:
|
|
1. Explicit override (config extra or API_SERVER_MODEL_NAME env var)
|
|
2. Active profile name (so each profile advertises a distinct model)
|
|
3. Fallback: "hermes-agent"
|
|
"""
|
|
if explicit and explicit.strip():
|
|
return explicit.strip()
|
|
try:
|
|
from hermes_cli.profiles import get_active_profile_name
|
|
profile = get_active_profile_name()
|
|
if profile and profile not in ("default", "custom"):
|
|
return profile
|
|
except Exception:
|
|
pass
|
|
return "hermes-agent"
|
|
|
|
def _cors_headers_for_origin(self, origin: str) -> Optional[Dict[str, str]]:
|
|
"""Return CORS headers for an allowed browser origin."""
|
|
if not origin or not self._cors_origins:
|
|
return None
|
|
|
|
if "*" in self._cors_origins:
|
|
headers = dict(_CORS_HEADERS)
|
|
headers["Access-Control-Allow-Origin"] = "*"
|
|
headers["Access-Control-Max-Age"] = "600"
|
|
return headers
|
|
|
|
if origin not in self._cors_origins:
|
|
return None
|
|
|
|
headers = dict(_CORS_HEADERS)
|
|
headers["Access-Control-Allow-Origin"] = origin
|
|
headers["Vary"] = "Origin"
|
|
headers["Access-Control-Max-Age"] = "600"
|
|
return headers
|
|
|
|
def _origin_allowed(self, origin: str) -> bool:
|
|
"""Allow non-browser clients and explicitly configured browser origins."""
|
|
if not origin:
|
|
return True
|
|
|
|
if not self._cors_origins:
|
|
return False
|
|
|
|
return "*" in self._cors_origins or origin in self._cors_origins
|
|
|
|
# ------------------------------------------------------------------
|
|
# Auth helper
|
|
# ------------------------------------------------------------------
|
|
|
|
def _check_auth(self, request: "web.Request") -> Optional["web.Response"]:
|
|
"""
|
|
Validate Bearer token from Authorization header.
|
|
|
|
Returns None if auth is OK, or a 401 web.Response on failure.
|
|
If no API key is configured, all requests are allowed (only when API
|
|
server is local).
|
|
"""
|
|
if not self._api_key:
|
|
return None # No key configured — allow all (local-only use)
|
|
|
|
auth_header = request.headers.get("Authorization", "")
|
|
if auth_header.startswith("Bearer "):
|
|
token = auth_header[7:].strip()
|
|
if hmac.compare_digest(token, self._api_key):
|
|
return None # Auth OK
|
|
|
|
return web.json_response(
|
|
{"error": {"message": "Invalid API key", "type": "invalid_request_error", "code": "invalid_api_key"}},
|
|
status=401,
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Session DB helper
|
|
# ------------------------------------------------------------------
|
|
|
|
def _ensure_session_db(self):
|
|
"""Lazily initialise and return the shared SessionDB instance.
|
|
|
|
Sessions are persisted to ``state.db`` so that ``hermes sessions list``
|
|
shows API-server conversations alongside CLI and gateway ones.
|
|
"""
|
|
if self._session_db is None:
|
|
try:
|
|
from hermes_state import SessionDB
|
|
self._session_db = SessionDB()
|
|
except Exception as e:
|
|
logger.debug("SessionDB unavailable for API server: %s", e)
|
|
return self._session_db
|
|
|
|
# ------------------------------------------------------------------
|
|
# Agent creation helper
|
|
# ------------------------------------------------------------------
|
|
|
|
def _create_agent(
    self,
    ephemeral_system_prompt: Optional[str] = None,
    session_id: Optional[str] = None,
    stream_delta_callback=None,
    tool_progress_callback=None,
) -> Any:
    """
    Create an AIAgent instance using the gateway's runtime config.

    Model and runtime keyword arguments come from the gateway helpers
    ``_resolve_gateway_model()`` and ``_resolve_runtime_agent_kwargs()``.
    Toolsets are those returned by ``_get_platform_tools`` for the
    ``api_server`` platform, sorted.  The iteration cap is read from the
    ``HERMES_MAX_ITERATIONS`` environment variable (default 90).

    Args:
        ephemeral_system_prompt: Extra system prompt for this agent; an
            empty value is passed on as ``None``.
        session_id: Session identifier handed to the agent.
        stream_delta_callback: Forwarded to the agent unchanged.
        tool_progress_callback: Forwarded to the agent unchanged.
    """
    from run_agent import AIAgent
    from gateway.run import (
        GatewayRunner,
        _load_gateway_config,
        _resolve_gateway_model,
        _resolve_runtime_agent_kwargs,
    )
    from hermes_cli.tools_config import _get_platform_tools

    runtime_kwargs = _resolve_runtime_agent_kwargs()
    model = _resolve_gateway_model()

    gateway_config = _load_gateway_config()
    toolsets = sorted(_get_platform_tools(gateway_config, "api_server"))

    iteration_cap = int(os.getenv("HERMES_MAX_ITERATIONS", "90"))

    # Load fallback provider chain so the API server platform has the
    # same fallback behaviour as Telegram/Discord/Slack (fixes #4954).
    fallback_model = GatewayRunner._load_fallback_model()

    return AIAgent(
        model=model,
        **runtime_kwargs,
        max_iterations=iteration_cap,
        quiet_mode=True,
        verbose_logging=False,
        ephemeral_system_prompt=ephemeral_system_prompt or None,
        enabled_toolsets=toolsets,
        session_id=session_id,
        platform="api_server",
        stream_delta_callback=stream_delta_callback,
        tool_progress_callback=tool_progress_callback,
        session_db=self._ensure_session_db(),
        fallback_model=fallback_model,
    )
|
|
|
|
# ------------------------------------------------------------------
|
|
# HTTP Handlers
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _handle_health(self, request: "web.Request") -> "web.Response":
    """GET /health — simple health check (no authentication)."""
    payload = {"status": "ok", "platform": "hermes-agent"}
    return web.json_response(payload)
|
|
|
|
async def _handle_models(self, request: "web.Request") -> "web.Response":
    """GET /v1/models — return the advertised model as the only entry.

    Requires authentication when an API key is configured.
    """
    auth_err = self._check_auth(request)
    if auth_err:
        return auth_err

    model_entry = {
        "id": self._model_name,
        "object": "model",
        "created": int(time.time()),
        "owned_by": "hermes",
        "permission": [],
        "root": self._model_name,
        "parent": None,
    }
    return web.json_response({"object": "list", "data": [model_entry]})
|
|
|
|
async def _handle_chat_completions(self, request: "web.Request") -> "web.Response":
    """POST /v1/chat/completions — OpenAI Chat Completions format.

    Flow:
    1. Authenticate, parse and validate the JSON body.
    2. Fold ``system`` messages into one ephemeral system prompt and collect
       ``user`` / ``assistant`` messages (content is normalised to text).
    3. Use the last collected message as the input and the rest as history.
    4. Resolve the session: an explicit ``X-Hermes-Session-Id`` header
       (API key required) loads history from the session DB; otherwise a
       stable ID is derived from the system prompt and first user message.
    5. Either stream the answer as SSE or run the agent to completion and
       return a ``chat.completion`` JSON object.

    Returns 400 for malformed input, 403 for session continuation without a
    configured API key, and 500 if the agent run raises.
    """
    auth_err = self._check_auth(request)
    if auth_err:
        return auth_err

    # Parse request body
    try:
        body = await request.json()
    except Exception:
        return web.json_response(_openai_error("Invalid JSON in request body"), status=400)

    # Valid JSON that is not an object (e.g. a list or string) has no
    # .get(); reject it instead of crashing with a 500.
    if not isinstance(body, dict):
        return web.json_response(
            _openai_error("Request body must be a JSON object"),
            status=400,
        )

    messages = body.get("messages")
    if not messages or not isinstance(messages, list):
        return web.json_response(
            {"error": {"message": "Missing or invalid 'messages' field", "type": "invalid_request_error"}},
            status=400,
        )

    stream = body.get("stream", False)

    # Extract system message (becomes ephemeral system prompt layered ON TOP of core)
    system_prompt = None
    conversation_messages: List[Dict[str, str]] = []

    for msg in messages:
        if not isinstance(msg, dict):
            return web.json_response(
                _openai_error("Each entry in 'messages' must be an object", param="messages"),
                status=400,
            )
        role = msg.get("role", "")
        content = _normalize_chat_content(msg.get("content", ""))
        if role == "system":
            # Accumulate system messages
            if system_prompt is None:
                system_prompt = content
            else:
                system_prompt = system_prompt + "\n" + content
        elif role in ("user", "assistant"):
            conversation_messages.append({"role": role, "content": content})

    # Extract the last message as the primary input.
    # NOTE(review): the last collected message is used whatever its role,
    # so a trailing assistant message becomes the input — confirm intended.
    user_message = ""
    history = []
    if conversation_messages:
        user_message = conversation_messages[-1].get("content", "")
        history = conversation_messages[:-1]

    if not user_message:
        return web.json_response(
            {"error": {"message": "No user message found in messages", "type": "invalid_request_error"}},
            status=400,
        )

    # Allow caller to continue an existing session by passing X-Hermes-Session-Id.
    # When provided, history is loaded from state.db instead of from the request body.
    #
    # Security: session continuation exposes conversation history, so it is
    # only allowed when the API key is configured and the request is
    # authenticated.  Without this gate, any unauthenticated client could
    # read arbitrary session history by guessing/enumerating session IDs.
    provided_session_id = request.headers.get("X-Hermes-Session-Id", "").strip()
    if provided_session_id:
        if not self._api_key:
            logger.warning(
                "Session continuation via X-Hermes-Session-Id rejected: "
                "no API key configured.  Set API_SERVER_KEY to enable "
                "session continuity."
            )
            return web.json_response(
                _openai_error(
                    "Session continuation requires API key authentication. "
                    "Configure API_SERVER_KEY to enable this feature."
                ),
                status=403,
            )
        # Sanitize: reject control characters that could enable header injection.
        if re.search(r'[\r\n\x00]', provided_session_id):
            return web.json_response(
                {"error": {"message": "Invalid session ID", "type": "invalid_request_error"}},
                status=400,
            )
        session_id = provided_session_id
        try:
            db = self._ensure_session_db()
            if db is not None:
                history = db.get_messages_as_conversation(session_id)
        except Exception as e:
            logger.warning("Failed to load session history for %s: %s", session_id, e)
            history = []
    else:
        # Derive a stable session ID from the conversation fingerprint so
        # that consecutive messages from the same Open WebUI (or similar)
        # conversation map to the same Hermes session.  The first user
        # message + system prompt are constant across all turns.
        first_user = ""
        for cm in conversation_messages:
            if cm.get("role") == "user":
                first_user = cm.get("content", "")
                break
        session_id = _derive_chat_session_id(system_prompt, first_user)
        # history already set from request body above

    completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
    model_name = body.get("model", self._model_name)
    created = int(time.time())

    if stream:
        import queue as _q
        _stream_q: _q.Queue = _q.Queue()

        def _on_delta(delta):
            # Filter out None — the agent fires stream_delta_callback(None)
            # to signal the CLI display to close its response box before
            # tool execution, but the SSE writer uses None as end-of-stream
            # sentinel.  Forwarding it would prematurely close the HTTP
            # response, causing Open WebUI (and similar frontends) to miss
            # the final answer after tool calls.  The SSE loop detects
            # completion via agent_task.done() instead.
            if delta is not None:
                _stream_q.put(delta)

        def _on_tool_progress(event_type, name, preview, args, **kwargs):
            """Send tool progress as a separate SSE event.

            Previously, progress markers like ``⏰ list`` were injected
            directly into ``delta.content``.  OpenAI-compatible frontends
            (Open WebUI, LobeChat, …) store ``delta.content`` verbatim as
            the assistant message and send it back on subsequent requests.
            After enough turns the model learns to *emit* the markers as
            plain text instead of issuing real tool calls — silently
            hallucinating tool results.  See #6972.

            The fix: push a tagged tuple ``("__tool_progress__", payload)``
            onto the stream queue.  The SSE writer emits it as a custom
            ``event: hermes.tool.progress`` line that compliant frontends
            can render for UX but will *not* persist into conversation
            history.  Clients that don't understand the custom event type
            silently ignore it per the SSE specification.
            """
            if event_type != "tool.started":
                return
            if name.startswith("_"):
                return
            from agent.display import get_tool_emoji
            emoji = get_tool_emoji(name)
            label = preview or name
            _stream_q.put(("__tool_progress__", {
                "tool": name,
                "emoji": emoji,
                "label": label,
            }))

        # Start agent in background.  agent_ref is a mutable container
        # so the SSE writer can interrupt the agent on client disconnect.
        agent_ref = [None]
        agent_task = asyncio.ensure_future(self._run_agent(
            user_message=user_message,
            conversation_history=history,
            ephemeral_system_prompt=system_prompt,
            session_id=session_id,
            stream_delta_callback=_on_delta,
            tool_progress_callback=_on_tool_progress,
            agent_ref=agent_ref,
        ))

        return await self._write_sse_chat_completion(
            request, completion_id, model_name, created, _stream_q,
            agent_task, agent_ref, session_id=session_id,
        )

    # Non-streaming: run the agent (with optional Idempotency-Key)
    async def _compute_completion():
        return await self._run_agent(
            user_message=user_message,
            conversation_history=history,
            ephemeral_system_prompt=system_prompt,
            session_id=session_id,
        )

    idempotency_key = request.headers.get("Idempotency-Key")
    try:
        if idempotency_key:
            # Replays with the same key and an identical request reuse the
            # cached result instead of running the agent again.
            fp = _make_request_fingerprint(body, keys=["model", "messages", "tools", "tool_choice", "stream"])
            result, usage = await _idem_cache.get_or_set(idempotency_key, fp, _compute_completion)
        else:
            result, usage = await _compute_completion()
    except Exception as e:
        logger.error("Error running agent for chat completions: %s", e, exc_info=True)
        return web.json_response(
            _openai_error(f"Internal server error: {e}", err_type="server_error"),
            status=500,
        )

    final_response = result.get("final_response", "")
    if not final_response:
        final_response = result.get("error", "(No response generated)")

    response_data = {
        "id": completion_id,
        "object": "chat.completion",
        "created": created,
        "model": model_name,
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": final_response,
                },
                "finish_reason": "stop",
            }
        ],
        "usage": {
            "prompt_tokens": usage.get("input_tokens", 0),
            "completion_tokens": usage.get("output_tokens", 0),
            "total_tokens": usage.get("total_tokens", 0),
        },
    }

    return web.json_response(response_data, headers={"X-Hermes-Session-Id": session_id})
|
|
|
|
async def _write_sse_chat_completion(
    self, request: "web.Request", completion_id: str, model: str,
    created: int, stream_q, agent_task, agent_ref=None, session_id: Optional[str] = None,
) -> "web.StreamResponse":
    """Write real streaming SSE from the agent's stream_delta_callback queue.

    The stream consists of a role chunk, one chunk per queue item, a finish
    chunk carrying token usage, and the ``[DONE]`` marker.  While the queue
    is idle, ``: keepalive`` comments are written once
    ``CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS`` have elapsed since the last
    write.

    If the client disconnects mid-stream (network drop, browser tab close),
    the agent is interrupted via ``agent.interrupt()`` so it stops making
    LLM API calls, and the asyncio task wrapper is cancelled.

    Args:
        request: Incoming request; its ``Origin`` header selects CORS headers.
        completion_id: Value of the ``id`` field of every chunk.
        model: Value of the ``model`` field of every chunk.
        created: Value of the ``created`` field of every chunk.
        stream_q: Queue polled with ``get(timeout=...)`` / ``get_nowait()``;
            ``None`` is the end-of-stream sentinel.
        agent_task: Awaitable task resolving to ``(result, usage)``.
        agent_ref: Optional one-element list holding the agent to interrupt.
        session_id: When set, echoed in the ``X-Hermes-Session-Id`` header.

    Returns:
        The prepared ``web.StreamResponse``.
    """
    import queue as _q

    sse_headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    # CORS middleware can't inject headers into StreamResponse after
    # prepare() flushes them, so resolve CORS headers up front.
    origin = request.headers.get("Origin", "")
    cors = self._cors_headers_for_origin(origin) if origin else None
    if cors:
        sse_headers.update(cors)
    if session_id:
        sse_headers["X-Hermes-Session-Id"] = session_id
    response = web.StreamResponse(status=200, headers=sse_headers)
    await response.prepare(request)

    def _chunk(delta: dict, finish_reason=None) -> dict:
        """Build one ``chat.completion.chunk`` payload."""
        return {
            "id": completion_id, "object": "chat.completion.chunk",
            "created": created, "model": model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
        }

    try:
        # Role chunk
        await response.write(f"data: {json.dumps(_chunk({'role': 'assistant'}))}\n\n".encode())
        last_activity = time.monotonic()

        # Helper — route a queue item to the correct SSE event.
        async def _emit(item):
            """Write a single queue item to the SSE stream.

            Plain strings are sent as normal ``delta.content`` chunks.
            Tagged tuples ``("__tool_progress__", payload)`` are sent
            as a custom ``event: hermes.tool.progress`` SSE event so
            frontends can display them without storing the markers in
            conversation history.  See #6972.

            Returns the monotonic timestamp taken after the write.
            """
            if isinstance(item, tuple) and len(item) == 2 and item[0] == "__tool_progress__":
                event_data = json.dumps(item[1])
                await response.write(
                    f"event: hermes.tool.progress\ndata: {event_data}\n\n".encode()
                )
            else:
                await response.write(f"data: {json.dumps(_chunk({'content': item}))}\n\n".encode())
            return time.monotonic()

        # Stream content chunks as they arrive from the agent.  We are inside
        # a coroutine, so get_running_loop() is the appropriate accessor.
        loop = asyncio.get_running_loop()
        while True:
            try:
                # The blocking get runs in the default executor so the event
                # loop stays responsive while waiting for the next item.
                delta = await loop.run_in_executor(None, lambda: stream_q.get(timeout=0.5))
            except _q.Empty:
                if agent_task.done():
                    # Drain any remaining items
                    while True:
                        try:
                            delta = stream_q.get_nowait()
                        except _q.Empty:
                            break
                        if delta is None:
                            break
                        last_activity = await _emit(delta)
                    break
                if time.monotonic() - last_activity >= CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS:
                    await response.write(b": keepalive\n\n")
                    last_activity = time.monotonic()
                continue

            if delta is None:  # End of stream sentinel
                break

            last_activity = await _emit(delta)

        # Get usage from completed agent
        usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
        try:
            _, agent_usage = await agent_task
            usage = agent_usage or usage
        except Exception:
            # Still close the stream cleanly, but leave a trace of the failure
            # instead of swallowing it silently.
            logger.warning(
                "Agent task for %s failed; finishing stream with zero usage",
                completion_id, exc_info=True,
            )

        # Finish chunk
        finish_chunk = _chunk({}, finish_reason="stop")
        finish_chunk["usage"] = {
            "prompt_tokens": usage.get("input_tokens", 0),
            "completion_tokens": usage.get("output_tokens", 0),
            "total_tokens": usage.get("total_tokens", 0),
        }
        await response.write(f"data: {json.dumps(finish_chunk)}\n\n".encode())
        await response.write(b"data: [DONE]\n\n")
    except OSError:
        # ConnectionResetError / ConnectionAbortedError / BrokenPipeError are
        # all OSError subclasses.  Client disconnected mid-stream: interrupt
        # the agent so it stops making LLM API calls at the next loop
        # iteration, then cancel the asyncio task wrapper.
        agent = agent_ref[0] if agent_ref else None
        if agent is not None:
            try:
                agent.interrupt("SSE client disconnected")
            except Exception:
                # Best-effort: a failing interrupt must not mask the disconnect.
                pass
        if not agent_task.done():
            agent_task.cancel()
            try:
                await agent_task
            except (asyncio.CancelledError, Exception):
                pass
        logger.info("SSE client disconnected; interrupted agent task %s", completion_id)

    return response
|
|
|
|
async def _handle_responses(self, request: "web.Request") -> "web.Response":
    """POST /v1/responses — OpenAI Responses API format.

    Flow: authenticate, parse and validate the JSON body, normalise
    ``input`` into a message list, resolve prior history (explicit
    ``conversation_history`` wins over ``previous_response_id`` /
    ``conversation``), run the agent (optionally through the idempotency
    cache), then build, optionally store, and return the response object.

    Error responses produced here:
        400 — invalid JSON, non-object body, missing/invalid ``input``,
              malformed ``conversation_history``, both ``conversation`` and
              ``previous_response_id`` given, or no user message.
        404 — ``previous_response_id`` not present in the response store.
        500 — the agent run raised.
    """
    auth_err = self._check_auth(request)
    if auth_err:
        return auth_err

    # Parse request body
    try:
        body = await request.json()
    except Exception:
        return web.json_response(
            {"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
            status=400,
        )
    # Valid JSON that is not an object (e.g. a list or a bare string) would
    # otherwise crash on body.get() below and surface as an unhandled 500.
    if not isinstance(body, dict):
        return web.json_response(_openai_error("Request body must be a JSON object"), status=400)

    raw_input = body.get("input")
    if raw_input is None:
        return web.json_response(_openai_error("Missing 'input' field"), status=400)

    instructions = body.get("instructions")
    previous_response_id = body.get("previous_response_id")
    conversation = body.get("conversation")
    store = body.get("store", True)

    # conversation and previous_response_id are mutually exclusive
    if conversation and previous_response_id:
        return web.json_response(_openai_error("Cannot use both 'conversation' and 'previous_response_id'"), status=400)

    # Resolve conversation name to latest response_id
    if conversation:
        previous_response_id = self._response_store.get_conversation(conversation)
        # No error if conversation doesn't exist yet — it's a new conversation

    # Normalize input to message list
    input_messages: List[Dict[str, str]] = []
    if isinstance(raw_input, str):
        input_messages = [{"role": "user", "content": raw_input}]
    elif isinstance(raw_input, list):
        for item in raw_input:
            if isinstance(item, str):
                input_messages.append({"role": "user", "content": item})
            elif isinstance(item, dict):
                role = item.get("role", "user")
                content = _normalize_chat_content(item.get("content", ""))
                input_messages.append({"role": role, "content": content})
    else:
        return web.json_response(_openai_error("'input' must be a string or array"), status=400)

    # Accept explicit conversation_history from the request body.
    # This lets stateless clients supply their own history instead of
    # relying on server-side response chaining via previous_response_id.
    # Precedence: explicit conversation_history > previous_response_id.
    conversation_history: List[Dict[str, str]] = []
    raw_history = body.get("conversation_history")
    if raw_history:
        if not isinstance(raw_history, list):
            return web.json_response(
                _openai_error("'conversation_history' must be an array of message objects"),
                status=400,
            )
        for i, entry in enumerate(raw_history):
            if not isinstance(entry, dict) or "role" not in entry or "content" not in entry:
                return web.json_response(
                    _openai_error(f"conversation_history[{i}] must have 'role' and 'content' fields"),
                    status=400,
                )
            entry_content = entry["content"]
            if isinstance(entry_content, list):
                # Typed-part arrays go through the same normaliser as ``input``
                # rather than being stored as a Python repr via str().
                entry_content = _normalize_chat_content(entry_content)
            else:
                entry_content = str(entry_content)
            conversation_history.append({"role": str(entry["role"]), "content": entry_content})
        if previous_response_id:
            logger.debug("Both conversation_history and previous_response_id provided; using conversation_history")

    if not conversation_history and previous_response_id:
        stored = self._response_store.get(previous_response_id)
        if stored is None:
            return web.json_response(_openai_error(f"Previous response not found: {previous_response_id}"), status=404)
        conversation_history = list(stored.get("conversation_history", []))
        # If no instructions provided, carry forward from previous
        if instructions is None:
            instructions = stored.get("instructions")

    # Append new input messages to history (all but the last become history)
    conversation_history.extend(input_messages[:-1])

    # Last input message is the user_message
    user_message = input_messages[-1].get("content", "") if input_messages else ""
    if not user_message:
        return web.json_response(_openai_error("No user message found in input"), status=400)

    # Truncation support
    if body.get("truncation") == "auto" and len(conversation_history) > 100:
        conversation_history = conversation_history[-100:]

    # Run the agent (with Idempotency-Key support)
    session_id = str(uuid.uuid4())

    async def _compute_response():
        return await self._run_agent(
            user_message=user_message,
            conversation_history=conversation_history,
            ephemeral_system_prompt=instructions,
            session_id=session_id,
        )

    idempotency_key = request.headers.get("Idempotency-Key")
    fingerprint = None
    if idempotency_key:
        # conversation_history changes the agent's input, so it is part of
        # the fingerprint alongside the other request-shaping fields.
        fingerprint = _make_request_fingerprint(
            body,
            keys=[
                "input", "instructions", "previous_response_id", "conversation",
                "conversation_history", "model", "tools",
            ],
        )
    try:
        if idempotency_key:
            result, usage = await _idem_cache.get_or_set(idempotency_key, fingerprint, _compute_response)
        else:
            result, usage = await _compute_response()
    except Exception as e:
        logger.error("Error running agent for responses: %s", e, exc_info=True)
        return web.json_response(
            _openai_error(f"Internal server error: {e}", err_type="server_error"),
            status=500,
        )

    # Fall back to the error text, then to a placeholder.  The ``or`` chain
    # also covers an "error" key that is present but None/empty.
    final_response = (
        result.get("final_response")
        or result.get("error")
        or "(No response generated)"
    )

    response_id = f"resp_{uuid.uuid4().hex[:28]}"
    created_at = int(time.time())

    # Build the full conversation history for storage
    # (includes tool calls from the agent run)
    full_history = list(conversation_history)
    full_history.append({"role": "user", "content": user_message})
    # Add agent's internal messages if available
    agent_messages = result.get("messages", [])
    if agent_messages:
        full_history.extend(agent_messages)
    else:
        full_history.append({"role": "assistant", "content": final_response})

    # Build output items (includes tool calls + final message)
    output_items = self._extract_output_items(result)

    response_data = {
        "id": response_id,
        "object": "response",
        "status": "completed",
        "created_at": created_at,
        "model": body.get("model", self._model_name),
        "output": output_items,
        "usage": {
            "input_tokens": usage.get("input_tokens", 0),
            "output_tokens": usage.get("output_tokens", 0),
            "total_tokens": usage.get("total_tokens", 0),
        },
    }

    # Store the complete response object for future chaining / GET retrieval
    if store:
        self._response_store.put(response_id, {
            "response": response_data,
            "conversation_history": full_history,
            "instructions": instructions,
        })
        # Update conversation mapping so the next request with the same
        # conversation name automatically chains to this response
        if conversation:
            self._response_store.set_conversation(conversation, response_id)

    return web.json_response(response_data)
|
|
|
|
# ------------------------------------------------------------------
|
|
# GET / DELETE response endpoints
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _handle_get_response(self, request: "web.Request") -> "web.Response":
    """GET /v1/responses/{response_id} — return a previously stored response.

    Returns the auth error when ``_check_auth`` rejects the request, a 404
    error payload when the store has no entry for the id, and otherwise the
    stored entry's ``"response"`` object as JSON.
    """
    denied = self._check_auth(request)
    if denied:
        return denied

    response_id = request.match_info["response_id"]
    record = self._response_store.get(response_id)
    if record is not None:
        return web.json_response(record["response"])

    not_found = _openai_error(f"Response not found: {response_id}")
    return web.json_response(not_found, status=404)
|
|
|
|
async def _handle_delete_response(self, request: "web.Request") -> "web.Response":
    """DELETE /v1/responses/{response_id} — remove a stored response.

    Returns the auth error when ``_check_auth`` rejects the request, a 404
    error payload when the store reports nothing was deleted, and otherwise
    a ``{"id", "object", "deleted"}`` confirmation object.
    """
    denied = self._check_auth(request)
    if denied:
        return denied

    response_id = request.match_info["response_id"]
    if self._response_store.delete(response_id):
        confirmation = {
            "id": response_id,
            "object": "response",
            "deleted": True,
        }
        return web.json_response(confirmation)

    not_found = _openai_error(f"Response not found: {response_id}")
    return web.json_response(not_found, status=404)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Cron jobs API
|
|
# ------------------------------------------------------------------
|
|
|
|
# Check cron module availability once (not per-request)
|
|
_CRON_AVAILABLE = False
|
|
try:
|
|
from cron.jobs import (
|
|
list_jobs as _cron_list,
|
|
get_job as _cron_get,
|
|
create_job as _cron_create,
|
|
update_job as _cron_update,
|
|
remove_job as _cron_remove,
|
|
pause_job as _cron_pause,
|
|
resume_job as _cron_resume,
|
|
trigger_job as _cron_trigger,
|
|
)
|
|
# Wrap as staticmethod to prevent descriptor binding — these are plain
|
|
# module functions, not instance methods. Without this, self._cron_*()
|
|
# injects ``self`` as the first positional argument and every call
|
|
# raises TypeError.
|
|
_cron_list = staticmethod(_cron_list)
|
|
_cron_get = staticmethod(_cron_get)
|
|
_cron_create = staticmethod(_cron_create)
|
|
_cron_update = staticmethod(_cron_update)
|
|
_cron_remove = staticmethod(_cron_remove)
|
|
_cron_pause = staticmethod(_cron_pause)
|
|
_cron_resume = staticmethod(_cron_resume)
|
|
_cron_trigger = staticmethod(_cron_trigger)
|
|
_CRON_AVAILABLE = True
|
|
except ImportError:
|
|
pass
|
|
|
|
_JOB_ID_RE = __import__("re").compile(r"[a-f0-9]{12}")
|
|
# Allowed fields for update — prevents clients injecting arbitrary keys
|
|
_UPDATE_ALLOWED_FIELDS = {"name", "schedule", "prompt", "deliver", "skills", "skill", "repeat", "enabled"}
|
|
_MAX_NAME_LENGTH = 200
|
|
_MAX_PROMPT_LENGTH = 5000
|
|
|
|
def _check_jobs_available(self) -> Optional["web.Response"]:
|
|
"""Return error response if cron module isn't available."""
|
|
if not self._CRON_AVAILABLE:
|
|
return web.json_response(
|
|
{"error": "Cron module not available"}, status=501,
|
|
)
|
|
return None
|
|
|
|
def _check_job_id(self, request: "web.Request") -> tuple:
|
|
"""Validate and extract job_id. Returns (job_id, error_response)."""
|
|
job_id = request.match_info["job_id"]
|
|
if not self._JOB_ID_RE.fullmatch(job_id):
|
|
return job_id, web.json_response(
|
|
{"error": "Invalid job ID format"}, status=400,
|
|
)
|
|
return job_id, None
|
|
|
|
async def _handle_list_jobs(self, request: "web.Request") -> "web.Response":
    """GET /api/jobs — list cron jobs.

    The ``include_disabled`` query parameter is treated as true only for the
    values ``"true"`` and ``"1"`` (case-insensitive).  Any exception raised
    while listing is reported as a 500 with the exception text.
    """
    rejection = self._check_auth(request) or self._check_jobs_available()
    if rejection:
        return rejection

    try:
        flag = request.query.get("include_disabled", "").lower()
        listing = self._cron_list(include_disabled=flag in ("true", "1"))
        return web.json_response({"jobs": listing})
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)
|
|
|
|
async def _handle_create_job(self, request: "web.Request") -> "web.Response":
    """POST /api/jobs — create a new cron job.

    Body fields read: ``name`` (required), ``schedule`` (required),
    ``prompt``, ``deliver`` (default ``"local"``), ``skills``, ``repeat``.

    Responses:
        200 — ``{"job": ...}`` as returned by the cron backend.
        400 — invalid JSON, non-object body, wrong field types, missing
              name/schedule, over-long name/prompt, or invalid ``repeat``.
        500 — the cron backend raised; the exception text is returned.
    """
    auth_err = self._check_auth(request)
    if auth_err:
        return auth_err
    cron_err = self._check_jobs_available()
    if cron_err:
        return cron_err

    # Client-side mistakes (bad JSON, wrong shapes) are answered with 400
    # rather than falling into the generic 500 handler below.
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON in request body"}, status=400)
    if not isinstance(body, dict):
        return web.json_response({"error": "Request body must be a JSON object"}, status=400)

    name = body.get("name") or ""
    schedule = body.get("schedule") or ""
    prompt = body.get("prompt", "")
    if prompt is None:
        # An explicit JSON null is treated like an omitted prompt.
        prompt = ""
    deliver = body.get("deliver", "local")
    skills = body.get("skills")
    repeat = body.get("repeat")

    for label, value in (("Name", name), ("Schedule", schedule), ("Prompt", prompt)):
        if not isinstance(value, str):
            return web.json_response({"error": f"{label} must be a string"}, status=400)
    name = name.strip()
    schedule = schedule.strip()

    if not name:
        return web.json_response({"error": "Name is required"}, status=400)
    if len(name) > self._MAX_NAME_LENGTH:
        return web.json_response(
            {"error": f"Name must be ≤ {self._MAX_NAME_LENGTH} characters"}, status=400,
        )
    if not schedule:
        return web.json_response({"error": "Schedule is required"}, status=400)
    if len(prompt) > self._MAX_PROMPT_LENGTH:
        return web.json_response(
            {"error": f"Prompt must be ≤ {self._MAX_PROMPT_LENGTH} characters"}, status=400,
        )
    # bool is a subclass of int, so JSON true/false must be rejected explicitly.
    if repeat is not None and (
        isinstance(repeat, bool) or not isinstance(repeat, int) or repeat < 1
    ):
        return web.json_response({"error": "Repeat must be a positive integer"}, status=400)

    kwargs = {
        "prompt": prompt,
        "schedule": schedule,
        "name": name,
        "deliver": deliver,
    }
    if skills:
        kwargs["skills"] = skills
    if repeat is not None:
        kwargs["repeat"] = repeat

    try:
        job = self._cron_create(**kwargs)
        return web.json_response({"job": job})
    except Exception as e:
        return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_get_job(self, request: "web.Request") -> "web.Response":
    """GET /api/jobs/{job_id} — fetch one cron job.

    Answers 404 when the backend returns a falsy value for the id and 500
    (with the exception text) when the lookup raises.
    """
    rejection = self._check_auth(request) or self._check_jobs_available()
    if rejection:
        return rejection
    job_id, bad_id = self._check_job_id(request)
    if bad_id:
        return bad_id

    try:
        found = self._cron_get(job_id)
        if found:
            return web.json_response({"job": found})
        return web.json_response({"error": "Job not found"}, status=404)
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)
|
|
|
|
async def _handle_update_job(self, request: "web.Request") -> "web.Response":
    """PATCH /api/jobs/{job_id} — update a cron job.

    Only keys listed in ``_UPDATE_ALLOWED_FIELDS`` are forwarded to the
    cron backend; everything else in the body is dropped.

    Responses:
        200 — ``{"job": ...}`` as returned by the cron backend.
        400 — invalid JSON, non-object body, no allowed fields, or a
              ``name``/``prompt`` that is not a string or is too long.
        404 — the backend returned a falsy value for the id.
        500 — the backend raised; the exception text is returned.
    """
    auth_err = self._check_auth(request)
    if auth_err:
        return auth_err
    cron_err = self._check_jobs_available()
    if cron_err:
        return cron_err
    job_id, id_err = self._check_job_id(request)
    if id_err:
        return id_err

    # Malformed input is a client error: answer 400 instead of letting it
    # surface through the generic 500 handler below.
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON in request body"}, status=400)
    if not isinstance(body, dict):
        return web.json_response({"error": "Request body must be a JSON object"}, status=400)

    # Whitelist allowed fields to prevent arbitrary key injection
    sanitized = {k: v for k, v in body.items() if k in self._UPDATE_ALLOWED_FIELDS}
    if not sanitized:
        return web.json_response({"error": "No valid fields to update"}, status=400)

    # Validate type and length if present
    limits = (
        ("name", "Name", self._MAX_NAME_LENGTH),
        ("prompt", "Prompt", self._MAX_PROMPT_LENGTH),
    )
    for field, label, limit in limits:
        if field not in sanitized:
            continue
        value = sanitized[field]
        if not isinstance(value, str):
            return web.json_response({"error": f"{label} must be a string"}, status=400)
        if len(value) > limit:
            return web.json_response(
                {"error": f"{label} must be ≤ {limit} characters"}, status=400,
            )

    try:
        job = self._cron_update(job_id, sanitized)
        if not job:
            return web.json_response({"error": "Job not found"}, status=404)
        return web.json_response({"job": job})
    except Exception as e:
        return web.json_response({"error": str(e)}, status=500)
|
|
|
|
async def _handle_delete_job(self, request: "web.Request") -> "web.Response":
    """DELETE /api/jobs/{job_id} — remove a cron job.

    Answers ``{"ok": true}`` when the backend reports success, 404 when it
    returns a falsy value, and 500 (with the exception text) when it raises.
    """
    rejection = self._check_auth(request) or self._check_jobs_available()
    if rejection:
        return rejection
    job_id, bad_id = self._check_job_id(request)
    if bad_id:
        return bad_id

    try:
        if self._cron_remove(job_id):
            return web.json_response({"ok": True})
        return web.json_response({"error": "Job not found"}, status=404)
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)
|
|
|
|
async def _handle_pause_job(self, request: "web.Request") -> "web.Response":
    """POST /api/jobs/{job_id}/pause — pause a cron job.

    Answers ``{"job": ...}`` with the backend's return value, 404 when that
    value is falsy, and 500 (with the exception text) when the call raises.
    """
    rejection = self._check_auth(request) or self._check_jobs_available()
    if rejection:
        return rejection
    job_id, bad_id = self._check_job_id(request)
    if bad_id:
        return bad_id

    try:
        paused = self._cron_pause(job_id)
        if paused:
            return web.json_response({"job": paused})
        return web.json_response({"error": "Job not found"}, status=404)
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)
|
|
|
|
async def _handle_resume_job(self, request: "web.Request") -> "web.Response":
    """POST /api/jobs/{job_id}/resume — resume a paused cron job.

    Answers ``{"job": ...}`` with the backend's return value, 404 when that
    value is falsy, and 500 (with the exception text) when the call raises.
    """
    rejection = self._check_auth(request) or self._check_jobs_available()
    if rejection:
        return rejection
    job_id, bad_id = self._check_job_id(request)
    if bad_id:
        return bad_id

    try:
        resumed = self._cron_resume(job_id)
        if resumed:
            return web.json_response({"job": resumed})
        return web.json_response({"error": "Job not found"}, status=404)
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)
|
|
|
|
async def _handle_run_job(self, request: "web.Request") -> "web.Response":
    """POST /api/jobs/{job_id}/run — trigger a cron job immediately.

    Answers ``{"job": ...}`` with the backend's return value, 404 when that
    value is falsy, and 500 (with the exception text) when the call raises.
    """
    rejection = self._check_auth(request) or self._check_jobs_available()
    if rejection:
        return rejection
    job_id, bad_id = self._check_job_id(request)
    if bad_id:
        return bad_id

    try:
        triggered = self._cron_trigger(job_id)
        if triggered:
            return web.json_response({"job": triggered})
        return web.json_response({"error": "Job not found"}, status=404)
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Output extraction helper
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _extract_output_items(result: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Build the full output item array from the agent's messages.
|
|
|
|
Walks *result["messages"]* and emits:
|
|
- ``function_call`` items for each tool_call on assistant messages
|
|
- ``function_call_output`` items for each tool-role message
|
|
- a final ``message`` item with the assistant's text reply
|
|
"""
|
|
items: List[Dict[str, Any]] = []
|
|
messages = result.get("messages", [])
|
|
|
|
for msg in messages:
|
|
role = msg.get("role")
|
|
if role == "assistant" and msg.get("tool_calls"):
|
|
for tc in msg["tool_calls"]:
|
|
func = tc.get("function", {})
|
|
items.append({
|
|
"type": "function_call",
|
|
"name": func.get("name", ""),
|
|
"arguments": func.get("arguments", ""),
|
|
"call_id": tc.get("id", ""),
|
|
})
|
|
elif role == "tool":
|
|
items.append({
|
|
"type": "function_call_output",
|
|
"call_id": msg.get("tool_call_id", ""),
|
|
"output": msg.get("content", ""),
|
|
})
|
|
|
|
# Final assistant message
|
|
final = result.get("final_response", "")
|
|
if not final:
|
|
final = result.get("error", "(No response generated)")
|
|
|
|
items.append({
|
|
"type": "message",
|
|
"role": "assistant",
|
|
"content": [
|
|
{
|
|
"type": "output_text",
|
|
"text": final,
|
|
}
|
|
],
|
|
})
|
|
return items
|
|
|
|
# ------------------------------------------------------------------
|
|
# Agent execution
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _run_agent(
|
|
self,
|
|
user_message: str,
|
|
conversation_history: List[Dict[str, str]],
|
|
ephemeral_system_prompt: Optional[str] = None,
|
|
session_id: Optional[str] = None,
|
|
stream_delta_callback=None,
|
|
tool_progress_callback=None,
|
|
agent_ref: Optional[list] = None,
|
|
) -> tuple:
|
|
"""
|
|
Create an agent and run a conversation in a thread executor.
|
|
|
|
Returns ``(result_dict, usage_dict)`` where *usage_dict* contains
|
|
``input_tokens``, ``output_tokens`` and ``total_tokens``.
|
|
|
|
If *agent_ref* is a one-element list, the AIAgent instance is stored
|
|
at ``agent_ref[0]`` before ``run_conversation`` begins. This allows
|
|
callers (e.g. the SSE writer) to call ``agent.interrupt()`` from
|
|
another thread to stop in-progress LLM calls.
|
|
"""
|
|
loop = asyncio.get_event_loop()
|
|
|
|
def _run():
|
|
agent = self._create_agent(
|
|
ephemeral_system_prompt=ephemeral_system_prompt,
|
|
session_id=session_id,
|
|
stream_delta_callback=stream_delta_callback,
|
|
tool_progress_callback=tool_progress_callback,
|
|
)
|
|
if agent_ref is not None:
|
|
agent_ref[0] = agent
|
|
result = agent.run_conversation(
|
|
user_message=user_message,
|
|
conversation_history=conversation_history,
|
|
task_id="default",
|
|
)
|
|
usage = {
|
|
"input_tokens": getattr(agent, "session_prompt_tokens", 0) or 0,
|
|
"output_tokens": getattr(agent, "session_completion_tokens", 0) or 0,
|
|
"total_tokens": getattr(agent, "session_total_tokens", 0) or 0,
|
|
}
|
|
return result, usage
|
|
|
|
return await loop.run_in_executor(None, _run)
|
|
|
|
# ------------------------------------------------------------------
|
|
# /v1/runs — structured event streaming
|
|
# ------------------------------------------------------------------
|
|
|
|
_MAX_CONCURRENT_RUNS = 10 # Prevent unbounded resource allocation
|
|
_RUN_STREAM_TTL = 300 # seconds before orphaned runs are swept
|
|
|
|
def _make_run_event_callback(self, run_id: str, loop: "asyncio.AbstractEventLoop"):
|
|
"""Return a tool_progress_callback that pushes structured events to the run's SSE queue."""
|
|
def _push(event: Dict[str, Any]) -> None:
|
|
q = self._run_streams.get(run_id)
|
|
if q is None:
|
|
return
|
|
try:
|
|
loop.call_soon_threadsafe(q.put_nowait, event)
|
|
except Exception:
|
|
pass
|
|
|
|
def _callback(event_type: str, tool_name: str = None, preview: str = None, args=None, **kwargs):
|
|
ts = time.time()
|
|
if event_type == "tool.started":
|
|
_push({
|
|
"event": "tool.started",
|
|
"run_id": run_id,
|
|
"timestamp": ts,
|
|
"tool": tool_name,
|
|
"preview": preview,
|
|
})
|
|
elif event_type == "tool.completed":
|
|
_push({
|
|
"event": "tool.completed",
|
|
"run_id": run_id,
|
|
"timestamp": ts,
|
|
"tool": tool_name,
|
|
"duration": round(kwargs.get("duration", 0), 3),
|
|
"error": kwargs.get("is_error", False),
|
|
})
|
|
elif event_type == "reasoning.available":
|
|
_push({
|
|
"event": "reasoning.available",
|
|
"run_id": run_id,
|
|
"timestamp": ts,
|
|
"text": preview or "",
|
|
})
|
|
# _thinking and subagent_progress are intentionally not forwarded
|
|
|
|
return _callback
|
|
|
|
async def _handle_runs(self, request: "web.Request") -> "web.Response":
    """POST /v1/runs — start an agent run, return run_id immediately.

    The request is fully validated before the run's event queue is
    registered in ``self._run_streams``, so a rejected request does not
    occupy a concurrency slot.  The agent runs in a background task that
    pushes ``message.delta`` / tool events, then ``run.completed`` or
    ``run.failed``, and finally a ``None`` sentinel onto the queue.

    Responses:
        202 — ``{"run_id": ..., "status": "started"}``.
        400 — invalid JSON, non-object body, missing ``input``, no user
              message, or malformed ``conversation_history``.
        429 — ``_MAX_CONCURRENT_RUNS`` run streams are already registered.
    """
    auth_err = self._check_auth(request)
    if auth_err:
        return auth_err

    # Enforce concurrency limit
    if len(self._run_streams) >= self._MAX_CONCURRENT_RUNS:
        return web.json_response(
            _openai_error(f"Too many concurrent runs (max {self._MAX_CONCURRENT_RUNS})", code="rate_limit_exceeded"),
            status=429,
        )

    try:
        body = await request.json()
    except Exception:
        return web.json_response(_openai_error("Invalid JSON"), status=400)
    # A JSON list/string/number body would otherwise crash on body.get().
    if not isinstance(body, dict):
        return web.json_response(_openai_error("Request body must be a JSON object"), status=400)

    raw_input = body.get("input")
    if not raw_input:
        return web.json_response(_openai_error("Missing 'input' field"), status=400)

    # The last input item is the user message.  It may be a plain string or
    # a message object whose content is a string or an array of typed parts.
    user_message = ""
    if isinstance(raw_input, str):
        user_message = raw_input
    elif isinstance(raw_input, list):
        last_item = raw_input[-1]
        if isinstance(last_item, str):
            user_message = last_item
        elif isinstance(last_item, dict):
            last_content = last_item.get("content", "")
            # String content is passed through untouched; anything else is
            # normalised with the helper the /v1/responses handler uses.
            user_message = (
                last_content if isinstance(last_content, str)
                else _normalize_chat_content(last_content)
            )
    if not user_message:
        return web.json_response(_openai_error("No user message found in input"), status=400)

    instructions = body.get("instructions")
    previous_response_id = body.get("previous_response_id")

    # Accept explicit conversation_history from the request body.
    # Precedence: explicit conversation_history > previous_response_id.
    conversation_history: List[Dict[str, str]] = []
    raw_history = body.get("conversation_history")
    if raw_history:
        if not isinstance(raw_history, list):
            return web.json_response(
                _openai_error("'conversation_history' must be an array of message objects"),
                status=400,
            )
        for i, entry in enumerate(raw_history):
            if not isinstance(entry, dict) or "role" not in entry or "content" not in entry:
                return web.json_response(
                    _openai_error(f"conversation_history[{i}] must have 'role' and 'content' fields"),
                    status=400,
                )
            conversation_history.append({"role": str(entry["role"]), "content": str(entry["content"])})
        if previous_response_id:
            logger.debug("Both conversation_history and previous_response_id provided; using conversation_history")

    if not conversation_history and previous_response_id:
        stored = self._response_store.get(previous_response_id)
        if stored:
            conversation_history = list(stored.get("conversation_history", []))
            if instructions is None:
                instructions = stored.get("instructions")

    # When input is a multi-message array, extract all but the last
    # message as conversation history (the last becomes user_message).
    # Only fires when no explicit history was provided.
    if not conversation_history and isinstance(raw_input, list) and len(raw_input) > 1:
        for msg in raw_input[:-1]:
            if isinstance(msg, dict) and msg.get("role") and msg.get("content"):
                content = msg["content"]
                if isinstance(content, list):
                    # Flatten multi-part content blocks to text
                    content = " ".join(
                        part.get("text", "") for part in content
                        if isinstance(part, dict) and part.get("type") == "text"
                    )
                conversation_history.append({"role": msg["role"], "content": str(content)})

    # All validation has passed: only now register the run, so that every
    # early return above leaves no orphaned queue behind.
    run_id = f"run_{uuid.uuid4().hex}"
    loop = asyncio.get_running_loop()
    q: "asyncio.Queue[Optional[Dict]]" = asyncio.Queue()
    self._run_streams[run_id] = q
    self._run_streams_created[run_id] = time.time()

    event_cb = self._make_run_event_callback(run_id, loop)

    # Also wire stream_delta_callback so message.delta events flow through
    def _text_cb(delta: Optional[str]) -> None:
        if delta is None:
            return
        try:
            loop.call_soon_threadsafe(q.put_nowait, {
                "event": "message.delta",
                "run_id": run_id,
                "timestamp": time.time(),
                "delta": delta,
            })
        except Exception:
            # Best-effort delivery: scheduling failures are ignored.
            pass

    session_id = body.get("session_id") or run_id
    ephemeral_system_prompt = instructions

    async def _run_and_close():
        try:
            agent = self._create_agent(
                ephemeral_system_prompt=ephemeral_system_prompt,
                session_id=session_id,
                stream_delta_callback=_text_cb,
                tool_progress_callback=event_cb,
            )

            def _run_sync():
                r = agent.run_conversation(
                    user_message=user_message,
                    conversation_history=conversation_history,
                    task_id="default",
                )
                u = {
                    "input_tokens": getattr(agent, "session_prompt_tokens", 0) or 0,
                    "output_tokens": getattr(agent, "session_completion_tokens", 0) or 0,
                    "total_tokens": getattr(agent, "session_total_tokens", 0) or 0,
                }
                return r, u

            result, usage = await asyncio.get_running_loop().run_in_executor(None, _run_sync)
            final_response = result.get("final_response", "") if isinstance(result, dict) else ""
            q.put_nowait({
                "event": "run.completed",
                "run_id": run_id,
                "timestamp": time.time(),
                "output": final_response,
                "usage": usage,
            })
        except Exception as exc:
            logger.exception("[api_server] run %s failed", run_id)
            try:
                q.put_nowait({
                    "event": "run.failed",
                    "run_id": run_id,
                    "timestamp": time.time(),
                    "error": str(exc),
                })
            except Exception:
                pass
        finally:
            # Sentinel: signal SSE stream to close
            try:
                q.put_nowait(None)
            except Exception:
                pass

    task = asyncio.create_task(_run_and_close())
    # Track the task in self._background_tasks until it completes.
    try:
        self._background_tasks.add(task)
    except TypeError:
        pass
    if hasattr(task, "add_done_callback"):
        task.add_done_callback(self._background_tasks.discard)

    return web.json_response({"run_id": run_id, "status": "started"}, status=202)
|
|
|
|
async def _handle_run_events(self, request: "web.Request") -> "web.StreamResponse":
    """GET /v1/runs/{run_id}/events — SSE stream of structured agent lifecycle events.

    Waits up to roughly one second (20 polls of 50 ms) for the run to be
    registered, then relays every queued event as a ``data:`` line until
    the ``None`` sentinel arrives.  A ``: keepalive`` comment is written
    whenever no event arrives within 30 seconds.  The run's entries in
    ``self._run_streams`` / ``self._run_streams_created`` are removed when
    the stream ends, however it ends.

    Returns a 404 JSON error when the run never appears.
    """
    auth_err = self._check_auth(request)
    if auth_err:
        return auth_err

    run_id = request.match_info["run_id"]

    # Allow subscribing slightly before the run is registered (race condition window)
    for _ in range(20):
        if run_id in self._run_streams:
            break
        await asyncio.sleep(0.05)
    else:
        return web.json_response(_openai_error(f"Run not found: {run_id}", code="run_not_found"), status=404)

    q = self._run_streams[run_id]

    sse_headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    # CORS middleware can't inject headers into a StreamResponse after
    # prepare() flushes them, so resolve CORS headers up front — the same
    # approach the chat-completions SSE writer takes.
    origin = request.headers.get("Origin", "")
    cors = self._cors_headers_for_origin(origin) if origin else None
    if cors:
        sse_headers.update(cors)

    response = web.StreamResponse(status=200, headers=sse_headers)
    await response.prepare(request)

    try:
        while True:
            try:
                event = await asyncio.wait_for(q.get(), timeout=30.0)
            except asyncio.TimeoutError:
                await response.write(b": keepalive\n\n")
                continue
            if event is None:
                # Run finished — send final SSE comment and close
                await response.write(b": stream closed\n\n")
                break
            payload = f"data: {json.dumps(event)}\n\n"
            await response.write(payload.encode())
    except Exception as exc:
        logger.debug("[api_server] SSE stream error for run %s: %s", run_id, exc)
    finally:
        self._run_streams.pop(run_id, None)
        self._run_streams_created.pop(run_id, None)

    return response
|
|
|
|
async def _sweep_orphaned_runs(self) -> None:
    """Background loop that evicts run streams nobody consumed.

    Wakes every 60 seconds and removes each run whose recorded creation
    time is more than ``self._RUN_STREAM_TTL`` seconds in the past,
    dropping both its queue and its timestamp. The loop has no exit of its
    own.
    """
    while True:
        await asyncio.sleep(60)
        checked_at = time.time()
        # Walk a snapshot, since entries are removed during the pass.
        for run_id, created_at in tuple(self._run_streams_created.items()):
            if checked_at - created_at > self._RUN_STREAM_TTL:
                logger.debug("[api_server] sweeping orphaned run %s", run_id)
                self._run_streams.pop(run_id, None)
                self._run_streams_created.pop(run_id, None)
|
|
|
|
# ------------------------------------------------------------------
|
|
# BasePlatformAdapter interface
|
|
# ------------------------------------------------------------------
|
|
|
|
async def connect(self) -> bool:
    """Start the aiohttp web server.

    Builds the application and registers every route, then refuses to
    start when the host is network-accessible without an API key or when
    something already accepts connections on the configured port. The
    orphaned-run sweeper is scheduled only once the TCP site is listening,
    so a refused or failed start leaves no background task behind. If a
    step raises after the runner was created, the runner is cleaned up
    before returning.

    Returns:
        True when the server is listening; False when aiohttp is not
        installed, startup was refused, or any startup step raised.
    """
    if not AIOHTTP_AVAILABLE:
        logger.warning("[%s] aiohttp not installed", self.name)
        return False

    # Resources created by THIS call; only these are rolled back on failure.
    runner = None
    sweep_task = None

    try:
        mws = [mw for mw in (cors_middleware, body_limit_middleware, security_headers_middleware) if mw is not None]
        self._app = web.Application(middlewares=mws)
        self._app["api_server_adapter"] = self
        self._app.router.add_get("/health", self._handle_health)
        self._app.router.add_get("/v1/health", self._handle_health)
        self._app.router.add_get("/v1/models", self._handle_models)
        self._app.router.add_post("/v1/chat/completions", self._handle_chat_completions)
        self._app.router.add_post("/v1/responses", self._handle_responses)
        self._app.router.add_get("/v1/responses/{response_id}", self._handle_get_response)
        self._app.router.add_delete("/v1/responses/{response_id}", self._handle_delete_response)
        # Cron jobs management API
        self._app.router.add_get("/api/jobs", self._handle_list_jobs)
        self._app.router.add_post("/api/jobs", self._handle_create_job)
        self._app.router.add_get("/api/jobs/{job_id}", self._handle_get_job)
        self._app.router.add_patch("/api/jobs/{job_id}", self._handle_update_job)
        self._app.router.add_delete("/api/jobs/{job_id}", self._handle_delete_job)
        self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job)
        self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job)
        self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job)
        # Structured event streaming
        self._app.router.add_post("/v1/runs", self._handle_runs)
        self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events)

        # Refuse to start network-accessible without authentication
        if is_network_accessible(self._host) and not self._api_key:
            logger.error(
                "[%s] Refusing to start: binding to %s requires API_SERVER_KEY. "
                "Set API_SERVER_KEY or use the default 127.0.0.1.",
                self.name, self._host,
            )
            return False

        # Port conflict detection — fail fast if port is already in use.
        # A successful connect means another process is listening there.
        try:
            with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
                _s.settimeout(1)
                _s.connect(('127.0.0.1', self._port))
        except OSError:
            # Covers ConnectionRefusedError and timeouts: port is free.
            pass
        else:
            logger.error('[%s] Port %d already in use. Set a different port in config.yaml: platforms.api_server.port', self.name, self._port)
            return False

        runner = web.AppRunner(self._app)
        self._runner = runner
        await runner.setup()
        self._site = web.TCPSite(runner, self._host, self._port)
        await self._site.start()

        # Start the background sweep for orphaned (unconsumed) run streams
        # only now that the server is up; starting it earlier leaked an
        # endless task whenever one of the checks above returned False.
        sweep_task = asyncio.create_task(self._sweep_orphaned_runs())
        try:
            self._background_tasks.add(sweep_task)
        except TypeError:
            pass
        if hasattr(sweep_task, "add_done_callback"):
            sweep_task.add_done_callback(self._background_tasks.discard)

        self._mark_connected()
        if not self._api_key:
            logger.warning(
                "[%s] ⚠️  No API key configured (API_SERVER_KEY / platforms.api_server.key). "
                "All requests will be accepted without authentication. "
                "Set an API key for production deployments to prevent "
                "unauthorized access to sessions, responses, and cron jobs.",
                self.name,
            )
        logger.info(
            "[%s] API server listening on http://%s:%d (model: %s)",
            self.name, self._host, self._port, self._model_name,
        )
        return True

    except Exception as e:
        logger.error("[%s] Failed to start API server: %s", self.name, e)
        # Roll back whatever this call managed to start.
        if sweep_task is not None and hasattr(sweep_task, "cancel"):
            sweep_task.cancel()
        if runner is not None:
            try:
                # cleanup() also stops any site registered on the runner.
                await runner.cleanup()
            except Exception as cleanup_err:
                logger.debug(
                    "[%s] Runner cleanup after failed start raised: %s",
                    self.name, cleanup_err,
                )
            self._runner = None
            self._site = None
        return False
|
|
|
|
async def disconnect(self) -> None:
    """Shut the aiohttp web server down.

    Marks the adapter as disconnected first, then stops the TCP site and
    cleans up the runner when they are set, clearing each reference, and
    finally drops the application object.
    """
    self._mark_disconnected()

    site = self._site
    if site:
        await site.stop()
        self._site = None

    runner = self._runner
    if runner:
        await runner.cleanup()
        self._runner = None

    self._app = None
    logger.info("[%s] API server stopped", self.name)
|
|
|
|
async def send(
    self,
    chat_id: str,
    content: str,
    reply_to: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
    """Report that outbound sends are unsupported on this adapter.

    Delivery happens inside the HTTP request/response cycle, so this
    ignores its arguments and always returns an unsuccessful
    ``SendResult`` carrying an explanatory error.
    """
    reason = "API server uses HTTP request/response, not send()"
    return SendResult(success=False, error=reason)
|
|
|
|
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
    """Describe the API server endpoint.

    ``chat_id`` is not consulted: the result is always the fixed name and
    type plus the adapter's configured host and port.
    """
    info: Dict[str, Any] = {"name": "API Server", "type": "api"}
    info["host"] = self._host
    info["port"] = self._port
    return info
|