in spark/sql/sparksession.go [230:280]
// CreateDataFrame converts the given rows into an Arrow table that matches
// the provided schema and creates a DataFrame from it.
//
// Each row must contain exactly one value per schema field. A nil cell is
// appended as a null. A value whose dynamic Go type does not match the
// declared field type, or a field type with no conversion, is reported as
// an error rather than a panic.
func (s *sparkSessionImpl) CreateDataFrame(ctx context.Context, data [][]any, schema *types.StructType) (DataFrame, error) {
	pool := memory.NewGoAllocator()
	// Convert the data into an Arrow Table
	arrowSchema := arrow.NewSchema(schema.ToArrowType().Fields(), nil)
	rb := array.NewRecordBuilder(pool, arrowSchema)
	defer rb.Release()
	// Iterate over all fields and add the values:
	for rowIdx, row := range data {
		// Guard against short/long rows so row[i] below cannot panic.
		if len(row) != len(schema.Fields) {
			return nil, fmt.Errorf("row %d has %d values, but the schema has %d fields",
				rowIdx, len(row), len(schema.Fields))
		}
		for i, field := range schema.Fields {
			if row[i] == nil {
				rb.Field(i).AppendNull()
				continue
			}
			if err := appendRowValue(rb.Field(i), field.DataType, row[i]); err != nil {
				return nil, err
			}
		}
	}
	rec := rb.NewRecord()
	defer rec.Release()
	tbl := array.NewTableFromRecords(arrowSchema, []arrow.Record{rec})
	defer tbl.Release()
	return s.CreateDataFrameFromArrow(ctx, tbl)
}

// appendRowValue appends a single non-nil cell value to the given column
// builder according to the declared Spark field data type. A mismatched
// dynamic Go type is returned as an error instead of panicking on a failed
// type assertion.
func appendRowValue(fb array.Builder, dt types.DataType, value any) error {
	// typeErr describes a value whose Go type does not match the field type.
	typeErr := func(expected string) error {
		return fmt.Errorf("expected %s value for %s field, got %T", expected, dt, value)
	}
	switch dt {
	case types.BOOLEAN:
		v, ok := value.(bool)
		if !ok {
			return typeErr("bool")
		}
		fb.(*array.BooleanBuilder).Append(v)
	case types.BYTE:
		v, ok := value.(int)
		if !ok {
			return typeErr("int")
		}
		// NOTE(review): values outside the int8 range are silently truncated,
		// matching the previous conversion behavior — confirm this is intended.
		fb.(*array.Int8Builder).Append(int8(v))
	case types.SHORT:
		v, ok := value.(int)
		if !ok {
			return typeErr("int")
		}
		fb.(*array.Int16Builder).Append(int16(v))
	case types.INTEGER:
		v, ok := value.(int)
		if !ok {
			return typeErr("int")
		}
		fb.(*array.Int32Builder).Append(int32(v))
	case types.LONG:
		v, ok := value.(int)
		if !ok {
			return typeErr("int")
		}
		fb.(*array.Int64Builder).Append(int64(v))
	case types.FLOAT:
		v, ok := value.(float32)
		if !ok {
			return typeErr("float32")
		}
		fb.(*array.Float32Builder).Append(v)
	case types.DOUBLE:
		v, ok := value.(float64)
		if !ok {
			return typeErr("float64")
		}
		fb.(*array.Float64Builder).Append(v)
	case types.STRING:
		v, ok := value.(string)
		if !ok {
			return typeErr("string")
		}
		fb.(*array.StringBuilder).Append(v)
	case types.DATE:
		v, ok := value.(time.Time)
		if !ok {
			return typeErr("time.Time")
		}
		fb.(*array.Date32Builder).Append(arrow.Date32FromTime(v))
	case types.TIMESTAMP:
		v, ok := value.(time.Time)
		if !ok {
			return typeErr("time.Time")
		}
		ts, err := arrow.TimestampFromTime(v, arrow.Millisecond)
		if err != nil {
			return err
		}
		fb.(*array.TimestampBuilder).Append(ts)
	default:
		return sparkerrors.WithType(fmt.Errorf(
			"unsupported data type: %s", dt), sparkerrors.NotImplementedError)
	}
	return nil
}