From f7527b0fdb54f01691547df03fc65a6d367f9fde Mon Sep 17 00:00:00 2001 From: Bailey Dixon Date: Wed, 20 May 2026 08:45:55 -0400 Subject: [PATCH] feat: add API server session controls --- gateway/platforms/api_server.py | 476 +++++++++++++++++++++++++++++- tests/gateway/test_session_api.py | 240 +++++++++++++++ 2 files changed, 715 insertions(+), 1 deletion(-) create mode 100644 tests/gateway/test_session_api.py diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 82486d4ed9d..1a132fdec16 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -8,6 +8,12 @@ Exposes an HTTP server with endpoints: - DELETE /v1/responses/{response_id} — Delete a stored response - GET /v1/models — lists hermes-agent as an available model - GET /v1/capabilities — machine-readable API capabilities for external UIs +- GET /api/sessions — list client-visible Hermes sessions +- POST /api/sessions — create an empty Hermes session +- GET/PATCH/DELETE /api/sessions/{session_id} — read/update/delete a session +- GET /api/sessions/{session_id}/messages — read session message history +- POST /api/sessions/{session_id}/fork — branch a session using SessionDB lineage +- POST /api/sessions/{session_id}/chat[/stream] — chat with a persisted session - POST /v1/runs — start a run, returns run_id immediately (202) - GET /v1/runs/{run_id} — retrieve current run status - GET /v1/runs/{run_id}/events — SSE stream of structured lifecycle events @@ -1086,6 +1092,16 @@ class APIServerAdapter(BasePlatformAdapter): "run_approval_response": True, "tool_progress_events": True, "approval_events": True, + "session_resources": True, + "session_chat": True, + "session_chat_streaming": True, + "session_fork": True, + "admin_config_rw": False, + "jobs_admin": False, + "memory_write_api": False, + "skills_api": False, + "audio_api": False, + "realtime_voice": False, "session_continuity_header": "X-Hermes-Session-Id", "session_key_header": "X-Hermes-Session-Key", "cors": bool(self._cors_origins), @@ -1103,6 +1119,15 @@ class APIServerAdapter(BasePlatformAdapter): "run_stop": {"method": "POST", "path": "/v1/runs/{run_id}/stop"}, "skills": {"method": "GET", "path": "/v1/skills"}, "toolsets": {"method": "GET", "path": "/v1/toolsets"}, + "sessions": {"method": "GET", "path": "/api/sessions"}, + "session_create": {"method": "POST", "path": "/api/sessions"}, + "session": {"method": "GET", "path": "/api/sessions/{session_id}"}, + "session_update": {"method": "PATCH", "path": "/api/sessions/{session_id}"}, + "session_delete": {"method": "DELETE", "path": "/api/sessions/{session_id}"}, + "session_messages": {"method": "GET", "path": "/api/sessions/{session_id}/messages"}, + "session_fork": {"method": "POST", "path": "/api/sessions/{session_id}/fork"}, + "session_chat": {"method": "POST", "path": "/api/sessions/{session_id}/chat"}, + "session_chat_stream": {"method": "POST", "path": "/api/sessions/{session_id}/chat/stream"}, }, }) @@ -1193,6 +1218,439 @@ class APIServerAdapter(BasePlatformAdapter): "data": data, }) + # ------------------------------------------------------------------ + # /api/sessions — thin client/session resource API + # ------------------------------------------------------------------ + + @staticmethod + def _parse_nonnegative_int(value: Any, default: int, maximum: int) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + return default + if parsed < 0: + return default + return min(parsed, maximum) + + @staticmethod + def _session_response(session: Dict[str, Any]) -> Dict[str, Any]: + """Return a stable, client-safe session representation.""" + safe_keys = ( + "id", "source", "user_id", "model", "title", "started_at", "ended_at", + "end_reason", "message_count", "tool_call_count", "input_tokens", + "output_tokens", "cache_read_tokens", "cache_write_tokens", + "reasoning_tokens", "estimated_cost_usd", "actual_cost_usd", + "api_call_count", "parent_session_id", "last_active", "preview", + "_lineage_root_id", + ) + payload = {key: session.get(key) for key in safe_keys if key in session} + # Avoid exposing full system prompts/model_config through the client API; + # callers only need to know whether those snapshots exist. + payload["has_system_prompt"] = bool(session.get("system_prompt")) + payload["has_model_config"] = bool(session.get("model_config")) + return payload + + @staticmethod + def _message_response(message: Dict[str, Any]) -> Dict[str, Any]: + safe_keys = ( + "id", "session_id", "role", "content", "tool_call_id", "tool_calls", + "tool_name", "timestamp", "token_count", "finish_reason", "reasoning", + "reasoning_content", + ) + return {key: message.get(key) for key in safe_keys if key in message} + + async def _read_json_body(self, request: "web.Request") -> tuple[Dict[str, Any], Optional["web.Response"]]: + try: + body = await request.json() + except Exception: + return {}, web.json_response(_openai_error("Invalid JSON in request body"), status=400) + if not isinstance(body, dict): + return {}, web.json_response(_openai_error("Request body must be a JSON object"), status=400) + return body, None + + def _get_existing_session_or_404(self, session_id: str) -> tuple[Optional[Dict[str, Any]], Optional["web.Response"]]: + db = self._ensure_session_db() + if db is None: + return None, web.json_response(_openai_error("Session database unavailable", code="session_db_unavailable"), status=503) + session = db.get_session(session_id) + if not session: + return None, web.json_response(_openai_error(f"Session not found: {session_id}", code="session_not_found"), status=404) + return session, None + + def _conversation_history_for_session(self, session_id: str) -> List[Dict[str, Any]]: + db = self._ensure_session_db() + if db is None: + return [] + try: + return db.get_messages_as_conversation(session_id) + except Exception as exc: + logger.warning("Failed to load session history for %s: %s", session_id, exc) + return [] + + async def _handle_list_sessions(self, request: "web.Request") -> "web.Response": + """GET /api/sessions — list persisted Hermes sessions.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + + db = self._ensure_session_db() + if db is None: + return web.json_response(_openai_error("Session database unavailable", code="session_db_unavailable"), status=503) + + limit = self._parse_nonnegative_int(request.query.get("limit"), default=50, maximum=200) + offset = self._parse_nonnegative_int(request.query.get("offset"), default=0, maximum=1_000_000) + source = request.query.get("source") or None + include_children = _coerce_request_bool(request.query.get("include_children"), default=False) + sessions = db.list_sessions_rich( + source=source, + limit=limit, + offset=offset, + include_children=include_children, + order_by_last_active=True, + ) + return web.json_response({ + "object": "list", + "data": [self._session_response(s) for s in sessions], + "limit": limit, + "offset": offset, + "has_more": len(sessions) == limit, + }) + + async def _handle_create_session(self, request: "web.Request") -> "web.Response": + """POST /api/sessions — create an empty Hermes session row.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + body, err = await self._read_json_body(request) + if err: + return err + + db = self._ensure_session_db() + if db is None: + return web.json_response(_openai_error("Session database unavailable", code="session_db_unavailable"), status=503) + + raw_id = body.get("id") or body.get("session_id") + session_id = str(raw_id).strip() if raw_id else f"api_{int(time.time())}_{uuid.uuid4().hex[:8]}" + if not session_id or re.search(r'[\r\n\x00]', session_id): + return web.json_response(_openai_error("Invalid session ID", code="invalid_session_id"), status=400) + if len(session_id) > self._MAX_SESSION_HEADER_LEN: + return web.json_response(_openai_error("Session ID too long", code="invalid_session_id"), status=400) + if db.get_session(session_id): + return web.json_response(_openai_error(f"Session already exists: {session_id}", code="session_exists"), status=409) + + model = body.get("model") or self._model_name + system_prompt = body.get("system_prompt") + if system_prompt is not None and not isinstance(system_prompt, str): + return web.json_response(_openai_error("system_prompt must be a string", code="invalid_system_prompt"), status=400) + db.create_session(session_id, "api_server", model=str(model) if model else None, system_prompt=system_prompt) + title = body.get("title") + if title is not None: + try: + db.set_session_title(session_id, str(title)) + except ValueError as exc: + db.delete_session(session_id) + return web.json_response(_openai_error(str(exc), code="invalid_title"), status=400) + session = db.get_session(session_id) or {"id": session_id, "source": "api_server", "model": model, "title": title} + return web.json_response({"object": "hermes.session", "session": self._session_response(session)}, status=201) + + async def _handle_get_session(self, request: "web.Request") -> "web.Response": + """GET /api/sessions/{session_id}.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + session, err = self._get_existing_session_or_404(request.match_info["session_id"]) + if err: + return err + return web.json_response({"object": "hermes.session", "session": self._session_response(session)}) + + async def _handle_patch_session(self, request: "web.Request") -> "web.Response": + """PATCH /api/sessions/{session_id} — update client-safe session metadata.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + session_id = request.match_info["session_id"] + session, err = self._get_existing_session_or_404(session_id) + if err: + return err + body, err = await self._read_json_body(request) + if err: + return err + allowed = {"title", "end_reason"} + unknown = sorted(set(body) - allowed) + if unknown: + return web.json_response(_openai_error(f"Unsupported session fields: {', '.join(unknown)}", code="unsupported_session_field"), status=400) + + db = self._ensure_session_db() + if "title" in body: + try: + db.set_session_title(session_id, "" if body["title"] is None else str(body["title"])) + except ValueError as exc: + return web.json_response(_openai_error(str(exc), code="invalid_title"), status=400) + if body.get("end_reason"): + db.end_session(session_id, str(body["end_reason"])) + session = db.get_session(session_id) or session + return web.json_response({"object": "hermes.session", "session": self._session_response(session)}) + + async def _handle_delete_session(self, request: "web.Request") -> "web.Response": + """DELETE /api/sessions/{session_id}.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + session_id = request.match_info["session_id"] + session, err = self._get_existing_session_or_404(session_id) + if err: + return err + db = self._ensure_session_db() + deleted = db.delete_session(session_id) + return web.json_response({"object": "hermes.session.deleted", "id": session_id, "deleted": bool(deleted)}) + + async def _handle_session_messages(self, request: "web.Request") -> "web.Response": + """GET /api/sessions/{session_id}/messages.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + session_id = request.match_info["session_id"] + _, err = self._get_existing_session_or_404(session_id) + if err: + return err + db = self._ensure_session_db() + messages = db.get_messages(session_id) + return web.json_response({ + "object": "list", + "session_id": session_id, + "data": [self._message_response(m) for m in messages], + }) + + async def _handle_fork_session(self, request: "web.Request") -> "web.Response": + """POST /api/sessions/{session_id}/fork — branch via current SessionDB primitives.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + source_id = request.match_info["session_id"] + source, err = self._get_existing_session_or_404(source_id) + if err: + return err + body, err = await self._read_json_body(request) + if err: + return err + db = self._ensure_session_db() + fork_id = str(body.get("id") or body.get("session_id") or f"api_{int(time.time())}_{uuid.uuid4().hex[:8]}").strip() + if not fork_id or re.search(r'[\r\n\x00]', fork_id): + return web.json_response(_openai_error("Invalid session ID", code="invalid_session_id"), status=400) + if db.get_session(fork_id): + return web.json_response(_openai_error(f"Session already exists: {fork_id}", code="session_exists"), status=409) + + # Match the CLI /branch semantics: mark the original as branched, then + # create a child session that carries the transcript forward. This uses + # SessionDB's native parent_session_id/end_reason visibility model rather + # than inventing a parallel fork store. + db.end_session(source_id, "branched") + db.create_session( + fork_id, + "api_server", + model=source.get("model"), + system_prompt=source.get("system_prompt"), + parent_session_id=source_id, + ) + messages = db.get_messages(source_id) + db.replace_messages(fork_id, messages) + title = body.get("title") + if title is None: + base = source.get("title") or "fork" + try: + title = db.get_next_title_in_lineage(base) + except Exception: + title = f"{base} fork" + try: + db.set_session_title(fork_id, str(title)) + except ValueError as exc: + return web.json_response(_openai_error(str(exc), code="invalid_title"), status=400) + fork = db.get_session(fork_id) or {"id": fork_id, "parent_session_id": source_id} + return web.json_response({"object": "hermes.session", "session": self._session_response(fork)}, status=201) + + async def _handle_session_chat(self, request: "web.Request") -> "web.Response": + """POST /api/sessions/{session_id}/chat — one synchronous agent turn.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + gateway_session_key, key_err = self._parse_session_key_header(request) + if key_err is not None: + return key_err + session_id = request.match_info["session_id"] + _, err = self._get_existing_session_or_404(session_id) + if err: + return err + body, err = await self._read_json_body(request) + if err: + return err + user_message = body.get("message") or body.get("input") + if not _content_has_visible_payload(user_message): + return web.json_response(_openai_error("Missing 'message' field", code="missing_message"), status=400) + system_prompt = body.get("system_message") or body.get("instructions") + if system_prompt is not None and not isinstance(system_prompt, str): + return web.json_response(_openai_error("system_message must be a string", code="invalid_system_message"), status=400) + history = self._conversation_history_for_session(session_id) + result, usage = await self._run_agent( + user_message=user_message, + conversation_history=history, + ephemeral_system_prompt=system_prompt, + session_id=session_id, + gateway_session_key=gateway_session_key, + ) + effective_session_id = result.get("session_id") if isinstance(result, dict) else session_id + final_response = result.get("final_response", "") if isinstance(result, dict) else "" + headers = {"X-Hermes-Session-Id": effective_session_id or session_id} + if gateway_session_key: + headers["X-Hermes-Session-Key"] = gateway_session_key + return web.json_response( + { + "object": "hermes.session.chat.completion", + "session_id": effective_session_id or session_id, + "message": {"role": "assistant", "content": final_response}, + "usage": usage, + }, + headers=headers, + ) + + async def _handle_session_chat_stream(self, request: "web.Request") -> "web.StreamResponse": + """POST /api/sessions/{session_id}/chat/stream — SSE wrapper over _run_agent.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + gateway_session_key, key_err = self._parse_session_key_header(request) + if key_err is not None: + return key_err + session_id = request.match_info["session_id"] + _, err = self._get_existing_session_or_404(session_id) + if err: + return err + body, err = await self._read_json_body(request) + if err: + return err + user_message = body.get("message") or body.get("input") + if not _content_has_visible_payload(user_message): + return web.json_response(_openai_error("Missing 'message' field", code="missing_message"), status=400) + system_prompt = body.get("system_message") or body.get("instructions") + if system_prompt is not None and not isinstance(system_prompt, str): + return web.json_response(_openai_error("system_message must be a string", code="invalid_system_message"), status=400) + + loop = asyncio.get_running_loop() + queue: "asyncio.Queue[Optional[tuple[str, Dict[str, Any]]]]" = asyncio.Queue() + message_id = f"msg_{uuid.uuid4().hex}" + run_id = f"run_{uuid.uuid4().hex}" + seq = 0 + + def _event_payload(name: str, payload: Dict[str, Any]) -> tuple[str, Dict[str, Any]]: + nonlocal seq + seq += 1 + payload.setdefault("session_id", session_id) + payload.setdefault("run_id", run_id) + payload.setdefault("seq", seq) + payload.setdefault("ts", time.time()) + return name, payload + + def _enqueue(name: str, payload: Dict[str, Any]) -> None: + event = _event_payload(name, payload) + try: + running_loop = asyncio.get_running_loop() + except RuntimeError: + running_loop = None + try: + if running_loop is loop: + queue.put_nowait(event) + else: + loop.call_soon_threadsafe(queue.put_nowait, event) + except RuntimeError: + pass + + def _delta(delta: str) -> None: + if delta: + _enqueue("assistant.delta", {"message_id": message_id, "delta": delta}) + + def _tool_progress(event_type: str, tool_name: str = None, preview: str = None, args=None, **kwargs) -> None: + if event_type == "reasoning.available": + _enqueue("tool.progress", {"message_id": message_id, "tool_name": tool_name or "_thinking", "delta": preview or ""}) + elif event_type in {"tool.started", "tool.completed", "tool.failed"}: + event_name = event_type.replace("tool.", "tool.") + _enqueue(event_name, {"message_id": message_id, "tool_name": tool_name, "preview": preview, "args": args}) + + async def _run_and_signal() -> None: + try: + await queue.put(_event_payload("run.started", {"user_message": {"role": "user", "content": user_message}})) + await queue.put(_event_payload("message.started", {"message": {"id": message_id, "role": "assistant"}})) + history = self._conversation_history_for_session(session_id) + result, usage = await self._run_agent( + user_message=user_message, + conversation_history=history, + ephemeral_system_prompt=system_prompt, + session_id=session_id, + stream_delta_callback=_delta, + tool_progress_callback=_tool_progress, + gateway_session_key=gateway_session_key, + ) + final_response = result.get("final_response", "") if isinstance(result, dict) else "" + effective_session_id = result.get("session_id", session_id) if isinstance(result, dict) else session_id + await queue.put(_event_payload("assistant.completed", { + "session_id": effective_session_id, + "message_id": message_id, + "content": final_response, + "completed": True, + "partial": False, + "interrupted": False, + })) + await queue.put(_event_payload("run.completed", { + "session_id": effective_session_id, + "message_id": message_id, + "completed": True, + "usage": usage, + })) + except Exception as exc: + logger.exception("[api_server] session chat stream failed") + await queue.put(_event_payload("error", {"message": str(exc)})) + finally: + await queue.put(_event_payload("done", {})) + await queue.put(None) + + task = asyncio.create_task(_run_and_signal()) + try: + self._background_tasks.add(task) + except TypeError: + pass + if hasattr(task, "add_done_callback"): + task.add_done_callback(self._background_tasks.discard) + + headers = { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + "X-Hermes-Session-Id": session_id, + } + if gateway_session_key: + headers["X-Hermes-Session-Key"] = gateway_session_key + response = web.StreamResponse(status=200, headers=headers) + await response.prepare(request) + last_write = time.monotonic() + try: + while True: + try: + item = await asyncio.wait_for(queue.get(), timeout=CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS) + except asyncio.TimeoutError: + await response.write(b": keepalive\n\n") + last_write = time.monotonic() + continue + if item is None: + break + name, payload = item + data = json.dumps(payload, ensure_ascii=False) + await response.write(f"event: {name}\ndata: {data}\n\n".encode("utf-8")) + last_write = time.monotonic() + except (asyncio.CancelledError, ConnectionResetError): + task.cancel() + raise + except Exception as exc: + logger.debug("[api_server] session SSE stream error: %s", exc) + return response + async def _handle_chat_completions(self, request: "web.Request") -> "web.Response": """POST /v1/chat/completions — OpenAI Chat Completions format.""" auth_err = self._check_auth(request) @@ -3575,7 +4033,7 @@ class APIServerAdapter(BasePlatformAdapter): try: mws = [mw for mw in (cors_middleware, body_limit_middleware, security_headers_middleware) if mw is not None] self._app = web.Application(middlewares=mws, client_max_size=MAX_REQUEST_BYTES) - self._app["api_server_adapter"] = self + assert self._app is not None self._app.router.add_get("/health", self._handle_health) self._app.router.add_get("/health/detailed", self._handle_health_detailed) self._app.router.add_get("/v1/health", self._handle_health) @@ -3583,6 +4041,16 @@ class APIServerAdapter(BasePlatformAdapter): self._app.router.add_get("/v1/capabilities", self._handle_capabilities) self._app.router.add_get("/v1/skills", self._handle_skills) self._app.router.add_get("/v1/toolsets", self._handle_toolsets) + # Session/client control surface (thin wrappers over SessionDB + _run_agent) + self._app.router.add_get("/api/sessions", self._handle_list_sessions) + self._app.router.add_post("/api/sessions", self._handle_create_session) + self._app.router.add_get("/api/sessions/{session_id}", self._handle_get_session) + self._app.router.add_patch("/api/sessions/{session_id}", self._handle_patch_session) + self._app.router.add_delete("/api/sessions/{session_id}", self._handle_delete_session) + self._app.router.add_get("/api/sessions/{session_id}/messages", self._handle_session_messages) + self._app.router.add_post("/api/sessions/{session_id}/fork", self._handle_fork_session) + self._app.router.add_post("/api/sessions/{session_id}/chat", self._handle_session_chat) + self._app.router.add_post("/api/sessions/{session_id}/chat/stream", self._handle_session_chat_stream) self._app.router.add_post("/v1/chat/completions", self._handle_chat_completions) self._app.router.add_post("/v1/responses", self._handle_responses) self._app.router.add_get("/v1/responses/{response_id}", self._handle_get_response) @@ -3602,6 +4070,12 @@ class APIServerAdapter(BasePlatformAdapter): self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events) self._app.router.add_post("/v1/runs/{run_id}/approval", self._handle_run_approval) self._app.router.add_post("/v1/runs/{run_id}/stop", self._handle_stop_run) + # Store the adapter after native routes are registered. Local Hermes-Relay + # bootstrap shims use this key as a feature-detection hook; registering + # native routes first lets those shims no-op instead of shadowing the + # upstream session-control handlers. + self._app["api_server_adapter"] = self + # Start background sweep to clean up orphaned (unconsumed) run streams sweep_task = asyncio.create_task(self._sweep_orphaned_runs()) try: diff --git a/tests/gateway/test_session_api.py b/tests/gateway/test_session_api.py new file mode 100644 index 00000000000..7c99bc8c24a --- /dev/null +++ b/tests/gateway/test_session_api.py @@ -0,0 +1,240 @@ +"""Focused tests for API server session-control endpoints.""" + +import asyncio +from unittest.mock import AsyncMock, patch + +import pytest +from aiohttp import web +from aiohttp.test_utils import TestClient, TestServer + +from gateway.config import PlatformConfig +from gateway.platforms.api_server import APIServerAdapter +from hermes_state import SessionDB + + +@pytest.fixture +def session_db(tmp_path): + db = SessionDB(tmp_path / "state.db") + try: + yield db + finally: + close = getattr(db, "close", None) + if callable(close): + close() + + +@pytest.fixture +def adapter(session_db): + adapter = APIServerAdapter(PlatformConfig(enabled=True)) + adapter._session_db = session_db + return adapter + + +@pytest.fixture +def auth_adapter(session_db): + adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"key": "sk-test"})) + adapter._session_db = session_db + return adapter + + +def _create_session_app(adapter: APIServerAdapter) -> web.Application: + app = web.Application() + app.router.add_get("/v1/capabilities", adapter._handle_capabilities) + app.router.add_get("/api/sessions", adapter._handle_list_sessions) + app.router.add_post("/api/sessions", adapter._handle_create_session) + app.router.add_get("/api/sessions/{session_id}", adapter._handle_get_session) + app.router.add_patch("/api/sessions/{session_id}", adapter._handle_patch_session) + app.router.add_delete("/api/sessions/{session_id}", adapter._handle_delete_session) + app.router.add_get("/api/sessions/{session_id}/messages", adapter._handle_session_messages) + app.router.add_post("/api/sessions/{session_id}/fork", adapter._handle_fork_session) + app.router.add_post("/api/sessions/{session_id}/chat", adapter._handle_session_chat) + app.router.add_post("/api/sessions/{session_id}/chat/stream", adapter._handle_session_chat_stream) + return app + + +@pytest.mark.asyncio +async def test_capabilities_advertises_session_control_surface(adapter): + app = _create_session_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/v1/capabilities") + assert resp.status == 200 + data = await resp.json() + + features = data["features"] + assert features["session_resources"] is True + assert features["session_chat"] is True + assert features["session_chat_streaming"] is True + assert features["session_fork"] is True + assert features["admin_config_rw"] is False + assert features["memory_write_api"] is False + assert features["skills_api"] is False + assert features["realtime_voice"] is False + assert data["endpoints"]["sessions"] == {"method": "GET", "path": "/api/sessions"} + assert data["endpoints"]["session_chat_stream"] == { + "method": "POST", + "path": "/api/sessions/{session_id}/chat/stream", + } + + +@pytest.mark.asyncio +async def test_session_crud_and_message_history(adapter, session_db): + app = _create_session_app(adapter) + async with TestClient(TestServer(app)) as cli: + create_resp = await cli.post("/api/sessions", json={"title": "Mobile chat", "model": "test-model"}) + assert create_resp.status == 201 + created = await create_resp.json() + session_id = created["session"]["id"] + assert created["object"] == "hermes.session" + assert created["session"]["title"] == "Mobile chat" + + session_db.append_message(session_id, "user", "hello from phone") + session_db.append_message(session_id, "assistant", "hello from hermes") + + list_resp = await cli.get("/api/sessions?limit=10&offset=0") + assert list_resp.status == 200 + listed = await list_resp.json() + assert listed["object"] == "list" + assert [s["id"] for s in listed["data"]] == [session_id] + assert listed["data"][0]["message_count"] == 2 + + get_resp = await cli.get(f"/api/sessions/{session_id}") + assert get_resp.status == 200 + got = await get_resp.json() + assert got["session"]["id"] == session_id + assert got["session"]["message_count"] == 2 + + messages_resp = await cli.get(f"/api/sessions/{session_id}/messages") + assert messages_resp.status == 200 + messages = await messages_resp.json() + assert messages["object"] == "list" + assert [m["role"] for m in messages["data"]] == ["user", "assistant"] + assert messages["data"][0]["content"] == "hello from phone" + + patch_resp = await cli.patch(f"/api/sessions/{session_id}", json={"title": "Renamed"}) + assert patch_resp.status == 200 + patched = await patch_resp.json() + assert patched["session"]["title"] == "Renamed" + + delete_resp = await cli.delete(f"/api/sessions/{session_id}") + assert delete_resp.status == 200 + deleted = await delete_resp.json() + assert deleted == {"object": "hermes.session.deleted", "id": session_id, "deleted": True} + assert session_db.get_session(session_id) is None + + +@pytest.mark.asyncio +async def test_session_fork_uses_current_sessiondb_branch_primitives(adapter, session_db): + source_id = session_db.create_session("source-session", "api_server", model="test-model") + session_db.set_session_title(source_id, "Original") + session_db.append_message(source_id, "user", "first path") + session_db.append_message(source_id, "assistant", "answer") + + app = _create_session_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post(f"/api/sessions/{source_id}/fork", json={"title": "Alternative"}) + assert resp.status == 201 + payload = await resp.json() + + fork = payload["session"] + assert payload["object"] == "hermes.session" + assert fork["id"] != source_id + assert fork["parent_session_id"] == source_id + assert fork["title"] == "Alternative" + assert [m["content"] for m in session_db.get_messages(fork["id"])] == ["first path", "answer"] + assert session_db.get_session(source_id)["end_reason"] == "branched" + + +@pytest.mark.asyncio +async def test_session_chat_loads_history_and_preserves_session_headers(auth_adapter, session_db): + session_id = session_db.create_session("chat-session", "api_server") + session_db.set_session_title(session_id, "Chat") + session_db.append_message(session_id, "user", "earlier") + session_db.append_message(session_id, "assistant", "prior answer") + + mock_run = AsyncMock(return_value=({"final_response": "fresh answer", "session_id": session_id}, {"total_tokens": 3})) + app = _create_session_app(auth_adapter) + with patch.object(auth_adapter, "_run_agent", mock_run): + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + f"/api/sessions/{session_id}/chat", + json={"message": "next", "system_message": "stay focused"}, + headers={"Authorization": "Bearer sk-test", "X-Hermes-Session-Key": "client-42"}, + ) + assert resp.status == 200 + payload = await resp.json() + + assert resp.headers["X-Hermes-Session-Id"] == session_id + assert resp.headers["X-Hermes-Session-Key"] == "client-42" + assert payload["object"] == "hermes.session.chat.completion" + assert payload["session_id"] == session_id + assert payload["message"]["role"] == "assistant" + assert payload["message"]["content"] == "fresh answer" + mock_run.assert_awaited_once() + _, kwargs = mock_run.call_args + assert kwargs["session_id"] == session_id + assert kwargs["gateway_session_key"] == "client-42" + assert kwargs["ephemeral_system_prompt"] == "stay focused" + assert kwargs["conversation_history"] == [ + {"role": "user", "content": "earlier"}, + {"role": "assistant", "content": "prior answer"}, + ] + + +@pytest.mark.asyncio +async def test_session_chat_stream_emits_lifecycle_events_and_keepalive_safe_shape(adapter, session_db): + session_id = session_db.create_session("stream-session", "api_server") + session_db.set_session_title(session_id, "Stream") + + async def fake_run(**kwargs): + kwargs["stream_delta_callback"]("Hello") + kwargs["stream_delta_callback"](" world") + kwargs["tool_progress_callback"]("reasoning.available", tool_name="_thinking", preview="thinking") + return {"final_response": "Hello world", "session_id": session_id}, {"total_tokens": 2} + + app = _create_session_app(adapter) + with patch.object(adapter, "_run_agent", side_effect=fake_run): + async with TestClient(TestServer(app)) as cli: + resp = await cli.post(f"/api/sessions/{session_id}/chat/stream", json={"message": "stream please"}) + assert resp.status == 200 + assert resp.headers["Content-Type"].startswith("text/event-stream") + body = await resp.text() + + assert "event: run.started" in body + assert "event: message.started" in body + assert "event: assistant.delta" in body + assert "Hello world" in body + assert "event: tool.progress" in body + assert "event: assistant.completed" in body + assert "event: run.completed" in body + assert "event: done" in body + + +@pytest.mark.asyncio +async def test_session_endpoints_require_auth_when_key_configured(auth_adapter): + app = _create_session_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/api/sessions") + assert resp.status == 401 + body = await resp.json() + assert body["error"]["code"] == "invalid_api_key" + + ok = await cli.get("/api/sessions", headers={"Authorization": "Bearer sk-test"}) + assert ok.status == 200 + data = await ok.json() + assert data["object"] == "list" + assert data["data"] == [] + + +@pytest.mark.asyncio +async def test_session_header_rejected_without_api_key(adapter, session_db): + session_id = session_db.create_session("unsafe-session", "api_server") + app = _create_session_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + f"/api/sessions/{session_id}/chat", + json={"message": "hello"}, + headers={"X-Hermes-Session-Key": "client-42"}, + ) + assert resp.status == 403 + data = await resp.json() + assert "X-Hermes-Session-Key requires API key" in data["error"]["message"]