arrow/avro/reader_types.go (787 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package avro
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math/big"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/decimal128"
"github.com/apache/arrow-go/v18/arrow/decimal256"
"github.com/apache/arrow-go/v18/arrow/extensions"
"github.com/apache/arrow-go/v18/arrow/memory"
)
type dataLoader struct {
idx, depth int32
list *fieldPos
item *fieldPos
mapField *fieldPos
mapKey *fieldPos
mapValue *fieldPos
fields []*fieldPos
children []*dataLoader
}
var (
ErrNullStructData = errors.New("null struct data")
)
func newDataLoader() *dataLoader { return &dataLoader{idx: 0, depth: 0} }
// drawTree takes the tree of field builders produced by mapFieldBuilders()
// and produces another tree structure and aggregates fields whose values can
// be retrieved from a `map[string]any` into a slice of builders, and creates a hierarchy to
// deal with nested types (lists and maps).
func (d *dataLoader) drawTree(field *fieldPos) {
for _, f := range field.children() {
if f.isList || f.isMap {
if f.isList {
c := d.newListChild(f)
if !f.childrens[0].isList {
c.item = f.childrens[0]
c.drawTree(f.childrens[0])
} else {
c.drawTree(f.childrens[0].childrens[0])
}
}
if f.isMap {
c := d.newMapChild(f)
if !arrow.IsNested(f.childrens[1].builder.Type().ID()) {
c.mapKey = f.childrens[0]
c.mapValue = f.childrens[1]
} else {
c.mapKey = f.childrens[0]
m := c.newChild()
m.mapValue = f.childrens[1]
m.drawTree(f.childrens[1])
}
}
} else {
d.fields = append(d.fields, f)
if len(f.children()) > 0 {
d.drawTree(f)
}
}
}
}
// loadDatum loads decoded Avro data to the schema fields' builder functions.
// Since array.StructBuilder.AppendNull() will recursively append null to all of the
// struct's fields, in the case of nil being passed to a struct's builderFunc it will
// return a ErrNullStructData error to signal that all its sub-fields can be skipped.
func (d *dataLoader) loadDatum(data any) error {
if d.list == nil && d.mapField == nil {
if d.mapValue != nil {
d.mapValue.appendFunc(data)
}
var NullParent *fieldPos
for _, f := range d.fields {
if f.parent == NullParent {
continue
}
if d.mapValue == nil {
err := f.appendFunc(f.getValue(data))
if err != nil {
if err == ErrNullStructData {
NullParent = f
continue
}
return err
}
} else {
switch dt := data.(type) {
case nil:
err := f.appendFunc(dt)
if err != nil {
if err == ErrNullStructData {
NullParent = f
continue
}
return err
}
case []any:
if len(d.children) < 1 {
for _, e := range dt {
err := f.appendFunc(e)
if err != nil {
if err == ErrNullStructData {
NullParent = f
continue
}
return err
}
}
} else {
for _, e := range dt {
d.children[0].loadDatum(e)
}
}
case map[string]any:
err := f.appendFunc(f.getValue(dt))
if err != nil {
if err == ErrNullStructData {
NullParent = f
continue
}
return err
}
}
}
}
for _, c := range d.children {
if c.list != nil {
c.loadDatum(c.list.getValue(data))
}
if c.mapField != nil {
switch dt := data.(type) {
case nil:
c.loadDatum(dt)
case map[string]any:
c.loadDatum(c.mapField.getValue(dt))
default:
c.loadDatum(c.mapField.getValue(data))
}
}
}
} else {
if d.list != nil {
switch dt := data.(type) {
case nil:
d.list.appendFunc(dt)
case []any:
d.list.appendFunc(dt)
for _, e := range dt {
if d.item != nil {
d.item.appendFunc(e)
}
var NullParent *fieldPos
for _, f := range d.fields {
if f.parent == NullParent {
continue
}
err := f.appendFunc(f.getValue(e))
if err != nil {
if err == ErrNullStructData {
NullParent = f
continue
}
return err
}
}
for _, c := range d.children {
if c.list != nil {
c.loadDatum(c.list.getValue(e))
}
if c.mapField != nil {
c.loadDatum(c.mapField.getValue(e))
}
}
}
case map[string]any:
d.list.appendFunc(dt["array"])
for _, e := range dt["array"].([]any) {
if d.item != nil {
d.item.appendFunc(e)
}
var NullParent *fieldPos
for _, f := range d.fields {
if f.parent == NullParent {
continue
}
err := f.appendFunc(f.getValue(e))
if err != nil {
if err == ErrNullStructData {
NullParent = f
continue
}
return err
}
}
for _, c := range d.children {
c.loadDatum(c.list.getValue(e))
}
}
default:
d.list.appendFunc(data)
d.item.appendFunc(dt)
}
}
if d.mapField != nil {
switch dt := data.(type) {
case nil:
d.mapField.appendFunc(dt)
case map[string]any:
d.mapField.appendFunc(dt)
for k, v := range dt {
d.mapKey.appendFunc(k)
if d.mapValue != nil {
d.mapValue.appendFunc(v)
} else {
d.children[0].loadDatum(v)
}
}
}
}
}
return nil
}
func (d *dataLoader) newChild() *dataLoader {
var child *dataLoader = &dataLoader{
depth: d.depth + 1,
}
d.children = append(d.children, child)
return child
}
func (d *dataLoader) newListChild(list *fieldPos) *dataLoader {
var child *dataLoader = &dataLoader{
list: list,
item: list.childrens[0],
depth: d.depth + 1,
}
d.children = append(d.children, child)
return child
}
func (d *dataLoader) newMapChild(mapField *fieldPos) *dataLoader {
var child *dataLoader = &dataLoader{
mapField: mapField,
depth: d.depth + 1,
}
d.children = append(d.children, child)
return child
}
type fieldPos struct {
parent *fieldPos
fieldName string
builder array.Builder
path []string
isList bool
isItem bool
isStruct bool
isMap bool
typeName string
appendFunc func(val interface{}) error
metadatas arrow.Metadata
childrens []*fieldPos
index, depth int32
}
func newFieldPos() *fieldPos { return &fieldPos{index: -1} }
func (f *fieldPos) children() []*fieldPos { return f.childrens }
func (f *fieldPos) newChild(childName string, childBuilder array.Builder, meta arrow.Metadata) *fieldPos {
var child fieldPos = fieldPos{
parent: f,
fieldName: childName,
builder: childBuilder,
metadatas: meta,
index: int32(len(f.childrens)),
depth: f.depth + 1,
}
if f.isList {
child.isItem = true
}
child.path = child.buildNamePath()
f.childrens = append(f.childrens, &child)
return &child
}
func (f *fieldPos) buildNamePath() []string {
var path []string
var listPath []string
cur := f
for i := f.depth - 1; i >= 0; i-- {
if cur.typeName == "" {
path = append([]string{cur.fieldName}, path...)
} else {
path = append([]string{cur.fieldName, cur.typeName}, path...)
}
if !cur.parent.isMap {
cur = cur.parent
}
}
if f.parent.parent != nil && f.parent.parent.isList {
for i := len(path) - 1; i >= 0; i-- {
if path[i] != "item" {
listPath = append([]string{path[i]}, listPath...)
} else {
return listPath
}
}
}
if f.parent != nil && f.parent.fieldName == "value" {
for i := len(path) - 1; i >= 0; i-- {
if path[i] != "value" {
listPath = append([]string{path[i]}, listPath...)
} else {
return listPath
}
}
}
return path
}
// NamePath returns a slice of keys making up the path to the field
func (f *fieldPos) namePath() []string { return f.path }
// GetValue retrieves the value from the map[string]any
// by following the field's key path
func (f *fieldPos) getValue(m any) any {
if _, ok := m.(map[string]any); !ok {
return m
}
for _, key := range f.namePath() {
valueMap, ok := m.(map[string]any)
if !ok {
if key == "item" {
return m
}
return nil
}
m, ok = valueMap[key]
if !ok {
return nil
}
}
return m
}
// Avro data is loaded to Arrow arrays using the following type mapping:
//
// Avro Go Arrow
// null nil Null
// boolean bool Boolean
// bytes []byte Binary
// float float32 Float32
// double float64 Float64
// long int64 Int64
// int int32 Int32
// string string String
// array []interface{} List
// enum string Dictionary
// fixed []byte FixedSizeBinary
// map and record map[string]any Struct
//
// mapFieldBuilders builds a tree of field builders matching the Arrow schema
func mapFieldBuilders(b array.Builder, field arrow.Field, parent *fieldPos) {
f := parent.newChild(field.Name, b, field.Metadata)
switch bt := b.(type) {
case *array.BinaryBuilder:
f.appendFunc = func(data interface{}) error {
appendBinaryData(bt, data)
return nil
}
case *array.BinaryDictionaryBuilder:
// has metadata for Avro enum symbols
f.appendFunc = func(data interface{}) error {
appendBinaryDictData(bt, data)
return nil
}
// add Avro enum symbols to builder
sb := array.NewStringBuilder(memory.DefaultAllocator)
for _, v := range field.Metadata.Values() {
sb.Append(v)
}
sa := sb.NewStringArray()
bt.InsertStringDictValues(sa)
case *array.BooleanBuilder:
f.appendFunc = func(data interface{}) error {
appendBoolData(bt, data)
return nil
}
case *array.Date32Builder:
f.appendFunc = func(data interface{}) error {
appendDate32Data(bt, data)
return nil
}
case *array.Decimal128Builder:
f.appendFunc = func(data interface{}) error {
err := appendDecimal128Data(bt, data)
if err != nil {
return err
}
return nil
}
case *array.Decimal256Builder:
f.appendFunc = func(data interface{}) error {
err := appendDecimal256Data(bt, data)
if err != nil {
return err
}
return nil
}
case *extensions.UUIDBuilder:
f.appendFunc = func(data interface{}) error {
switch dt := data.(type) {
case nil:
bt.AppendNull()
case string:
err := bt.AppendValueFromString(dt)
if err != nil {
return err
}
case []byte:
err := bt.AppendValueFromString(string(dt))
if err != nil {
return err
}
}
return nil
}
case *array.FixedSizeBinaryBuilder:
f.appendFunc = func(data interface{}) error {
appendFixedSizeBinaryData(bt, data)
return nil
}
case *array.Float32Builder:
f.appendFunc = func(data interface{}) error {
appendFloat32Data(bt, data)
return nil
}
case *array.Float64Builder:
f.appendFunc = func(data interface{}) error {
appendFloat64Data(bt, data)
return nil
}
case *array.Int32Builder:
f.appendFunc = func(data interface{}) error {
appendInt32Data(bt, data)
return nil
}
case *array.Int64Builder:
f.appendFunc = func(data interface{}) error {
appendInt64Data(bt, data)
return nil
}
case *array.LargeListBuilder:
vb := bt.ValueBuilder()
f.isList = true
mapFieldBuilders(vb, field.Type.(*arrow.LargeListType).ElemField(), f)
f.appendFunc = func(data interface{}) error {
switch dt := data.(type) {
case nil:
bt.AppendNull()
case []interface{}:
if len(dt) == 0 {
bt.AppendEmptyValue()
} else {
bt.Append(true)
}
default:
bt.Append(true)
}
return nil
}
case *array.ListBuilder:
vb := bt.ValueBuilder()
f.isList = true
mapFieldBuilders(vb, field.Type.(*arrow.ListType).ElemField(), f)
f.appendFunc = func(data interface{}) error {
switch dt := data.(type) {
case nil:
bt.AppendNull()
case []interface{}:
if len(dt) == 0 {
bt.AppendEmptyValue()
} else {
bt.Append(true)
}
default:
bt.Append(true)
}
return nil
}
case *array.MapBuilder:
// has metadata for objects in values
f.isMap = true
kb := bt.KeyBuilder()
ib := bt.ItemBuilder()
mapFieldBuilders(kb, field.Type.(*arrow.MapType).KeyField(), f)
mapFieldBuilders(ib, field.Type.(*arrow.MapType).ItemField(), f)
f.appendFunc = func(data interface{}) error {
switch data.(type) {
case nil:
bt.AppendNull()
default:
bt.Append(true)
}
return nil
}
case *array.MonthDayNanoIntervalBuilder:
f.appendFunc = func(data interface{}) error {
appendDurationData(bt, data)
return nil
}
case *array.StringBuilder:
f.appendFunc = func(data interface{}) error {
appendStringData(bt, data)
return nil
}
case *array.StructBuilder:
// has metadata for Avro Union named types
f.typeName, _ = field.Metadata.GetValue("typeName")
f.isStruct = true
// create children
for i, p := range field.Type.(*arrow.StructType).Fields() {
mapFieldBuilders(bt.FieldBuilder(i), p, f)
}
f.appendFunc = func(data interface{}) error {
switch data.(type) {
case nil:
bt.AppendNull()
return ErrNullStructData
default:
bt.Append(true)
}
return nil
}
case *array.Time32Builder:
f.appendFunc = func(data interface{}) error {
appendTime32Data(bt, data)
return nil
}
case *array.Time64Builder:
f.appendFunc = func(data interface{}) error {
appendTime64Data(bt, data)
return nil
}
case *array.TimestampBuilder:
f.appendFunc = func(data interface{}) error {
appendTimestampData(bt, data)
return nil
}
}
}
func appendBinaryData(b *array.BinaryBuilder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case map[string]any:
switch ct := dt["bytes"].(type) {
case nil:
b.AppendNull()
default:
b.Append(ct.([]byte))
}
default:
b.Append(fmt.Append([]byte{}, data))
}
}
func appendBinaryDictData(b *array.BinaryDictionaryBuilder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case string:
b.AppendString(dt)
case map[string]any:
switch v := dt["string"].(type) {
case nil:
b.AppendNull()
case string:
b.AppendString(v)
}
}
}
func appendBoolData(b *array.BooleanBuilder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case bool:
b.Append(dt)
case map[string]any:
switch v := dt["boolean"].(type) {
case nil:
b.AppendNull()
case bool:
b.Append(v)
}
}
}
func appendDate32Data(b *array.Date32Builder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case int32:
b.Append(arrow.Date32(dt))
case map[string]any:
switch v := dt["int"].(type) {
case nil:
b.AppendNull()
case int32:
b.Append(arrow.Date32(v))
}
}
}
func appendDecimal128Data(b *array.Decimal128Builder, data interface{}) error {
switch dt := data.(type) {
case nil:
b.AppendNull()
case []byte:
buf := bytes.NewBuffer(dt)
if len(dt) <= 38 {
var intData int64
err := binary.Read(buf, binary.BigEndian, &intData)
if err != nil {
return err
}
b.Append(decimal128.FromI64(intData))
} else {
var bigIntData big.Int
b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
}
case map[string]any:
buf := bytes.NewBuffer(dt["bytes"].([]byte))
if len(dt["bytes"].([]byte)) <= 38 {
var intData int64
err := binary.Read(buf, binary.BigEndian, &intData)
if err != nil {
return err
}
b.Append(decimal128.FromI64(intData))
} else {
var bigIntData big.Int
b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
}
}
return nil
}
func appendDecimal256Data(b *array.Decimal256Builder, data interface{}) error {
switch dt := data.(type) {
case nil:
b.AppendNull()
case []byte:
var bigIntData big.Int
buf := bytes.NewBuffer(dt)
b.Append(decimal256.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
case map[string]any:
var bigIntData big.Int
buf := bytes.NewBuffer(dt["bytes"].([]byte))
b.Append(decimal256.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
}
return nil
}
// Avro duration logical type annotates Avro fixed type of size 12, which stores three little-endian
// unsigned integers that represent durations at different granularities of time. The first stores
// a number in months, the second stores a number in days, and the third stores a number in milliseconds.
func appendDurationData(b *array.MonthDayNanoIntervalBuilder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case []byte:
dur := new(arrow.MonthDayNanoInterval)
dur.Months = int32(binary.LittleEndian.Uint16(dt[:3]))
dur.Days = int32(binary.LittleEndian.Uint16(dt[4:7]))
dur.Nanoseconds = int64(binary.LittleEndian.Uint32(dt[8:]) * 1000000)
b.Append(*dur)
case map[string]any:
switch dtb := dt["bytes"].(type) {
case nil:
b.AppendNull()
case []byte:
dur := new(arrow.MonthDayNanoInterval)
dur.Months = int32(binary.LittleEndian.Uint16(dtb[:3]))
dur.Days = int32(binary.LittleEndian.Uint16(dtb[4:7]))
dur.Nanoseconds = int64(binary.LittleEndian.Uint32(dtb[8:]) * 1000000)
b.Append(*dur)
}
}
}
func appendFixedSizeBinaryData(b *array.FixedSizeBinaryBuilder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case []byte:
b.Append(dt)
case map[string]any:
switch v := dt["bytes"].(type) {
case nil:
b.AppendNull()
case []byte:
b.Append(v)
}
}
}
func appendFloat32Data(b *array.Float32Builder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case float32:
b.Append(dt)
case map[string]any:
switch v := dt["float"].(type) {
case nil:
b.AppendNull()
case float32:
b.Append(v)
}
}
}
func appendFloat64Data(b *array.Float64Builder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case float64:
b.Append(dt)
case map[string]any:
switch v := dt["double"].(type) {
case nil:
b.AppendNull()
case float64:
b.Append(v)
}
}
}
func appendInt32Data(b *array.Int32Builder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case int:
b.Append(int32(dt))
case int32:
b.Append(dt)
case map[string]any:
switch v := dt["int"].(type) {
case nil:
b.AppendNull()
case int:
b.Append(int32(v))
case int32:
b.Append(v)
}
}
}
func appendInt64Data(b *array.Int64Builder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case int:
b.Append(int64(dt))
case int64:
b.Append(dt)
case map[string]any:
switch v := dt["long"].(type) {
case nil:
b.AppendNull()
case int:
b.Append(int64(v))
case int64:
b.Append(v)
}
}
}
func appendStringData(b *array.StringBuilder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case string:
b.Append(dt)
case map[string]any:
switch v := dt["string"].(type) {
case nil:
b.AppendNull()
case string:
b.Append(v)
}
default:
b.Append(fmt.Sprint(data))
}
}
func appendTime32Data(b *array.Time32Builder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case int32:
b.Append(arrow.Time32(dt))
case map[string]any:
switch v := dt["int"].(type) {
case nil:
b.AppendNull()
case int32:
b.Append(arrow.Time32(v))
}
}
}
func appendTime64Data(b *array.Time64Builder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case int64:
b.Append(arrow.Time64(dt))
case map[string]any:
switch v := dt["long"].(type) {
case nil:
b.AppendNull()
case int64:
b.Append(arrow.Time64(v))
}
}
}
func appendTimestampData(b *array.TimestampBuilder, data interface{}) {
switch dt := data.(type) {
case nil:
b.AppendNull()
case int64:
b.Append(arrow.Timestamp(dt))
case map[string]any:
switch v := dt["long"].(type) {
case nil:
b.AppendNull()
case int64:
b.Append(arrow.Timestamp(v))
}
}
}