in spark/sql/types/arrow.go [71:288]
// readArrayData converts the Arrow array described by data, whose type ID is
// t, into a slice of Go values holding one entry per array element.
//
// Null elements are returned as nil. Scalar types are returned as whatever the
// corresponding typed Arrow array's Value method yields. Nested types are
// decoded recursively:
//
//   - LIST elements become []any slices,
//   - MAP elements become map[any]any,
//   - STRUCT elements become map[string]any keyed by field name.
//
// An error is returned for any type ID that is not handled below, and for map
// keys whose decoded Go value cannot be used as a Go map key.
func readArrayData(t arrow.Type, data arrow.ArrayData) ([]any, error) {
	buf := make([]any, 0, data.Len())
	switch t {
	case arrow.BOOL:
		buf = appendArrowValues(buf, array.NewBooleanData(data))
	case arrow.INT8:
		buf = appendArrowValues(buf, array.NewInt8Data(data))
	case arrow.INT16:
		buf = appendArrowValues(buf, array.NewInt16Data(data))
	case arrow.INT32:
		buf = appendArrowValues(buf, array.NewInt32Data(data))
	case arrow.INT64:
		buf = appendArrowValues(buf, array.NewInt64Data(data))
	case arrow.FLOAT16:
		buf = appendArrowValues(buf, array.NewFloat16Data(data))
	case arrow.FLOAT32:
		buf = appendArrowValues(buf, array.NewFloat32Data(data))
	case arrow.FLOAT64:
		buf = appendArrowValues(buf, array.NewFloat64Data(data))
	case arrow.DECIMAL128:
		// arrow.DECIMAL is an alias of arrow.DECIMAL128 in the Arrow Go module,
		// so this single label covers both names; listing both would be a
		// duplicate case.
		buf = appendArrowValues(buf, array.NewDecimal128Data(data))
	case arrow.DECIMAL256:
		buf = appendArrowValues(buf, array.NewDecimal256Data(data))
	case arrow.STRING:
		buf = appendArrowValues(buf, array.NewStringData(data))
	case arrow.BINARY:
		buf = appendArrowValues(buf, array.NewBinaryData(data))
	case arrow.TIMESTAMP:
		buf = appendArrowValues(buf, array.NewTimestampData(data))
	case arrow.DATE64:
		buf = appendArrowValues(buf, array.NewDate64Data(data))
	case arrow.DATE32:
		buf = appendArrowValues(buf, array.NewDate32Data(data))
	case arrow.LIST:
		list := array.NewListData(data)
		child := list.ListValues()
		// Decode the flattened child array once; each list element is a
		// window into it.
		elems, err := readArrayData(child.DataType().ID(), child.Data())
		if err != nil {
			return nil, err
		}
		for i := 0; i < list.Len(); i++ {
			if list.IsNull(i) {
				buf = append(buf, nil)
				continue
			}
			// ValueOffsets accounts for the array's own offset, which indexing
			// Offsets() by i directly does not.
			start, end := list.ValueOffsets(i)
			// TODO: the element is a []any rather than a slice of the concrete
			// element type.
			// The capacity is capped so that appending to one element cannot
			// overwrite the values of the element that follows it.
			buf = append(buf, elems[start:end:end])
		}
	case arrow.MAP:
		// Maps are stored like lists of key/value pairs, so they are decoded
		// like lists but with two flattened child arrays.
		m := array.NewMapData(data)
		keyArr, itemArr := m.Keys(), m.Items()
		keys, err := readArrayData(keyArr.DataType().ID(), keyArr.Data())
		if err != nil {
			return nil, err
		}
		items, err := readArrayData(itemArr.DataType().ID(), itemArr.Data())
		if err != nil {
			return nil, err
		}
		for i := 0; i < m.Len(); i++ {
			if m.IsNull(i) {
				buf = append(buf, nil)
				continue
			}
			start, end := m.ValueOffsets(i)
			entries := make(map[any]any, int(end-start))
			for j := start; j < end; j++ {
				// These are the decoded shapes that Go cannot hash; inserting
				// them as map keys would panic.
				switch keys[j].(type) {
				case []byte, []any, map[any]any, map[string]any:
					return nil, fmt.Errorf("unsupported arrow map key type %T", keys[j])
				}
				entries[keys[j]] = items[j]
			}
			buf = append(buf, entries)
		}
	case arrow.STRUCT:
		st := array.NewStructData(data)
		schema := st.DataType().(*arrow.StructType)
		// Field columns are decoded at most once, and only when the first
		// non-null row needs them, so an array without non-null rows never
		// touches its fields.
		var columns [][]any
		for i := 0; i < st.Len(); i++ {
			if st.IsNull(i) {
				buf = append(buf, nil)
				continue
			}
			if columns == nil {
				columns = make([][]any, st.NumField())
				for j := range columns {
					field := st.Field(j)
					col, err := readArrayData(field.DataType().ID(), field.Data())
					if err != nil {
						return nil, err
					}
					columns[j] = col
				}
			}
			row := make(map[string]any, len(columns))
			for j, col := range columns {
				row[schema.Field(j).Name] = col[i]
			}
			buf = append(buf, row)
		}
	default:
		return nil, fmt.Errorf("unsupported arrow data type %s", t.String())
	}
	return buf, nil
}

// arrowValueArray is the set of methods readArrayData needs from a typed Arrow
// array whose elements are read one at a time through Value.
type arrowValueArray[T any] interface {
	Len() int
	IsNull(i int) bool
	Value(i int) T
}

// appendArrowValues appends every element of arr to dst, storing nil for null
// elements, and returns the extended slice.
func appendArrowValues[T any](dst []any, arr arrowValueArray[T]) []any {
	n := arr.Len()
	for i := 0; i < n; i++ {
		if arr.IsNull(i) {
			dst = append(dst, nil)
			continue
		}
		dst = append(dst, arr.Value(i))
	}
	return dst
}