From b3c798d1b6e6e86c566d33422d8806c8be63aaf1 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Wed, 11 Mar 2026 08:54:07 -0700 Subject: [PATCH] feat: add pseudo-streaming SSE support Cherry-picked from PR #828 (the conversation-parameter portion of that PR is not included in this cherry-pick). --- gateway/platforms/api_server.py | 68 ++++++++++++++++++++++++++++---- tests/gateway/test_api_server.py | 31 ++++++++++----- 2 files changed, 82 insertions(+), 17 deletions(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index b8e273c785..831d981dff 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -268,11 +268,6 @@ class APIServerAdapter(BasePlatformAdapter): ) stream = body.get("stream", False) - if stream: - return web.json_response( - {"error": {"message": "Streaming is not yet supported. Set stream=false.", "type": "invalid_request_error"}}, - status=501, - ) # Extract system message (becomes ephemeral system prompt layered ON TOP of core) system_prompt = None @@ -324,11 +319,22 @@ class APIServerAdapter(BasePlatformAdapter): final_response = result.get("error", "(No response generated)") completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}" + model_name = body.get("model", "hermes-agent") + created = int(time.time()) + + if stream: + # Pseudo-streaming: return the full response as SSE chunks. + # Not true token-by-token streaming, but compatible with clients + # (like Open WebUI) that expect SSE format. 
+ return await self._write_sse_chat_completion( + request, completion_id, model_name, created, final_response, usage + ) + response_data = { "id": completion_id, "object": "chat.completion", - "created": int(time.time()), - "model": body.get("model", "hermes-agent"), + "created": created, + "model": model_name, "choices": [ { "index": 0, @@ -348,6 +354,54 @@ class APIServerAdapter(BasePlatformAdapter): return web.json_response(response_data) + async def _write_sse_chat_completion( + self, request: "web.Request", completion_id: str, model: str, + created: int, content: str, usage: Dict[str, int], + ) -> "web.StreamResponse": + """Write a chat completion as SSE chunks (pseudo-streaming). + + Returns the full response as three SSE events (role, content, finish) + followed by [DONE]. Not true token-by-token streaming, but compatible + with clients like Open WebUI that require SSE format. + """ + response = web.StreamResponse( + status=200, + headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"}, + ) + await response.prepare(request) + + # Role chunk + role_chunk = { + "id": completion_id, "object": "chat.completion.chunk", + "created": created, "model": model, + "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}], + } + await response.write(f"data: {json.dumps(role_chunk)}\n\n".encode()) + + # Content chunk (full response in one chunk for now) + content_chunk = { + "id": completion_id, "object": "chat.completion.chunk", + "created": created, "model": model, + "choices": [{"index": 0, "delta": {"content": content}, "finish_reason": None}], + } + await response.write(f"data: {json.dumps(content_chunk)}\n\n".encode()) + + # Finish chunk + finish_chunk = { + "id": completion_id, "object": "chat.completion.chunk", + "created": created, "model": model, + "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], + "usage": { + "prompt_tokens": usage.get("input_tokens", 0), + "completion_tokens": 
usage.get("output_tokens", 0), + "total_tokens": usage.get("total_tokens", 0), + }, + } + await response.write(f"data: {json.dumps(finish_chunk)}\n\n".encode()) + await response.write(b"data: [DONE]\n\n") + + return response + async def _handle_responses(self, request: "web.Request") -> "web.Response": """POST /v1/responses — OpenAI Responses API format.""" auth_err = self._check_auth(request) diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 89cc8b82bc..bee3fe91d0 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -310,18 +310,29 @@ class TestChatCompletionsEndpoint: assert resp.status == 400 @pytest.mark.asyncio - async def test_stream_true_returns_501(self, adapter): + async def test_stream_true_returns_sse(self, adapter): + """stream=true returns SSE format with the full response.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: - resp = await cli.post( - "/v1/chat/completions", - json={ - "model": "test", - "messages": [{"role": "user", "content": "hi"}], - "stream": True, - }, - ) - assert resp.status == 501 + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = ( + {"final_response": "Hello!", "messages": [], "api_calls": 1}, + {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + ) + resp = await cli.post( + "/v1/chat/completions", + json={ + "model": "test", + "messages": [{"role": "user", "content": "hi"}], + "stream": True, + }, + ) + assert resp.status == 200 + assert "text/event-stream" in resp.headers.get("Content-Type", "") + body = await resp.text() + assert "data: " in body + assert "[DONE]" in body + assert "Hello!" in body @pytest.mark.asyncio async def test_no_user_message_returns_400(self, adapter):