mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
feat: add API server session controls
This commit is contained in:
parent
f0be32232d
commit
f7527b0fdb
2 changed files with 715 additions and 1 deletions
|
|
@ -8,6 +8,12 @@ Exposes an HTTP server with endpoints:
|
|||
- DELETE /v1/responses/{response_id} — Delete a stored response
|
||||
- GET /v1/models — lists hermes-agent as an available model
|
||||
- GET /v1/capabilities — machine-readable API capabilities for external UIs
|
||||
- GET /api/sessions — list client-visible Hermes sessions
|
||||
- POST /api/sessions — create an empty Hermes session
|
||||
- GET/PATCH/DELETE /api/sessions/{session_id} — read/update/delete a session
|
||||
- GET /api/sessions/{session_id}/messages — read session message history
|
||||
- POST /api/sessions/{session_id}/fork — branch a session using SessionDB lineage
|
||||
- POST /api/sessions/{session_id}/chat[/stream] — chat with a persisted session
|
||||
- POST /v1/runs — start a run, returns run_id immediately (202)
|
||||
- GET /v1/runs/{run_id} — retrieve current run status
|
||||
- GET /v1/runs/{run_id}/events — SSE stream of structured lifecycle events
|
||||
|
|
@ -1086,6 +1092,16 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
"run_approval_response": True,
|
||||
"tool_progress_events": True,
|
||||
"approval_events": True,
|
||||
"session_resources": True,
|
||||
"session_chat": True,
|
||||
"session_chat_streaming": True,
|
||||
"session_fork": True,
|
||||
"admin_config_rw": False,
|
||||
"jobs_admin": False,
|
||||
"memory_write_api": False,
|
||||
"skills_api": False,
|
||||
"audio_api": False,
|
||||
"realtime_voice": False,
|
||||
"session_continuity_header": "X-Hermes-Session-Id",
|
||||
"session_key_header": "X-Hermes-Session-Key",
|
||||
"cors": bool(self._cors_origins),
|
||||
|
|
@ -1103,6 +1119,15 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
"run_stop": {"method": "POST", "path": "/v1/runs/{run_id}/stop"},
|
||||
"skills": {"method": "GET", "path": "/v1/skills"},
|
||||
"toolsets": {"method": "GET", "path": "/v1/toolsets"},
|
||||
"sessions": {"method": "GET", "path": "/api/sessions"},
|
||||
"session_create": {"method": "POST", "path": "/api/sessions"},
|
||||
"session": {"method": "GET", "path": "/api/sessions/{session_id}"},
|
||||
"session_update": {"method": "PATCH", "path": "/api/sessions/{session_id}"},
|
||||
"session_delete": {"method": "DELETE", "path": "/api/sessions/{session_id}"},
|
||||
"session_messages": {"method": "GET", "path": "/api/sessions/{session_id}/messages"},
|
||||
"session_fork": {"method": "POST", "path": "/api/sessions/{session_id}/fork"},
|
||||
"session_chat": {"method": "POST", "path": "/api/sessions/{session_id}/chat"},
|
||||
"session_chat_stream": {"method": "POST", "path": "/api/sessions/{session_id}/chat/stream"},
|
||||
},
|
||||
})
|
||||
|
||||
|
|
@ -1193,6 +1218,439 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
"data": data,
|
||||
})
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# /api/sessions — thin client/session resource API
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _parse_nonnegative_int(value: Any, default: int, maximum: int) -> int:
|
||||
try:
|
||||
parsed = int(value)
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
if parsed < 0:
|
||||
return default
|
||||
return min(parsed, maximum)
|
||||
|
||||
@staticmethod
|
||||
def _session_response(session: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Return a stable, client-safe session representation."""
|
||||
safe_keys = (
|
||||
"id", "source", "user_id", "model", "title", "started_at", "ended_at",
|
||||
"end_reason", "message_count", "tool_call_count", "input_tokens",
|
||||
"output_tokens", "cache_read_tokens", "cache_write_tokens",
|
||||
"reasoning_tokens", "estimated_cost_usd", "actual_cost_usd",
|
||||
"api_call_count", "parent_session_id", "last_active", "preview",
|
||||
"_lineage_root_id",
|
||||
)
|
||||
payload = {key: session.get(key) for key in safe_keys if key in session}
|
||||
# Avoid exposing full system prompts/model_config through the client API;
|
||||
# callers only need to know whether those snapshots exist.
|
||||
payload["has_system_prompt"] = bool(session.get("system_prompt"))
|
||||
payload["has_model_config"] = bool(session.get("model_config"))
|
||||
return payload
|
||||
|
||||
@staticmethod
|
||||
def _message_response(message: Dict[str, Any]) -> Dict[str, Any]:
|
||||
safe_keys = (
|
||||
"id", "session_id", "role", "content", "tool_call_id", "tool_calls",
|
||||
"tool_name", "timestamp", "token_count", "finish_reason", "reasoning",
|
||||
"reasoning_content",
|
||||
)
|
||||
return {key: message.get(key) for key in safe_keys if key in message}
|
||||
|
||||
async def _read_json_body(self, request: "web.Request") -> tuple[Dict[str, Any], Optional["web.Response"]]:
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return {}, web.json_response(_openai_error("Invalid JSON in request body"), status=400)
|
||||
if not isinstance(body, dict):
|
||||
return {}, web.json_response(_openai_error("Request body must be a JSON object"), status=400)
|
||||
return body, None
|
||||
|
||||
def _get_existing_session_or_404(self, session_id: str) -> tuple[Optional[Dict[str, Any]], Optional["web.Response"]]:
|
||||
db = self._ensure_session_db()
|
||||
if db is None:
|
||||
return None, web.json_response(_openai_error("Session database unavailable", code="session_db_unavailable"), status=503)
|
||||
session = db.get_session(session_id)
|
||||
if not session:
|
||||
return None, web.json_response(_openai_error(f"Session not found: {session_id}", code="session_not_found"), status=404)
|
||||
return session, None
|
||||
|
||||
def _conversation_history_for_session(self, session_id: str) -> List[Dict[str, Any]]:
|
||||
db = self._ensure_session_db()
|
||||
if db is None:
|
||||
return []
|
||||
try:
|
||||
return db.get_messages_as_conversation(session_id)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to load session history for %s: %s", session_id, exc)
|
||||
return []
|
||||
|
||||
async def _handle_list_sessions(self, request: "web.Request") -> "web.Response":
|
||||
"""GET /api/sessions — list persisted Hermes sessions."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
|
||||
db = self._ensure_session_db()
|
||||
if db is None:
|
||||
return web.json_response(_openai_error("Session database unavailable", code="session_db_unavailable"), status=503)
|
||||
|
||||
limit = self._parse_nonnegative_int(request.query.get("limit"), default=50, maximum=200)
|
||||
offset = self._parse_nonnegative_int(request.query.get("offset"), default=0, maximum=1_000_000)
|
||||
source = request.query.get("source") or None
|
||||
include_children = _coerce_request_bool(request.query.get("include_children"), default=False)
|
||||
sessions = db.list_sessions_rich(
|
||||
source=source,
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
include_children=include_children,
|
||||
order_by_last_active=True,
|
||||
)
|
||||
return web.json_response({
|
||||
"object": "list",
|
||||
"data": [self._session_response(s) for s in sessions],
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
"has_more": len(sessions) == limit,
|
||||
})
|
||||
|
||||
async def _handle_create_session(self, request: "web.Request") -> "web.Response":
|
||||
"""POST /api/sessions — create an empty Hermes session row."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
body, err = await self._read_json_body(request)
|
||||
if err:
|
||||
return err
|
||||
|
||||
db = self._ensure_session_db()
|
||||
if db is None:
|
||||
return web.json_response(_openai_error("Session database unavailable", code="session_db_unavailable"), status=503)
|
||||
|
||||
raw_id = body.get("id") or body.get("session_id")
|
||||
session_id = str(raw_id).strip() if raw_id else f"api_{int(time.time())}_{uuid.uuid4().hex[:8]}"
|
||||
if not session_id or re.search(r'[\r\n\x00]', session_id):
|
||||
return web.json_response(_openai_error("Invalid session ID", code="invalid_session_id"), status=400)
|
||||
if len(session_id) > self._MAX_SESSION_HEADER_LEN:
|
||||
return web.json_response(_openai_error("Session ID too long", code="invalid_session_id"), status=400)
|
||||
if db.get_session(session_id):
|
||||
return web.json_response(_openai_error(f"Session already exists: {session_id}", code="session_exists"), status=409)
|
||||
|
||||
model = body.get("model") or self._model_name
|
||||
system_prompt = body.get("system_prompt")
|
||||
if system_prompt is not None and not isinstance(system_prompt, str):
|
||||
return web.json_response(_openai_error("system_prompt must be a string", code="invalid_system_prompt"), status=400)
|
||||
db.create_session(session_id, "api_server", model=str(model) if model else None, system_prompt=system_prompt)
|
||||
title = body.get("title")
|
||||
if title is not None:
|
||||
try:
|
||||
db.set_session_title(session_id, str(title))
|
||||
except ValueError as exc:
|
||||
db.delete_session(session_id)
|
||||
return web.json_response(_openai_error(str(exc), code="invalid_title"), status=400)
|
||||
session = db.get_session(session_id) or {"id": session_id, "source": "api_server", "model": model, "title": title}
|
||||
return web.json_response({"object": "hermes.session", "session": self._session_response(session)}, status=201)
|
||||
|
||||
async def _handle_get_session(self, request: "web.Request") -> "web.Response":
|
||||
"""GET /api/sessions/{session_id}."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
session, err = self._get_existing_session_or_404(request.match_info["session_id"])
|
||||
if err:
|
||||
return err
|
||||
return web.json_response({"object": "hermes.session", "session": self._session_response(session)})
|
||||
|
||||
async def _handle_patch_session(self, request: "web.Request") -> "web.Response":
|
||||
"""PATCH /api/sessions/{session_id} — update client-safe session metadata."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
session_id = request.match_info["session_id"]
|
||||
session, err = self._get_existing_session_or_404(session_id)
|
||||
if err:
|
||||
return err
|
||||
body, err = await self._read_json_body(request)
|
||||
if err:
|
||||
return err
|
||||
allowed = {"title", "end_reason"}
|
||||
unknown = sorted(set(body) - allowed)
|
||||
if unknown:
|
||||
return web.json_response(_openai_error(f"Unsupported session fields: {', '.join(unknown)}", code="unsupported_session_field"), status=400)
|
||||
|
||||
db = self._ensure_session_db()
|
||||
if "title" in body:
|
||||
try:
|
||||
db.set_session_title(session_id, "" if body["title"] is None else str(body["title"]))
|
||||
except ValueError as exc:
|
||||
return web.json_response(_openai_error(str(exc), code="invalid_title"), status=400)
|
||||
if body.get("end_reason"):
|
||||
db.end_session(session_id, str(body["end_reason"]))
|
||||
session = db.get_session(session_id) or session
|
||||
return web.json_response({"object": "hermes.session", "session": self._session_response(session)})
|
||||
|
||||
async def _handle_delete_session(self, request: "web.Request") -> "web.Response":
|
||||
"""DELETE /api/sessions/{session_id}."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
session_id = request.match_info["session_id"]
|
||||
session, err = self._get_existing_session_or_404(session_id)
|
||||
if err:
|
||||
return err
|
||||
db = self._ensure_session_db()
|
||||
deleted = db.delete_session(session_id)
|
||||
return web.json_response({"object": "hermes.session.deleted", "id": session_id, "deleted": bool(deleted)})
|
||||
|
||||
async def _handle_session_messages(self, request: "web.Request") -> "web.Response":
|
||||
"""GET /api/sessions/{session_id}/messages."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
session_id = request.match_info["session_id"]
|
||||
_, err = self._get_existing_session_or_404(session_id)
|
||||
if err:
|
||||
return err
|
||||
db = self._ensure_session_db()
|
||||
messages = db.get_messages(session_id)
|
||||
return web.json_response({
|
||||
"object": "list",
|
||||
"session_id": session_id,
|
||||
"data": [self._message_response(m) for m in messages],
|
||||
})
|
||||
|
||||
async def _handle_fork_session(self, request: "web.Request") -> "web.Response":
|
||||
"""POST /api/sessions/{session_id}/fork — branch via current SessionDB primitives."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
source_id = request.match_info["session_id"]
|
||||
source, err = self._get_existing_session_or_404(source_id)
|
||||
if err:
|
||||
return err
|
||||
body, err = await self._read_json_body(request)
|
||||
if err:
|
||||
return err
|
||||
db = self._ensure_session_db()
|
||||
fork_id = str(body.get("id") or body.get("session_id") or f"api_{int(time.time())}_{uuid.uuid4().hex[:8]}").strip()
|
||||
if not fork_id or re.search(r'[\r\n\x00]', fork_id):
|
||||
return web.json_response(_openai_error("Invalid session ID", code="invalid_session_id"), status=400)
|
||||
if db.get_session(fork_id):
|
||||
return web.json_response(_openai_error(f"Session already exists: {fork_id}", code="session_exists"), status=409)
|
||||
|
||||
# Match the CLI /branch semantics: mark the original as branched, then
|
||||
# create a child session that carries the transcript forward. This uses
|
||||
# SessionDB's native parent_session_id/end_reason visibility model rather
|
||||
# than inventing a parallel fork store.
|
||||
db.end_session(source_id, "branched")
|
||||
db.create_session(
|
||||
fork_id,
|
||||
"api_server",
|
||||
model=source.get("model"),
|
||||
system_prompt=source.get("system_prompt"),
|
||||
parent_session_id=source_id,
|
||||
)
|
||||
messages = db.get_messages(source_id)
|
||||
db.replace_messages(fork_id, messages)
|
||||
title = body.get("title")
|
||||
if title is None:
|
||||
base = source.get("title") or "fork"
|
||||
try:
|
||||
title = db.get_next_title_in_lineage(base)
|
||||
except Exception:
|
||||
title = f"{base} fork"
|
||||
try:
|
||||
db.set_session_title(fork_id, str(title))
|
||||
except ValueError as exc:
|
||||
return web.json_response(_openai_error(str(exc), code="invalid_title"), status=400)
|
||||
fork = db.get_session(fork_id) or {"id": fork_id, "parent_session_id": source_id}
|
||||
return web.json_response({"object": "hermes.session", "session": self._session_response(fork)}, status=201)
|
||||
|
||||
async def _handle_session_chat(self, request: "web.Request") -> "web.Response":
|
||||
"""POST /api/sessions/{session_id}/chat — one synchronous agent turn."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
gateway_session_key, key_err = self._parse_session_key_header(request)
|
||||
if key_err is not None:
|
||||
return key_err
|
||||
session_id = request.match_info["session_id"]
|
||||
_, err = self._get_existing_session_or_404(session_id)
|
||||
if err:
|
||||
return err
|
||||
body, err = await self._read_json_body(request)
|
||||
if err:
|
||||
return err
|
||||
user_message = body.get("message") or body.get("input")
|
||||
if not _content_has_visible_payload(user_message):
|
||||
return web.json_response(_openai_error("Missing 'message' field", code="missing_message"), status=400)
|
||||
system_prompt = body.get("system_message") or body.get("instructions")
|
||||
if system_prompt is not None and not isinstance(system_prompt, str):
|
||||
return web.json_response(_openai_error("system_message must be a string", code="invalid_system_message"), status=400)
|
||||
history = self._conversation_history_for_session(session_id)
|
||||
result, usage = await self._run_agent(
|
||||
user_message=user_message,
|
||||
conversation_history=history,
|
||||
ephemeral_system_prompt=system_prompt,
|
||||
session_id=session_id,
|
||||
gateway_session_key=gateway_session_key,
|
||||
)
|
||||
effective_session_id = result.get("session_id") if isinstance(result, dict) else session_id
|
||||
final_response = result.get("final_response", "") if isinstance(result, dict) else ""
|
||||
headers = {"X-Hermes-Session-Id": effective_session_id or session_id}
|
||||
if gateway_session_key:
|
||||
headers["X-Hermes-Session-Key"] = gateway_session_key
|
||||
return web.json_response(
|
||||
{
|
||||
"object": "hermes.session.chat.completion",
|
||||
"session_id": effective_session_id or session_id,
|
||||
"message": {"role": "assistant", "content": final_response},
|
||||
"usage": usage,
|
||||
},
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
async def _handle_session_chat_stream(self, request: "web.Request") -> "web.StreamResponse":
|
||||
"""POST /api/sessions/{session_id}/chat/stream — SSE wrapper over _run_agent."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
gateway_session_key, key_err = self._parse_session_key_header(request)
|
||||
if key_err is not None:
|
||||
return key_err
|
||||
session_id = request.match_info["session_id"]
|
||||
_, err = self._get_existing_session_or_404(session_id)
|
||||
if err:
|
||||
return err
|
||||
body, err = await self._read_json_body(request)
|
||||
if err:
|
||||
return err
|
||||
user_message = body.get("message") or body.get("input")
|
||||
if not _content_has_visible_payload(user_message):
|
||||
return web.json_response(_openai_error("Missing 'message' field", code="missing_message"), status=400)
|
||||
system_prompt = body.get("system_message") or body.get("instructions")
|
||||
if system_prompt is not None and not isinstance(system_prompt, str):
|
||||
return web.json_response(_openai_error("system_message must be a string", code="invalid_system_message"), status=400)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
queue: "asyncio.Queue[Optional[tuple[str, Dict[str, Any]]]]" = asyncio.Queue()
|
||||
message_id = f"msg_{uuid.uuid4().hex}"
|
||||
run_id = f"run_{uuid.uuid4().hex}"
|
||||
seq = 0
|
||||
|
||||
def _event_payload(name: str, payload: Dict[str, Any]) -> tuple[str, Dict[str, Any]]:
|
||||
nonlocal seq
|
||||
seq += 1
|
||||
payload.setdefault("session_id", session_id)
|
||||
payload.setdefault("run_id", run_id)
|
||||
payload.setdefault("seq", seq)
|
||||
payload.setdefault("ts", time.time())
|
||||
return name, payload
|
||||
|
||||
def _enqueue(name: str, payload: Dict[str, Any]) -> None:
|
||||
event = _event_payload(name, payload)
|
||||
try:
|
||||
running_loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
running_loop = None
|
||||
try:
|
||||
if running_loop is loop:
|
||||
queue.put_nowait(event)
|
||||
else:
|
||||
loop.call_soon_threadsafe(queue.put_nowait, event)
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
def _delta(delta: str) -> None:
|
||||
if delta:
|
||||
_enqueue("assistant.delta", {"message_id": message_id, "delta": delta})
|
||||
|
||||
def _tool_progress(event_type: str, tool_name: str = None, preview: str = None, args=None, **kwargs) -> None:
|
||||
if event_type == "reasoning.available":
|
||||
_enqueue("tool.progress", {"message_id": message_id, "tool_name": tool_name or "_thinking", "delta": preview or ""})
|
||||
elif event_type in {"tool.started", "tool.completed", "tool.failed"}:
|
||||
event_name = event_type.replace("tool.", "tool.")
|
||||
_enqueue(event_name, {"message_id": message_id, "tool_name": tool_name, "preview": preview, "args": args})
|
||||
|
||||
async def _run_and_signal() -> None:
|
||||
try:
|
||||
await queue.put(_event_payload("run.started", {"user_message": {"role": "user", "content": user_message}}))
|
||||
await queue.put(_event_payload("message.started", {"message": {"id": message_id, "role": "assistant"}}))
|
||||
history = self._conversation_history_for_session(session_id)
|
||||
result, usage = await self._run_agent(
|
||||
user_message=user_message,
|
||||
conversation_history=history,
|
||||
ephemeral_system_prompt=system_prompt,
|
||||
session_id=session_id,
|
||||
stream_delta_callback=_delta,
|
||||
tool_progress_callback=_tool_progress,
|
||||
gateway_session_key=gateway_session_key,
|
||||
)
|
||||
final_response = result.get("final_response", "") if isinstance(result, dict) else ""
|
||||
effective_session_id = result.get("session_id", session_id) if isinstance(result, dict) else session_id
|
||||
await queue.put(_event_payload("assistant.completed", {
|
||||
"session_id": effective_session_id,
|
||||
"message_id": message_id,
|
||||
"content": final_response,
|
||||
"completed": True,
|
||||
"partial": False,
|
||||
"interrupted": False,
|
||||
}))
|
||||
await queue.put(_event_payload("run.completed", {
|
||||
"session_id": effective_session_id,
|
||||
"message_id": message_id,
|
||||
"completed": True,
|
||||
"usage": usage,
|
||||
}))
|
||||
except Exception as exc:
|
||||
logger.exception("[api_server] session chat stream failed")
|
||||
await queue.put(_event_payload("error", {"message": str(exc)}))
|
||||
finally:
|
||||
await queue.put(_event_payload("done", {}))
|
||||
await queue.put(None)
|
||||
|
||||
task = asyncio.create_task(_run_and_signal())
|
||||
try:
|
||||
self._background_tasks.add(task)
|
||||
except TypeError:
|
||||
pass
|
||||
if hasattr(task, "add_done_callback"):
|
||||
task.add_done_callback(self._background_tasks.discard)
|
||||
|
||||
headers = {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
"X-Accel-Buffering": "no",
|
||||
"X-Hermes-Session-Id": session_id,
|
||||
}
|
||||
if gateway_session_key:
|
||||
headers["X-Hermes-Session-Key"] = gateway_session_key
|
||||
response = web.StreamResponse(status=200, headers=headers)
|
||||
await response.prepare(request)
|
||||
last_write = time.monotonic()
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
item = await asyncio.wait_for(queue.get(), timeout=CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS)
|
||||
except asyncio.TimeoutError:
|
||||
await response.write(b": keepalive\n\n")
|
||||
last_write = time.monotonic()
|
||||
continue
|
||||
if item is None:
|
||||
break
|
||||
name, payload = item
|
||||
data = json.dumps(payload, ensure_ascii=False)
|
||||
await response.write(f"event: {name}\ndata: {data}\n\n".encode("utf-8"))
|
||||
last_write = time.monotonic()
|
||||
except (asyncio.CancelledError, ConnectionResetError):
|
||||
task.cancel()
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.debug("[api_server] session SSE stream error: %s", exc)
|
||||
return response
|
||||
|
||||
async def _handle_chat_completions(self, request: "web.Request") -> "web.Response":
|
||||
"""POST /v1/chat/completions — OpenAI Chat Completions format."""
|
||||
auth_err = self._check_auth(request)
|
||||
|
|
@ -3575,7 +4033,7 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
try:
|
||||
mws = [mw for mw in (cors_middleware, body_limit_middleware, security_headers_middleware) if mw is not None]
|
||||
self._app = web.Application(middlewares=mws, client_max_size=MAX_REQUEST_BYTES)
|
||||
self._app["api_server_adapter"] = self
|
||||
assert self._app is not None
|
||||
self._app.router.add_get("/health", self._handle_health)
|
||||
self._app.router.add_get("/health/detailed", self._handle_health_detailed)
|
||||
self._app.router.add_get("/v1/health", self._handle_health)
|
||||
|
|
@ -3583,6 +4041,16 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
self._app.router.add_get("/v1/capabilities", self._handle_capabilities)
|
||||
self._app.router.add_get("/v1/skills", self._handle_skills)
|
||||
self._app.router.add_get("/v1/toolsets", self._handle_toolsets)
|
||||
# Session/client control surface (thin wrappers over SessionDB + _run_agent)
|
||||
self._app.router.add_get("/api/sessions", self._handle_list_sessions)
|
||||
self._app.router.add_post("/api/sessions", self._handle_create_session)
|
||||
self._app.router.add_get("/api/sessions/{session_id}", self._handle_get_session)
|
||||
self._app.router.add_patch("/api/sessions/{session_id}", self._handle_patch_session)
|
||||
self._app.router.add_delete("/api/sessions/{session_id}", self._handle_delete_session)
|
||||
self._app.router.add_get("/api/sessions/{session_id}/messages", self._handle_session_messages)
|
||||
self._app.router.add_post("/api/sessions/{session_id}/fork", self._handle_fork_session)
|
||||
self._app.router.add_post("/api/sessions/{session_id}/chat", self._handle_session_chat)
|
||||
self._app.router.add_post("/api/sessions/{session_id}/chat/stream", self._handle_session_chat_stream)
|
||||
self._app.router.add_post("/v1/chat/completions", self._handle_chat_completions)
|
||||
self._app.router.add_post("/v1/responses", self._handle_responses)
|
||||
self._app.router.add_get("/v1/responses/{response_id}", self._handle_get_response)
|
||||
|
|
@ -3602,6 +4070,12 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events)
|
||||
self._app.router.add_post("/v1/runs/{run_id}/approval", self._handle_run_approval)
|
||||
self._app.router.add_post("/v1/runs/{run_id}/stop", self._handle_stop_run)
|
||||
# Store the adapter after native routes are registered. Local Hermes-Relay
|
||||
# bootstrap shims use this key as a feature-detection hook; registering
|
||||
# native routes first lets those shims no-op instead of shadowing the
|
||||
# upstream session-control handlers.
|
||||
self._app["api_server_adapter"] = self
|
||||
|
||||
# Start background sweep to clean up orphaned (unconsumed) run streams
|
||||
sweep_task = asyncio.create_task(self._sweep_orphaned_runs())
|
||||
try:
|
||||
|
|
|
|||
240
tests/gateway/test_session_api.py
Normal file
240
tests/gateway/test_session_api.py
Normal file
|
|
@ -0,0 +1,240 @@
|
|||
"""Focused tests for API server session-control endpoints."""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
from aiohttp import web
|
||||
from aiohttp.test_utils import TestClient, TestServer
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
from gateway.platforms.api_server import APIServerAdapter
|
||||
from hermes_state import SessionDB
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session_db(tmp_path):
|
||||
db = SessionDB(tmp_path / "state.db")
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
close = getattr(db, "close", None)
|
||||
if callable(close):
|
||||
close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def adapter(session_db):
|
||||
adapter = APIServerAdapter(PlatformConfig(enabled=True))
|
||||
adapter._session_db = session_db
|
||||
return adapter
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def auth_adapter(session_db):
|
||||
adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"key": "sk-test"}))
|
||||
adapter._session_db = session_db
|
||||
return adapter
|
||||
|
||||
|
||||
def _create_session_app(adapter: APIServerAdapter) -> web.Application:
|
||||
app = web.Application()
|
||||
app.router.add_get("/v1/capabilities", adapter._handle_capabilities)
|
||||
app.router.add_get("/api/sessions", adapter._handle_list_sessions)
|
||||
app.router.add_post("/api/sessions", adapter._handle_create_session)
|
||||
app.router.add_get("/api/sessions/{session_id}", adapter._handle_get_session)
|
||||
app.router.add_patch("/api/sessions/{session_id}", adapter._handle_patch_session)
|
||||
app.router.add_delete("/api/sessions/{session_id}", adapter._handle_delete_session)
|
||||
app.router.add_get("/api/sessions/{session_id}/messages", adapter._handle_session_messages)
|
||||
app.router.add_post("/api/sessions/{session_id}/fork", adapter._handle_fork_session)
|
||||
app.router.add_post("/api/sessions/{session_id}/chat", adapter._handle_session_chat)
|
||||
app.router.add_post("/api/sessions/{session_id}/chat/stream", adapter._handle_session_chat_stream)
|
||||
return app
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_capabilities_advertises_session_control_surface(adapter):
|
||||
app = _create_session_app(adapter)
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
resp = await cli.get("/v1/capabilities")
|
||||
assert resp.status == 200
|
||||
data = await resp.json()
|
||||
|
||||
features = data["features"]
|
||||
assert features["session_resources"] is True
|
||||
assert features["session_chat"] is True
|
||||
assert features["session_chat_streaming"] is True
|
||||
assert features["session_fork"] is True
|
||||
assert features["admin_config_rw"] is False
|
||||
assert features["memory_write_api"] is False
|
||||
assert features["skills_api"] is False
|
||||
assert features["realtime_voice"] is False
|
||||
assert data["endpoints"]["sessions"] == {"method": "GET", "path": "/api/sessions"}
|
||||
assert data["endpoints"]["session_chat_stream"] == {
|
||||
"method": "POST",
|
||||
"path": "/api/sessions/{session_id}/chat/stream",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_crud_and_message_history(adapter, session_db):
|
||||
app = _create_session_app(adapter)
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
create_resp = await cli.post("/api/sessions", json={"title": "Mobile chat", "model": "test-model"})
|
||||
assert create_resp.status == 201
|
||||
created = await create_resp.json()
|
||||
session_id = created["session"]["id"]
|
||||
assert created["object"] == "hermes.session"
|
||||
assert created["session"]["title"] == "Mobile chat"
|
||||
|
||||
session_db.append_message(session_id, "user", "hello from phone")
|
||||
session_db.append_message(session_id, "assistant", "hello from hermes")
|
||||
|
||||
list_resp = await cli.get("/api/sessions?limit=10&offset=0")
|
||||
assert list_resp.status == 200
|
||||
listed = await list_resp.json()
|
||||
assert listed["object"] == "list"
|
||||
assert [s["id"] for s in listed["data"]] == [session_id]
|
||||
assert listed["data"][0]["message_count"] == 2
|
||||
|
||||
get_resp = await cli.get(f"/api/sessions/{session_id}")
|
||||
assert get_resp.status == 200
|
||||
got = await get_resp.json()
|
||||
assert got["session"]["id"] == session_id
|
||||
assert got["session"]["message_count"] == 2
|
||||
|
||||
messages_resp = await cli.get(f"/api/sessions/{session_id}/messages")
|
||||
assert messages_resp.status == 200
|
||||
messages = await messages_resp.json()
|
||||
assert messages["object"] == "list"
|
||||
assert [m["role"] for m in messages["data"]] == ["user", "assistant"]
|
||||
assert messages["data"][0]["content"] == "hello from phone"
|
||||
|
||||
patch_resp = await cli.patch(f"/api/sessions/{session_id}", json={"title": "Renamed"})
|
||||
assert patch_resp.status == 200
|
||||
patched = await patch_resp.json()
|
||||
assert patched["session"]["title"] == "Renamed"
|
||||
|
||||
delete_resp = await cli.delete(f"/api/sessions/{session_id}")
|
||||
assert delete_resp.status == 200
|
||||
deleted = await delete_resp.json()
|
||||
assert deleted == {"object": "hermes.session.deleted", "id": session_id, "deleted": True}
|
||||
assert session_db.get_session(session_id) is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_fork_uses_current_sessiondb_branch_primitives(adapter, session_db):
|
||||
source_id = session_db.create_session("source-session", "api_server", model="test-model")
|
||||
session_db.set_session_title(source_id, "Original")
|
||||
session_db.append_message(source_id, "user", "first path")
|
||||
session_db.append_message(source_id, "assistant", "answer")
|
||||
|
||||
app = _create_session_app(adapter)
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
resp = await cli.post(f"/api/sessions/{source_id}/fork", json={"title": "Alternative"})
|
||||
assert resp.status == 201
|
||||
payload = await resp.json()
|
||||
|
||||
fork = payload["session"]
|
||||
assert payload["object"] == "hermes.session"
|
||||
assert fork["id"] != source_id
|
||||
assert fork["parent_session_id"] == source_id
|
||||
assert fork["title"] == "Alternative"
|
||||
assert [m["content"] for m in session_db.get_messages(fork["id"])] == ["first path", "answer"]
|
||||
assert session_db.get_session(source_id)["end_reason"] == "branched"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_chat_loads_history_and_preserves_session_headers(auth_adapter, session_db):
|
||||
session_id = session_db.create_session("chat-session", "api_server")
|
||||
session_db.set_session_title(session_id, "Chat")
|
||||
session_db.append_message(session_id, "user", "earlier")
|
||||
session_db.append_message(session_id, "assistant", "prior answer")
|
||||
|
||||
mock_run = AsyncMock(return_value=({"final_response": "fresh answer", "session_id": session_id}, {"total_tokens": 3}))
|
||||
app = _create_session_app(auth_adapter)
|
||||
with patch.object(auth_adapter, "_run_agent", mock_run):
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
resp = await cli.post(
|
||||
f"/api/sessions/{session_id}/chat",
|
||||
json={"message": "next", "system_message": "stay focused"},
|
||||
headers={"Authorization": "Bearer sk-test", "X-Hermes-Session-Key": "client-42"},
|
||||
)
|
||||
assert resp.status == 200
|
||||
payload = await resp.json()
|
||||
|
||||
assert resp.headers["X-Hermes-Session-Id"] == session_id
|
||||
assert resp.headers["X-Hermes-Session-Key"] == "client-42"
|
||||
assert payload["object"] == "hermes.session.chat.completion"
|
||||
assert payload["session_id"] == session_id
|
||||
assert payload["message"]["role"] == "assistant"
|
||||
assert payload["message"]["content"] == "fresh answer"
|
||||
mock_run.assert_awaited_once()
|
||||
_, kwargs = mock_run.call_args
|
||||
assert kwargs["session_id"] == session_id
|
||||
assert kwargs["gateway_session_key"] == "client-42"
|
||||
assert kwargs["ephemeral_system_prompt"] == "stay focused"
|
||||
assert kwargs["conversation_history"] == [
|
||||
{"role": "user", "content": "earlier"},
|
||||
{"role": "assistant", "content": "prior answer"},
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_chat_stream_emits_lifecycle_events_and_keepalive_safe_shape(adapter, session_db):
|
||||
session_id = session_db.create_session("stream-session", "api_server")
|
||||
session_db.set_session_title(session_id, "Stream")
|
||||
|
||||
async def fake_run(**kwargs):
|
||||
kwargs["stream_delta_callback"]("Hello")
|
||||
kwargs["stream_delta_callback"](" world")
|
||||
kwargs["tool_progress_callback"]("reasoning.available", tool_name="_thinking", preview="thinking")
|
||||
return {"final_response": "Hello world", "session_id": session_id}, {"total_tokens": 2}
|
||||
|
||||
app = _create_session_app(adapter)
|
||||
with patch.object(adapter, "_run_agent", side_effect=fake_run):
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
resp = await cli.post(f"/api/sessions/{session_id}/chat/stream", json={"message": "stream please"})
|
||||
assert resp.status == 200
|
||||
assert resp.headers["Content-Type"].startswith("text/event-stream")
|
||||
body = await resp.text()
|
||||
|
||||
assert "event: run.started" in body
|
||||
assert "event: message.started" in body
|
||||
assert "event: assistant.delta" in body
|
||||
assert "Hello world" in body
|
||||
assert "event: tool.progress" in body
|
||||
assert "event: assistant.completed" in body
|
||||
assert "event: run.completed" in body
|
||||
assert "event: done" in body
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_endpoints_require_auth_when_key_configured(auth_adapter):
|
||||
app = _create_session_app(auth_adapter)
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
resp = await cli.get("/api/sessions")
|
||||
assert resp.status == 401
|
||||
body = await resp.json()
|
||||
assert body["error"]["code"] == "invalid_api_key"
|
||||
|
||||
ok = await cli.get("/api/sessions", headers={"Authorization": "Bearer sk-test"})
|
||||
assert ok.status == 200
|
||||
data = await ok.json()
|
||||
assert data["object"] == "list"
|
||||
assert data["data"] == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_header_rejected_without_api_key(adapter, session_db):
|
||||
session_id = session_db.create_session("unsafe-session", "api_server")
|
||||
app = _create_session_app(adapter)
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
resp = await cli.post(
|
||||
f"/api/sessions/{session_id}/chat",
|
||||
json={"message": "hello"},
|
||||
headers={"X-Hermes-Session-Key": "client-42"},
|
||||
)
|
||||
assert resp.status == 403
|
||||
data = await resp.json()
|
||||
assert "X-Hermes-Session-Key requires API key" in data["error"]["message"]
|
||||
Loading…
Add table
Add a link
Reference in a new issue