diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 2077c9c859..60b03a2c61 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -515,6 +515,8 @@ class APIServerAdapter(BasePlatformAdapter): session_id: Optional[str] = None, stream_delta_callback=None, tool_progress_callback=None, + tool_start_callback=None, + tool_complete_callback=None, ) -> Any: """ Create an AIAgent instance using the gateway's runtime config. @@ -553,6 +555,8 @@ class APIServerAdapter(BasePlatformAdapter): platform="api_server", stream_delta_callback=stream_delta_callback, tool_progress_callback=tool_progress_callback, + tool_start_callback=tool_start_callback, + tool_complete_callback=tool_complete_callback, session_db=self._ensure_session_db(), fallback_model=fallback_model, ) @@ -965,6 +969,410 @@ class APIServerAdapter(BasePlatformAdapter): return response + async def _write_sse_responses( + self, + request: "web.Request", + response_id: str, + model: str, + created_at: int, + stream_q, + agent_task, + agent_ref, + conversation_history: List[Dict[str, str]], + user_message: str, + instructions: Optional[str], + conversation: Optional[str], + store: bool, + session_id: str, + ) -> "web.StreamResponse": + """Write an SSE stream for POST /v1/responses (OpenAI Responses API). + + Emits spec-compliant event types as the agent runs: + + - ``response.created`` — initial envelope (status=in_progress) + - ``response.output_text.delta`` / ``response.output_text.done`` — + streamed assistant text + - ``response.output_item.added`` / ``response.output_item.done`` + with ``item.type == "function_call"`` — when the agent invokes a + tool (both events fire; the ``done`` event carries the finalized + ``arguments`` string) + - ``response.output_item.added`` with + ``item.type == "function_call_output"`` — tool result with + ``{call_id, output, status}`` + - ``response.completed`` — terminal event carrying the full + response object with all output items + usage (same payload + shape as the non-streaming path for parity) + - ``response.failed`` — terminal event on agent error + + If the client disconnects mid-stream, ``agent.interrupt()`` is + called so the agent stops issuing upstream LLM calls, then the + asyncio task is cancelled. When ``store=True`` the full response + is persisted to the ResponseStore in a ``finally`` block so GET + /v1/responses/{id} and ``previous_response_id`` chaining work the + same as the batch path. + """ + import queue as _q + + sse_headers = { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + } + origin = request.headers.get("Origin", "") + cors = self._cors_headers_for_origin(origin) if origin else None + if cors: + sse_headers.update(cors) + if session_id: + sse_headers["X-Hermes-Session-Id"] = session_id + response = web.StreamResponse(status=200, headers=sse_headers) + await response.prepare(request) + + # State accumulated during the stream + final_text_parts: List[str] = [] + # Track open function_call items by name so we can emit a matching + # ``done`` event when the tool completes. Order preserved. + pending_tool_calls: List[Dict[str, Any]] = [] + # Output items we've emitted so far (used to build the terminal + # response.completed payload). Kept in the order they appeared. + emitted_items: List[Dict[str, Any]] = [] + # Monotonic counter for output_index (spec requires it). 
+ output_index = 0 + # Monotonic counter for call_id generation if the agent doesn't + # provide one (it doesn't, from tool_progress_callback). + call_counter = 0 + # Track the assistant message item id + content index for text + # delta events — the spec ties deltas to a specific item. + message_item_id = f"msg_{uuid.uuid4().hex[:24]}" + message_output_index: Optional[int] = None + message_opened = False + + async def _write_event(event_type: str, data: Dict[str, Any]) -> None: + payload = f"event: {event_type}\ndata: {json.dumps(data)}\n\n" + await response.write(payload.encode()) + + def _envelope(status: str) -> Dict[str, Any]: + env: Dict[str, Any] = { + "id": response_id, + "object": "response", + "status": status, + "created_at": created_at, + "model": model, + } + return env + + final_response_text = "" + agent_error: Optional[str] = None + usage: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + + try: + # response.created — initial envelope, status=in_progress + created_env = _envelope("in_progress") + created_env["output"] = [] + await _write_event("response.created", { + "type": "response.created", + "response": created_env, + }) + last_activity = time.monotonic() + + async def _open_message_item() -> None: + """Emit response.output_item.added for the assistant message + the first time any text delta arrives.""" + nonlocal message_opened, message_output_index, output_index + if message_opened: + return + message_opened = True + message_output_index = output_index + output_index += 1 + item = { + "id": message_item_id, + "type": "message", + "status": "in_progress", + "role": "assistant", + "content": [], + } + await _write_event("response.output_item.added", { + "type": "response.output_item.added", + "output_index": message_output_index, + "item": item, + }) + + async def _emit_text_delta(delta_text: str) -> None: + await _open_message_item() + final_text_parts.append(delta_text) + await _write_event("response.output_text.delta", { + "type": "response.output_text.delta", + "item_id": message_item_id, + "output_index": message_output_index, + "content_index": 0, + "delta": delta_text, + }) + + async def _emit_tool_started(payload: Dict[str, Any]) -> str: + """Emit response.output_item.added for a function_call. + + Returns the call_id so the matching completion event can + reference it. Prefer the real ``tool_call_id`` from the + agent when available; fall back to a generated call id for + safety in tests or older code paths. 
+ """ + nonlocal output_index, call_counter + call_counter += 1 + call_id = payload.get("tool_call_id") or f"call_{response_id[5:]}_{call_counter}" + args = payload.get("arguments", {}) + if isinstance(args, dict): + arguments_str = json.dumps(args) + else: + arguments_str = str(args) + item = { + "id": f"fc_{uuid.uuid4().hex[:24]}", + "type": "function_call", + "status": "in_progress", + "name": payload.get("name", ""), + "call_id": call_id, + "arguments": arguments_str, + } + idx = output_index + output_index += 1 + pending_tool_calls.append({ + "call_id": call_id, + "name": payload.get("name", ""), + "arguments": arguments_str, + "item_id": item["id"], + "output_index": idx, + }) + emitted_items.append({ + "type": "function_call", + "name": payload.get("name", ""), + "arguments": arguments_str, + "call_id": call_id, + }) + await _write_event("response.output_item.added", { + "type": "response.output_item.added", + "output_index": idx, + "item": item, + }) + return call_id + + async def _emit_tool_completed(payload: Dict[str, Any]) -> None: + """Emit response.output_item.done (function_call) followed + by response.output_item.added (function_call_output).""" + nonlocal output_index + call_id = payload.get("tool_call_id") + result = payload.get("result", "") + pending = None + if call_id: + for i, p in enumerate(pending_tool_calls): + if p["call_id"] == call_id: + pending = pending_tool_calls.pop(i) + break + if pending is None: + # Completion without a matching start — skip to avoid + # emitting orphaned done events. + return + + # function_call done + done_item = { + "id": pending["item_id"], + "type": "function_call", + "status": "completed", + "name": pending["name"], + "call_id": pending["call_id"], + "arguments": pending["arguments"], + } + await _write_event("response.output_item.done", { + "type": "response.output_item.done", + "output_index": pending["output_index"], + "item": done_item, + }) + + # function_call_output added (result) + result_str = result if isinstance(result, str) else json.dumps(result) + output_item = { + "id": f"fco_{uuid.uuid4().hex[:24]}", + "type": "function_call_output", + "call_id": pending["call_id"], + "output": result_str, + "status": "completed", + } + idx = output_index + output_index += 1 + emitted_items.append({ + "type": "function_call_output", + "call_id": pending["call_id"], + "output": result_str, + }) + await _write_event("response.output_item.added", { + "type": "response.output_item.added", + "output_index": idx, + "item": output_item, + }) + + # Main drain loop — thread-safe queue fed by agent callbacks. + async def _dispatch(it) -> None: + """Route a queue item to the correct SSE emitter. + + Plain strings are text deltas. Tagged tuples with + ``__tool_started__`` / ``__tool_completed__`` prefixes + are tool lifecycle events. + """ + if isinstance(it, tuple) and len(it) == 2 and isinstance(it[0], str): + tag, payload = it + if tag == "__tool_started__": + await _emit_tool_started(payload) + elif tag == "__tool_completed__": + await _emit_tool_completed(payload) + # Unknown tags are silently ignored (forward-compat). + elif isinstance(it, str): + await _emit_text_delta(it) + # Other types (non-string, non-tuple) are silently dropped. 
+ + loop = asyncio.get_event_loop() + while True: + try: + item = await loop.run_in_executor(None, lambda: stream_q.get(timeout=0.5)) + except _q.Empty: + if agent_task.done(): + # Drain remaining + while True: + try: + item = stream_q.get_nowait() + if item is None: + break + await _dispatch(item) + last_activity = time.monotonic() + except _q.Empty: + break + break + if time.monotonic() - last_activity >= CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS: + await response.write(b": keepalive\n\n") + last_activity = time.monotonic() + continue + + if item is None: # EOS sentinel + break + + await _dispatch(item) + last_activity = time.monotonic() + + # Pick up agent result + usage from the completed task + try: + result, agent_usage = await agent_task + usage = agent_usage or usage + # If the agent produced a final_response but no text + # deltas were streamed (e.g. some providers only emit + # the full response at the end), emit a single fallback + # delta so Responses clients still receive a live text part. + agent_final = result.get("final_response", "") if isinstance(result, dict) else "" + if agent_final and not final_text_parts: + await _emit_text_delta(agent_final) + if agent_final and not final_response_text: + final_response_text = agent_final + if isinstance(result, dict) and result.get("error") and not final_response_text: + agent_error = result["error"] + except Exception as e: # noqa: BLE001 + logger.error("Error running agent for streaming responses: %s", e, exc_info=True) + agent_error = str(e) + + # Close the message item if it was opened + final_response_text = "".join(final_text_parts) or final_response_text + if message_opened: + await _write_event("response.output_text.done", { + "type": "response.output_text.done", + "item_id": message_item_id, + "output_index": message_output_index, + "content_index": 0, + "text": final_response_text, + }) + msg_done_item = { + "id": message_item_id, + "type": "message", + "status": "completed", + "role": "assistant", + "content": [ + {"type": "output_text", "text": final_response_text} + ], + } + await _write_event("response.output_item.done", { + "type": "response.output_item.done", + "output_index": message_output_index, + "item": msg_done_item, + }) + + # Always append a final message item in the completed + # response envelope so clients that only parse the terminal + # payload still see the assistant text. This mirrors the + # shape produced by _extract_output_items in the batch path. 
+ final_items: List[Dict[str, Any]] = list(emitted_items) + final_items.append({ + "type": "message", + "role": "assistant", + "content": [ + {"type": "output_text", "text": final_response_text or (agent_error or "")} + ], + }) + + if agent_error: + failed_env = _envelope("failed") + failed_env["output"] = final_items + failed_env["error"] = {"message": agent_error, "type": "server_error"} + failed_env["usage"] = { + "input_tokens": usage.get("input_tokens", 0), + "output_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("total_tokens", 0), + } + await _write_event("response.failed", { + "type": "response.failed", + "response": failed_env, + }) + else: + completed_env = _envelope("completed") + completed_env["output"] = final_items + completed_env["usage"] = { + "input_tokens": usage.get("input_tokens", 0), + "output_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("total_tokens", 0), + } + await _write_event("response.completed", { + "type": "response.completed", + "response": completed_env, + }) + + # Persist for future chaining / GET retrieval, mirroring + # the batch path behavior. + if store: + full_history = list(conversation_history) + full_history.append({"role": "user", "content": user_message}) + if isinstance(result, dict) and result.get("messages"): + full_history.extend(result["messages"]) + else: + full_history.append({"role": "assistant", "content": final_response_text}) + self._response_store.put(response_id, { + "response": completed_env, + "conversation_history": full_history, + "instructions": instructions, + }) + if conversation: + self._response_store.set_conversation(conversation, response_id) + + except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, OSError): + # Client disconnected — interrupt the agent so it stops + # making upstream LLM calls, then cancel the task. + agent = agent_ref[0] if agent_ref else None + if agent is not None: + try: + agent.interrupt("SSE client disconnected") + except Exception: + pass + if not agent_task.done(): + agent_task.cancel() + try: + await agent_task + except (asyncio.CancelledError, Exception): + pass + logger.info("SSE client disconnected; interrupted agent task %s", response_id) + + return response + async def _handle_responses(self, request: "web.Request") -> "web.Response": """POST /v1/responses — OpenAI Responses API format.""" auth_err = self._check_auth(request) @@ -1060,6 +1468,80 @@ class APIServerAdapter(BasePlatformAdapter): # Run the agent (with Idempotency-Key support) session_id = str(uuid.uuid4()) + stream = bool(body.get("stream", False)) + if stream: + # Streaming branch — emit OpenAI Responses SSE events as the + # agent runs so frontends can render text deltas and tool + # calls in real time. See _write_sse_responses for details. + import queue as _q + _stream_q: _q.Queue = _q.Queue() + + def _on_delta(delta): + # None from the agent is a CLI box-close signal, not EOS. + # Forwarding would kill the SSE stream prematurely; the + # SSE writer detects completion via agent_task.done(). + if delta is not None: + _stream_q.put(delta) + + def _on_tool_progress(event_type, name, preview, args, **kwargs): + """Queue non-start tool progress events if needed in future. + + The structured Responses stream uses ``tool_start_callback`` + and ``tool_complete_callback`` for exact call-id correlation, + so progress events are currently ignored here. 
+ """ + return + + def _on_tool_start(tool_call_id, function_name, function_args): + """Queue a started tool for live function_call streaming.""" + _stream_q.put(("__tool_started__", { + "tool_call_id": tool_call_id, + "name": function_name, + "arguments": function_args or {}, + })) + + def _on_tool_complete(tool_call_id, function_name, function_args, function_result): + """Queue a completed tool result for live function_call_output streaming.""" + _stream_q.put(("__tool_completed__", { + "tool_call_id": tool_call_id, + "name": function_name, + "arguments": function_args or {}, + "result": function_result, + })) + + agent_ref = [None] + agent_task = asyncio.ensure_future(self._run_agent( + user_message=user_message, + conversation_history=conversation_history, + ephemeral_system_prompt=instructions, + session_id=session_id, + stream_delta_callback=_on_delta, + tool_progress_callback=_on_tool_progress, + tool_start_callback=_on_tool_start, + tool_complete_callback=_on_tool_complete, + agent_ref=agent_ref, + )) + + response_id = f"resp_{uuid.uuid4().hex[:28]}" + model_name = body.get("model", self._model_name) + created_at = int(time.time()) + + return await self._write_sse_responses( + request=request, + response_id=response_id, + model=model_name, + created_at=created_at, + stream_q=_stream_q, + agent_task=agent_task, + agent_ref=agent_ref, + conversation_history=conversation_history, + user_message=user_message, + instructions=instructions, + conversation=conversation, + store=store, + session_id=session_id, + ) + async def _compute_response(): return await self._run_agent( user_message=user_message, @@ -1486,6 +1968,8 @@ class APIServerAdapter(BasePlatformAdapter): session_id: Optional[str] = None, stream_delta_callback=None, tool_progress_callback=None, + tool_start_callback=None, + tool_complete_callback=None, agent_ref: Optional[list] = None, ) -> tuple: """ @@ -1507,6 +1991,8 @@ class APIServerAdapter(BasePlatformAdapter): session_id=session_id, stream_delta_callback=stream_delta_callback, tool_progress_callback=tool_progress_callback, + tool_start_callback=tool_start_callback, + tool_complete_callback=tool_complete_callback, ) if agent_ref is not None: agent_ref[0] = agent diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index be1fc63bf4..28df94e0fa 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -1115,6 +1115,131 @@ class TestResponsesEndpoint: assert resp.status == 400 +class TestResponsesStreaming: + @pytest.mark.asyncio + async def test_stream_true_returns_responses_sse(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + async def _mock_run_agent(**kwargs): + cb = kwargs.get("stream_delta_callback") + if cb: + cb("Hello") + cb(" world") + return ( + {"final_response": "Hello world", "messages": [], "api_calls": 1}, + {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + ) + + with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent): + resp = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": "hi", "stream": True}, + ) + assert resp.status == 200 + assert "text/event-stream" in resp.headers.get("Content-Type", "") + body = await resp.text() + assert "event: response.created" in body + assert "event: response.output_text.delta" in body + assert "event: response.output_text.done" in body + assert "event: response.completed" in body + assert "Hello" in body + assert " world" in body + + @pytest.mark.asyncio + async def 
test_stream_emits_function_call_and_output_items(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + async def _mock_run_agent(**kwargs): + start_cb = kwargs.get("tool_start_callback") + complete_cb = kwargs.get("tool_complete_callback") + text_cb = kwargs.get("stream_delta_callback") + if start_cb: + start_cb("call_123", "read_file", {"path": "/tmp/test.txt"}) + if complete_cb: + complete_cb("call_123", "read_file", {"path": "/tmp/test.txt"}, '{"content":"hello"}') + if text_cb: + text_cb("Done.") + return ( + { + "final_response": "Done.", + "messages": [ + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_123", + "function": { + "name": "read_file", + "arguments": '{"path":"/tmp/test.txt"}', + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_123", + "content": '{"content":"hello"}', + }, + ], + "api_calls": 1, + }, + {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + ) + + with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent): + resp = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": "read the file", "stream": True}, + ) + assert resp.status == 200 + body = await resp.text() + assert "event: response.output_item.added" in body + assert "event: response.output_item.done" in body + assert '"type": "function_call"' in body + assert '"type": "function_call_output"' in body + assert '"call_id": "call_123"' in body + assert '"name": "read_file"' in body + assert '"output": "{\\"content\\":\\"hello\\"}"' in body + + @pytest.mark.asyncio + async def test_streamed_response_is_stored_for_get(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + async def _mock_run_agent(**kwargs): + cb = kwargs.get("stream_delta_callback") + if cb: + cb("Stored response") + return ( + {"final_response": "Stored response", "messages": [], "api_calls": 1}, + {"input_tokens": 1, "output_tokens": 2, "total_tokens": 3}, + ) + + with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent): + resp = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": "store this", "stream": True}, + ) + body = await resp.text() + response_id = None + for line in body.splitlines(): + if line.startswith("data: "): + try: + payload = json.loads(line[len("data: "):]) + except json.JSONDecodeError: + continue + if payload.get("type") == "response.completed": + response_id = payload["response"]["id"] + break + assert response_id + + get_resp = await cli.get(f"/v1/responses/{response_id}") + assert get_resp.status == 200 + data = await get_resp.json() + assert data["id"] == response_id + assert data["status"] == "completed" + assert data["output"][-1]["content"][0]["text"] == "Stored response" + + # --------------------------------------------------------------------------- # Auth on endpoints # --------------------------------------------------------------------------- diff --git a/website/docs/user-guide/features/api-server.md b/website/docs/user-guide/features/api-server.md index efb254a006..52ed8e8938 100644 --- a/website/docs/user-guide/features/api-server.md +++ b/website/docs/user-guide/features/api-server.md @@ -83,9 +83,11 @@ Standard OpenAI Chat Completions format. Stateless — the full conversation is } ``` -**Streaming** (`"stream": true`): Returns Server-Sent Events (SSE) with token-by-token response chunks. When streaming is enabled in config, tokens are emitted live as the LLM generates them. 
When disabled, the full response is sent as a single SSE chunk. +**Streaming** (`"stream": true`): Returns Server-Sent Events (SSE) with token-by-token response chunks. For **Chat Completions**, the stream uses standard `chat.completion.chunk` events plus Hermes' custom `hermes.tool.progress` event for tool-start UX. For **Responses**, the stream uses OpenAI Responses event types such as `response.created`, `response.output_text.delta`, `response.output_item.added`, `response.output_item.done`, and `response.completed`. -**Tool progress in streams**: When the agent calls tools during a streaming request, brief progress indicators are injected into the content stream as the tools start executing (e.g. `` `💻 pwd` ``, `` `🔍 Python docs` ``). These appear as inline markdown before the agent's response text, giving frontends like Open WebUI real-time visibility into tool execution. +**Tool progress in streams**: +- **Chat Completions**: Hermes emits `event: hermes.tool.progress` for tool-start visibility without polluting persisted assistant text. +- **Responses**: Hermes emits spec-native `function_call` and `function_call_output` output items during the SSE stream, so clients can render structured tool UI in real time. ### POST /v1/responses diff --git a/website/docs/user-guide/messaging/open-webui.md b/website/docs/user-guide/messaging/open-webui.md index 71860d367f..b26d23eddf 100644 --- a/website/docs/user-guide/messaging/open-webui.md +++ b/website/docs/user-guide/messaging/open-webui.md @@ -134,10 +134,10 @@ To use the Responses API mode: 3. Change **API Type** from "Chat Completions" to **"Responses (Experimental)"** 4. Save -With the Responses API, Open WebUI sends requests in the Responses format (`input` array + `instructions`), and Hermes Agent can preserve full tool call history across turns via `previous_response_id`. +With the Responses API, Open WebUI sends requests in the Responses format (`input` array + `instructions`), and Hermes Agent can preserve full tool call history across turns via `previous_response_id`. When `stream: true`, Hermes also streams spec-native `function_call` and `function_call_output` items, which enables custom structured tool-call UI in clients that render Responses events. :::note -Open WebUI currently manages conversation history client-side even in Responses mode — it sends the full message history in each request rather than using `previous_response_id`. The Responses API mode is mainly useful for future compatibility as frontends evolve. +Open WebUI currently manages conversation history client-side even in Responses mode — it sends the full message history in each request rather than using `previous_response_id`. The main advantage of Responses mode today is the structured event stream: text deltas, `function_call`, and `function_call_output` items arrive as OpenAI Responses SSE events instead of Chat Completions chunks. ::: ## How It Works
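For reviewers who want to exercise the new streaming branch end to end, here is a minimal client sketch. The base URL, port, and `HERMES_API_KEY` bearer header are illustrative assumptions, not part of this diff; the event names and payload fields (`delta`, `item`, `call_id`, `output`, `usage`) are the ones `_write_sse_responses` emits.

```python
# Minimal sketch of a client for the streaming /v1/responses path.
# Assumptions: gateway on localhost:8000, bearer auth via HERMES_API_KEY.
import json
import os

import requests

url = "http://localhost:8000/v1/responses"
headers = {"Authorization": f"Bearer {os.environ.get('HERMES_API_KEY', '')}"}
payload = {"model": "hermes-agent", "input": "read the file", "stream": True}

with requests.post(url, json=payload, headers=headers, stream=True) as resp:
    resp.raise_for_status()
    event_type = None
    for raw in resp.iter_lines(decode_unicode=True):
        if not raw or raw.startswith(":"):
            # Blank separator between events, or a ": keepalive" comment.
            continue
        if raw.startswith("event: "):
            event_type = raw[len("event: "):]
            continue
        if not raw.startswith("data: "):
            continue
        data = json.loads(raw[len("data: "):])

        if event_type == "response.output_text.delta":
            print(data["delta"], end="", flush=True)
        elif event_type == "response.output_item.added":
            item = data["item"]
            if item["type"] == "function_call":
                print(f"\n[tool start] {item['name']} {item['arguments']}")
            elif item["type"] == "function_call_output":
                print(f"[tool result] {item['call_id']}: {item['output'][:80]}")
        elif event_type == "response.completed":
            usage = data["response"]["usage"]
            print(f"\n[done] total_tokens={usage['total_tokens']}")
```

Because the terminal `response.completed` event carries the same response object shape as the non-streaming path, a client that only parses that final payload still receives the full output items and usage.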