diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 60b03a2c61..62196973d1 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -1040,6 +1040,10 @@ class APIServerAdapter(BasePlatformAdapter): # Monotonic counter for call_id generation if the agent doesn't # provide one (it doesn't, from tool_progress_callback). call_counter = 0 + # Canonical Responses SSE events include a monotonically increasing + # sequence_number. Add it server-side for every emitted event so + # clients that validate the OpenAI event schema can parse our stream. + sequence_number = 0 # Track the assistant message item id + content index for text # delta events — the spec ties deltas to a specific item. message_item_id = f"msg_{uuid.uuid4().hex[:24]}" @@ -1047,6 +1051,10 @@ class APIServerAdapter(BasePlatformAdapter): message_opened = False async def _write_event(event_type: str, data: Dict[str, Any]) -> None: + nonlocal sequence_number + if "sequence_number" not in data: + data["sequence_number"] = sequence_number + sequence_number += 1 payload = f"event: {event_type}\ndata: {json.dumps(data)}\n\n" await response.write(payload.encode()) @@ -1105,6 +1113,7 @@ class APIServerAdapter(BasePlatformAdapter): "output_index": message_output_index, "content_index": 0, "delta": delta_text, + "logprobs": [], }) async def _emit_tool_started(payload: Dict[str, Any]) -> str: @@ -1187,11 +1196,12 @@ class APIServerAdapter(BasePlatformAdapter): # function_call_output added (result) result_str = result if isinstance(result, str) else json.dumps(result) + output_parts = [{"type": "input_text", "text": result_str}] output_item = { "id": f"fco_{uuid.uuid4().hex[:24]}", "type": "function_call_output", "call_id": pending["call_id"], - "output": result_str, + "output": output_parts, "status": "completed", } idx = output_index @@ -1199,13 +1209,18 @@ class APIServerAdapter(BasePlatformAdapter): emitted_items.append({ "type": "function_call_output", "call_id": pending["call_id"], - "output": result_str, + "output": output_parts, }) await _write_event("response.output_item.added", { "type": "response.output_item.added", "output_index": idx, "item": output_item, }) + await _write_event("response.output_item.done", { + "type": "response.output_item.done", + "output_index": idx, + "item": output_item, + }) # Main drain loop — thread-safe queue fed by agent callbacks. async def _dispatch(it) -> None: @@ -1282,6 +1297,7 @@ class APIServerAdapter(BasePlatformAdapter): "output_index": message_output_index, "content_index": 0, "text": final_response_text, + "logprobs": [], }) msg_done_item = { "id": message_item_id, @@ -1933,10 +1949,15 @@ class APIServerAdapter(BasePlatformAdapter): "call_id": tc.get("id", ""), }) elif role == "tool": + output_content = msg.get("content", "") + if isinstance(output_content, list): + output = output_content + else: + output = [{"type": "input_text", "text": str(output_content)}] items.append({ "type": "function_call_output", "call_id": msg.get("tool_call_id", ""), - "output": msg.get("content", ""), + "output": output, }) # Final assistant message diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 28df94e0fa..32346fc83d 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -1142,6 +1142,8 @@ class TestResponsesStreaming: assert "event: response.output_text.delta" in body assert "event: response.output_text.done" in body assert "event: response.completed" in body + assert '"sequence_number":' in body + assert '"logprobs": []' in body assert "Hello" in body assert " world" in body @@ -1195,11 +1197,12 @@ class TestResponsesStreaming: body = await resp.text() assert "event: response.output_item.added" in body assert "event: response.output_item.done" in body + assert body.count("event: response.output_item.done") >= 2 assert '"type": "function_call"' in body assert '"type": "function_call_output"' in body assert '"call_id": "call_123"' in body assert '"name": "read_file"' in body - assert '"output": "{\\"content\\":\\"hello\\"}"' in body + assert '"output": [{"type": "input_text", "text": "{\\"content\\":\\"hello\\"}"}]' in body @pytest.mark.asyncio async def test_streamed_response_is_stored_for_get(self, adapter): @@ -1544,7 +1547,7 @@ class TestToolCallsInOutput: assert output[0]["call_id"] == "call_abc123" assert output[1]["type"] == "function_call_output" assert output[1]["call_id"] == "call_abc123" - assert output[1]["output"] == "42" + assert output[1]["output"] == [{"type": "input_text", "text": "42"}] assert output[2]["type"] == "message" assert output[2]["content"][0]["text"] == "The result is 42."