arrow/array/union.go (975 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package array import ( "bytes" "errors" "fmt" "math" "reflect" "strings" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/bitutil" "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/bitutils" "github.com/apache/arrow-go/v18/internal/json" ) // Union is a convenience interface to encompass both Sparse and Dense // union array types. type Union interface { arrow.Array // NumFields returns the number of child fields in this union. // Equivalent to len(UnionType().Fields()) NumFields() int // Validate returns an error if there are any issues with the lengths // or types of the children arrays mismatching with the Type of the // Union Array. nil is returned if there are no problems. Validate() error // ValidateFull runs the same checks that Validate() does, but additionally // checks that all childIDs are valid (>= 0 || ==InvalidID) and for // dense unions validates that all offsets are within the bounds of their // respective child. ValidateFull() error // TypeCodes returns the type id buffer for the union Array, equivalent to // Data().Buffers()[1]. 
Note: This will not account for any slice offset. TypeCodes() *memory.Buffer // RawTypeCodes returns a slice of UnionTypeCodes properly accounting for // any slice offset. RawTypeCodes() []arrow.UnionTypeCode // TypeCode returns the logical type code of the value at the requested index TypeCode(i int) arrow.UnionTypeCode // ChildID returns the index of the physical child containing the value // at the requested index. Equivalent to: // // arr.UnionType().ChildIDs()[arr.RawTypeCodes()[i+arr.Data().Offset()]] ChildID(i int) int // UnionType is a convenience function to retrieve the properly typed UnionType // instead of having to call DataType() and manually assert the type. UnionType() arrow.UnionType // Mode returns the union mode of the underlying Array, either arrow.SparseMode // or arrow.DenseMode. Mode() arrow.UnionMode // Field returns the requested child array for this union. Returns nil if a // nonexistent position is passed in. // // The appropriate child for an index can be retrieved with Field(ChildID(index)) Field(pos int) arrow.Array } const kMaxElems = math.MaxInt32 type union struct { array unionType arrow.UnionType typecodes []arrow.UnionTypeCode children []arrow.Array } func (a *union) Retain() { a.array.Retain() for _, c := range a.children { c.Retain() } } func (a *union) Release() { a.array.Release() for _, c := range a.children { c.Release() } } func (a *union) NumFields() int { return len(a.unionType.Fields()) } func (a *union) Mode() arrow.UnionMode { return a.unionType.Mode() } func (a *union) UnionType() arrow.UnionType { return a.unionType } func (a *union) TypeCodes() *memory.Buffer { return a.data.buffers[1] } func (a *union) RawTypeCodes() []arrow.UnionTypeCode { if a.data.length > 0 { return a.typecodes[a.data.offset:] } return []arrow.UnionTypeCode{} } func (a *union) TypeCode(i int) arrow.UnionTypeCode { return a.typecodes[i+a.data.offset] } func (a *union) ChildID(i int) int { return 
a.unionType.ChildIDs()[a.typecodes[i+a.data.offset]] } func (a *union) setData(data *Data) { a.unionType = data.dtype.(arrow.UnionType) debug.Assert(len(data.buffers) >= 2, "arrow/array: invalid number of union array buffers") if data.length > 0 { a.typecodes = arrow.Int8Traits.CastFromBytes(data.buffers[1].Bytes()) } else { a.typecodes = []int8{} } a.children = make([]arrow.Array, len(data.childData)) for i, child := range data.childData { if a.unionType.Mode() == arrow.SparseMode && (data.offset != 0 || child.Len() != data.length) { child = NewSliceData(child, int64(data.offset), int64(data.offset+data.length)) defer child.Release() } a.children[i] = MakeFromData(child) } a.array.setData(data) } func (a *union) Field(pos int) (result arrow.Array) { if pos < 0 || pos >= len(a.children) { return nil } return a.children[pos] } func (a *union) Validate() error { fields := a.unionType.Fields() for i, f := range fields { fieldData := a.data.childData[i] if a.unionType.Mode() == arrow.SparseMode && fieldData.Len() < a.data.length+a.data.offset { return fmt.Errorf("arrow/array: sparse union child array #%d has length smaller than expected for union array (%d < %d)", i, fieldData.Len(), a.data.length+a.data.offset) } if !arrow.TypeEqual(f.Type, fieldData.DataType()) { return fmt.Errorf("arrow/array: union child array #%d does not match type field %s vs %s", i, fieldData.DataType(), f.Type) } } return nil } func (a *union) ValidateFull() error { if err := a.Validate(); err != nil { return err } childIDs := a.unionType.ChildIDs() codesMap := a.unionType.TypeCodes() codes := a.RawTypeCodes() for i := 0; i < a.data.length; i++ { code := codes[i] if code < 0 || childIDs[code] == arrow.InvalidUnionChildID { return fmt.Errorf("arrow/array: union value at position %d has invalid type id %d", i, code) } } if a.unionType.Mode() == arrow.DenseMode { // validate offsets // map logical typeid to child length var childLengths [256]int64 for i := range a.unionType.Fields() { 
childLengths[codesMap[i]] = int64(a.data.childData[i].Len()) } // check offsets are in bounds var lastOffsets [256]int64 offsets := arrow.Int32Traits.CastFromBytes(a.data.buffers[2].Bytes())[a.data.offset:] for i := int64(0); i < int64(a.data.length); i++ { code := codes[i] offset := offsets[i] switch { case offset < 0: return fmt.Errorf("arrow/array: union value at position %d has negative offset %d", i, offset) case offset >= int32(childLengths[code]): return fmt.Errorf("arrow/array: union value at position %d has offset larger than child length (%d >= %d)", i, offset, childLengths[code]) case offset < int32(lastOffsets[code]): return fmt.Errorf("arrow/array: union value at position %d has non-monotonic offset %d", i, offset) } lastOffsets[code] = int64(offset) } } return nil } // SparseUnion represents an array where each logical value is taken from // a single child. A buffer of 8-bit type ids indicates which child a given // logical value is to be taken from. This is represented as the ChildID, // which is the index into the list of children. // // In a sparse union, each child array will have the same length as the // union array itself, regardless of how many values in the union actually // refer to it. // // Unlike most other arrays, unions do not have a top-level validity bitmap. type SparseUnion struct { union } // NewSparseUnion constructs a union array using the given type, length, list of // children and buffer of typeIDs with the given offset. func NewSparseUnion(dt *arrow.SparseUnionType, length int, children []arrow.Array, typeIDs *memory.Buffer, offset int) *SparseUnion { childData := make([]arrow.ArrayData, len(children)) for i, c := range children { childData[i] = c.Data() } data := NewData(dt, length, []*memory.Buffer{nil, typeIDs}, childData, 0, offset) defer data.Release() return NewSparseUnionData(data) } // NewSparseUnionData constructs a SparseUnion array from the given ArrayData object. 
func NewSparseUnionData(data arrow.ArrayData) *SparseUnion { a := &SparseUnion{} a.refCount.Add(1) a.setData(data.(*Data)) return a } // NewSparseUnionFromArrays constructs a new SparseUnion array with the provided // values. // // typeIDs *must* be an INT8 array with no nulls // len(codes) *must* be either 0 or equal to len(children). If len(codes) is 0, // the type codes used will be sequentially numeric starting at 0. func NewSparseUnionFromArrays(typeIDs arrow.Array, children []arrow.Array, codes ...arrow.UnionTypeCode) (*SparseUnion, error) { return NewSparseUnionFromArraysWithFieldCodes(typeIDs, children, []string{}, codes) } // NewSparseUnionFromArrayWithFields constructs a new SparseUnion array like // NewSparseUnionFromArrays, but allows specifying the field names. Type codes // will be auto-generated sequentially starting at 0. // // typeIDs *must* be an INT8 array with no nulls. // len(fields) *must* either be 0 or equal to len(children). If len(fields) is 0, // then the fields will be named sequentially starting at "0". func NewSparseUnionFromArraysWithFields(typeIDs arrow.Array, children []arrow.Array, fields []string) (*SparseUnion, error) { return NewSparseUnionFromArraysWithFieldCodes(typeIDs, children, fields, []arrow.UnionTypeCode{}) } // NewSparseUnionFromArraysWithFieldCodes combines the other constructors // for constructing a new SparseUnion array with the provided field names // and type codes, along with children and type ids. // // All the requirements mentioned in NewSparseUnionFromArrays and // NewSparseUnionFromArraysWithFields apply. 
func NewSparseUnionFromArraysWithFieldCodes(typeIDs arrow.Array, children []arrow.Array, fields []string, codes []arrow.UnionTypeCode) (*SparseUnion, error) { switch { case typeIDs.DataType().ID() != arrow.INT8: return nil, errors.New("arrow/array: union array type ids must be signed int8") case typeIDs.NullN() != 0: return nil, errors.New("arrow/array: union type ids may not have nulls") case len(fields) > 0 && len(fields) != len(children): return nil, errors.New("arrow/array: field names must have the same length as children") case len(codes) > 0 && len(codes) != len(children): return nil, errors.New("arrow/array: type codes must have same length as children") } buffers := []*memory.Buffer{nil, typeIDs.Data().Buffers()[1]} ty := arrow.SparseUnionFromArrays(children, fields, codes) childData := make([]arrow.ArrayData, len(children)) for i, c := range children { childData[i] = c.Data() if c.Len() != typeIDs.Len() { return nil, errors.New("arrow/array: sparse union array must have len(child) == len(typeids) for all children") } } data := NewData(ty, typeIDs.Len(), buffers, childData, 0, typeIDs.Data().Offset()) defer data.Release() return NewSparseUnionData(data), nil } func (a *SparseUnion) setData(data *Data) { a.union.setData(data) debug.Assert(a.data.dtype.ID() == arrow.SPARSE_UNION, "arrow/array: invalid data type for SparseUnion") debug.Assert(len(a.data.buffers) == 2, "arrow/array: sparse unions should have exactly 2 buffers") debug.Assert(a.data.buffers[0] == nil, "arrow/array: validity bitmap for sparse unions should be nil") } func (a *SparseUnion) GetOneForMarshal(i int) interface{} { typeID := a.RawTypeCodes()[i] childID := a.ChildID(i) data := a.Field(childID) if data.IsNull(i) { return nil } return []interface{}{typeID, data.GetOneForMarshal(i)} } func (a *SparseUnion) MarshalJSON() ([]byte, error) { var buf bytes.Buffer enc := json.NewEncoder(&buf) buf.WriteByte('[') for i := 0; i < a.Len(); i++ { if i != 0 { buf.WriteByte(',') } if err := 
enc.Encode(a.GetOneForMarshal(i)); err != nil { return nil, err } } buf.WriteByte(']') return buf.Bytes(), nil } func (a *SparseUnion) ValueStr(i int) string { if a.IsNull(i) { return NullValueStr } val := a.GetOneForMarshal(i) if val == nil { // child is nil return NullValueStr } data, err := json.Marshal(val) if err != nil { panic(err) } return string(data) } func (a *SparseUnion) String() string { var b strings.Builder b.WriteByte('[') fieldList := a.unionType.Fields() for i := 0; i < a.Len(); i++ { if i > 0 { b.WriteString(" ") } field := fieldList[a.ChildID(i)] f := a.Field(a.ChildID(i)) fmt.Fprintf(&b, "{%s=%v}", field.Name, f.GetOneForMarshal(i)) } b.WriteByte(']') return b.String() } // GetFlattenedField returns a child array, adjusting its validity bitmap // where the union array type codes don't match. // // ie: the returned array will have a null in every index that it is // not referenced by union. func (a *SparseUnion) GetFlattenedField(mem memory.Allocator, index int) (arrow.Array, error) { if index < 0 || index >= a.NumFields() { return nil, fmt.Errorf("arrow/array: index out of range: %d", index) } childData := a.data.childData[index] if a.data.offset != 0 || a.data.length != childData.Len() { childData = NewSliceData(childData, int64(a.data.offset), int64(a.data.offset+a.data.length)) // NewSliceData doesn't break the slice reference for buffers // since we're going to replace the null bitmap buffer we need to break the // slice reference so that we don't affect a.children's references newBufs := make([]*memory.Buffer, len(childData.Buffers())) copy(newBufs, childData.(*Data).buffers) childData.(*Data).buffers = newBufs } else { childData = childData.(*Data).Copy() } defer childData.Release() // synthesize a null bitmap based on the union discriminant // make sure the bitmap has extra bits corresponding to the child's offset flattenedNullBitmap := memory.NewResizableBuffer(mem) flattenedNullBitmap.Resize(childData.Len() + childData.Offset()) var ( 
childNullBitmap = childData.Buffers()[0] childOffset = childData.Offset() typeCode = a.unionType.TypeCodes()[index] codes = a.RawTypeCodes() offset int64 = 0 ) bitutils.GenerateBitsUnrolled(flattenedNullBitmap.Bytes(), int64(childOffset), int64(a.data.length), func() bool { b := codes[offset] == typeCode offset++ return b }) if childNullBitmap != nil { defer childNullBitmap.Release() bitutil.BitmapAnd(flattenedNullBitmap.Bytes(), childNullBitmap.Bytes(), int64(childOffset), int64(childOffset), flattenedNullBitmap.Bytes(), int64(childOffset), int64(childData.Len())) } childData.(*Data).buffers[0] = flattenedNullBitmap childData.(*Data).nulls = childData.Len() - bitutil.CountSetBits(flattenedNullBitmap.Bytes(), childOffset, childData.Len()) return MakeFromData(childData), nil } func arraySparseUnionEqual(l, r *SparseUnion) bool { childIDs := l.unionType.ChildIDs() leftCodes, rightCodes := l.RawTypeCodes(), r.RawTypeCodes() for i := 0; i < l.data.length; i++ { typeID := leftCodes[i] if typeID != rightCodes[i] { return false } childNum := childIDs[typeID] eq := SliceEqual(l.children[childNum], int64(i), int64(i+1), r.children[childNum], int64(i), int64(i+1)) if !eq { return false } } return true } func arraySparseUnionApproxEqual(l, r *SparseUnion, opt equalOption) bool { childIDs := l.unionType.ChildIDs() leftCodes, rightCodes := l.RawTypeCodes(), r.RawTypeCodes() for i := 0; i < l.data.length; i++ { typeID := leftCodes[i] if typeID != rightCodes[i] { return false } childNum := childIDs[typeID] eq := sliceApproxEqual(l.children[childNum], int64(i+l.data.offset), int64(i+l.data.offset+1), r.children[childNum], int64(i+r.data.offset), int64(i+r.data.offset+1), opt) if !eq { return false } } return true } // DenseUnion represents an array where each logical value is taken from // a single child, at a specific offset. 
A buffer of 8-bit type ids // indicates which child a given logical value is to be taken from and // a buffer of 32-bit offsets indicating which physical position in the // given child array has the logical value for that index. // // Unlike a sparse union, a dense union allows encoding only the child values // which are actually referred to by the union array. This is counterbalanced // by the additional footprint of the offsets buffer, and the additional // indirection cost when looking up values. // // Unlike most other arrays, unions do not have a top-level validity bitmap. type DenseUnion struct { union offsets []int32 } // NewDenseUnion constructs a union array using the given type, length, list of // children and buffers of typeIDs and offsets, with the given array offset. func NewDenseUnion(dt *arrow.DenseUnionType, length int, children []arrow.Array, typeIDs, valueOffsets *memory.Buffer, offset int) *DenseUnion { childData := make([]arrow.ArrayData, len(children)) for i, c := range children { childData[i] = c.Data() } data := NewData(dt, length, []*memory.Buffer{nil, typeIDs, valueOffsets}, childData, 0, offset) defer data.Release() return NewDenseUnionData(data) } // NewDenseUnionData constructs a DenseUnion array from the given ArrayData object. func NewDenseUnionData(data arrow.ArrayData) *DenseUnion { a := &DenseUnion{} a.refCount.Add(1) a.setData(data.(*Data)) return a } // NewDenseUnionFromArrays constructs a new DenseUnion array with the provided // values. // // typeIDs *must* be an INT8 array with no nulls // offsets *must* be an INT32 array with no nulls // len(codes) *must* be either 0 or equal to len(children). If len(codes) is 0, // the type codes used will be sequentially numeric starting at 0. 
func NewDenseUnionFromArrays(typeIDs, offsets arrow.Array, children []arrow.Array, codes ...arrow.UnionTypeCode) (*DenseUnion, error) {
	noFieldNames := make([]string, 0)
	return NewDenseUnionFromArraysWithFieldCodes(typeIDs, offsets, children, noFieldNames, codes)
}

// NewDenseUnionFromArraysWithFields builds a DenseUnion the same way as
// NewDenseUnionFromArrays, except that the caller chooses the field names
// while the type codes are generated sequentially starting at 0.
//
// typeIDs *must* be an INT8 array with no nulls.
// offsets *must* be an INT32 array with no nulls.
// len(fields) *must* be either 0 or equal to len(children). When no names
// are given, the fields are named sequentially starting at "0".
func NewDenseUnionFromArraysWithFields(typeIDs, offsets arrow.Array, children []arrow.Array, fields []string) (*DenseUnion, error) {
	noCodes := make([]arrow.UnionTypeCode, 0)
	return NewDenseUnionFromArraysWithFieldCodes(typeIDs, offsets, children, fields, noCodes)
}

// NewDenseUnionFromArraysWithFieldCodes combines the other constructors
// for constructing a new DenseUnion array with the provided field names
// and type codes, along with children and type ids.
//
// All the requirements mentioned in NewDenseUnionFromArrays and
// NewDenseUnionFromArraysWithFields apply.
func NewDenseUnionFromArraysWithFieldCodes(typeIDs, offsets arrow.Array, children []arrow.Array, fields []string, codes []arrow.UnionTypeCode) (*DenseUnion, error) { switch { case offsets.DataType().ID() != arrow.INT32: return nil, errors.New("arrow/array: union offsets must be signed int32") case typeIDs.DataType().ID() != arrow.INT8: return nil, errors.New("arrow/array: union type_ids must be signed int8") case typeIDs.NullN() != 0: return nil, errors.New("arrow/array: union typeIDs may not have nulls") case offsets.NullN() != 0: return nil, errors.New("arrow/array: nulls are not allowed in offsets for NewDenseUnionFromArrays*") case len(fields) > 0 && len(fields) != len(children): return nil, errors.New("arrow/array: fields must be the same length as children") case len(codes) > 0 && len(codes) != len(children): return nil, errors.New("arrow/array: typecodes must have the same length as children") } ty := arrow.DenseUnionFromArrays(children, fields, codes) buffers := []*memory.Buffer{nil, typeIDs.Data().Buffers()[1], offsets.Data().Buffers()[1]} childData := make([]arrow.ArrayData, len(children)) for i, c := range children { childData[i] = c.Data() } data := NewData(ty, typeIDs.Len(), buffers, childData, 0, typeIDs.Data().Offset()) defer data.Release() return NewDenseUnionData(data), nil } func (a *DenseUnion) ValueOffsets() *memory.Buffer { return a.data.buffers[2] } func (a *DenseUnion) ValueOffset(i int) int32 { return a.offsets[i+a.data.offset] } func (a *DenseUnion) RawValueOffsets() []int32 { return a.offsets[a.data.offset:] } func (a *DenseUnion) setData(data *Data) { a.union.setData(data) debug.Assert(a.data.dtype.ID() == arrow.DENSE_UNION, "arrow/array: invalid data type for DenseUnion") debug.Assert(len(a.data.buffers) == 3, "arrow/array: dense unions should have exactly 3 buffers") debug.Assert(a.data.buffers[0] == nil, "arrow/array: validity bitmap for dense unions should be nil") if data.length > 0 { a.offsets = 
arrow.Int32Traits.CastFromBytes(a.data.buffers[2].Bytes()) } else { a.offsets = []int32{} } } func (a *DenseUnion) GetOneForMarshal(i int) interface{} { typeID := a.RawTypeCodes()[i] childID := a.ChildID(i) data := a.Field(childID) offset := int(a.RawValueOffsets()[i]) if data.IsNull(offset) { return nil } return []interface{}{typeID, data.GetOneForMarshal(offset)} } func (a *DenseUnion) MarshalJSON() ([]byte, error) { var buf bytes.Buffer enc := json.NewEncoder(&buf) buf.WriteByte('[') for i := 0; i < a.Len(); i++ { if i != 0 { buf.WriteByte(',') } if err := enc.Encode(a.GetOneForMarshal(i)); err != nil { return nil, err } } buf.WriteByte(']') return buf.Bytes(), nil } func (a *DenseUnion) ValueStr(i int) string { if a.IsNull(i) { return NullValueStr } val := a.GetOneForMarshal(i) if val == nil { // child in nil return NullValueStr } data, err := json.Marshal(val) if err != nil { panic(err) } return string(data) } func (a *DenseUnion) String() string { var b strings.Builder b.WriteByte('[') offsets := a.RawValueOffsets() fieldList := a.unionType.Fields() for i := 0; i < a.Len(); i++ { if i > 0 { b.WriteString(" ") } field := fieldList[a.ChildID(i)] f := a.Field(a.ChildID(i)) fmt.Fprintf(&b, "{%s=%v}", field.Name, f.GetOneForMarshal(int(offsets[i]))) } b.WriteByte(']') return b.String() } func arrayDenseUnionEqual(l, r *DenseUnion) bool { childIDs := l.unionType.ChildIDs() leftCodes, rightCodes := l.RawTypeCodes(), r.RawTypeCodes() leftOffsets, rightOffsets := l.RawValueOffsets(), r.RawValueOffsets() for i := 0; i < l.data.length; i++ { typeID := leftCodes[i] if typeID != rightCodes[i] { return false } childNum := childIDs[typeID] eq := SliceEqual(l.children[childNum], int64(leftOffsets[i]), int64(leftOffsets[i]+1), r.children[childNum], int64(rightOffsets[i]), int64(rightOffsets[i]+1)) if !eq { return false } } return true } func arrayDenseUnionApproxEqual(l, r *DenseUnion, opt equalOption) bool { childIDs := l.unionType.ChildIDs() leftCodes, rightCodes := 
l.RawTypeCodes(), r.RawTypeCodes() leftOffsets, rightOffsets := l.RawValueOffsets(), r.RawValueOffsets() for i := 0; i < l.data.length; i++ { typeID := leftCodes[i] if typeID != rightCodes[i] { return false } childNum := childIDs[typeID] eq := sliceApproxEqual(l.children[childNum], int64(leftOffsets[i]), int64(leftOffsets[i]+1), r.children[childNum], int64(rightOffsets[i]), int64(rightOffsets[i]+1), opt) if !eq { return false } } return true } // UnionBuilder is a convenience interface for building Union arrays of // either Dense or Sparse mode. type UnionBuilder interface { Builder // AppendChild allows constructing the union type on the fly by making new // new array builder available to the union builder. The type code (index) // of the new child is returned, which should be passed to the Append method // when adding a new element to the union array. AppendChild(newChild Builder, fieldName string) (newCode arrow.UnionTypeCode) // Append adds an element to the UnionArray indicating which typecode the // new element should use. This *must* be followed up by an append to the // appropriate child builder. Append(arrow.UnionTypeCode) // Mode returns what kind of Union is being built, either arrow.SparseMode // or arrow.DenseMode Mode() arrow.UnionMode // Child returns the builder for the requested child index. // If an invalid index is requested (e.g. <0 or >len(children)) // then this will panic. 
Child(idx int) Builder } type unionBuilder struct { builder childFields []arrow.Field codes []arrow.UnionTypeCode mode arrow.UnionMode children []Builder typeIDtoBuilder []Builder typeIDtoChildID []int // for all typeID < denseTypeID, typeIDtoBuilder[typeID] != nil denseTypeID arrow.UnionTypeCode typesBuilder *int8BufferBuilder } func newUnionBuilder(mem memory.Allocator, children []Builder, typ arrow.UnionType) *unionBuilder { if children == nil { children = make([]Builder, 0) } b := unionBuilder{ builder: builder{mem: mem}, mode: typ.Mode(), codes: typ.TypeCodes(), children: children, typeIDtoChildID: make([]int, int(typ.MaxTypeCode())+1), // convert to int as int8(127) +1 panics typeIDtoBuilder: make([]Builder, int(typ.MaxTypeCode())+1), // convert to int as int8(127) +1 panics childFields: make([]arrow.Field, len(children)), typesBuilder: newInt8BufferBuilder(mem), } b.refCount.Add(1) b.typeIDtoChildID[0] = arrow.InvalidUnionChildID for i := 1; i < len(b.typeIDtoChildID); i *= 2 { copy(b.typeIDtoChildID[i:], b.typeIDtoChildID[:i]) } debug.Assert(len(children) == len(typ.TypeCodes()), "mismatched typecodes and children") debug.Assert(len(b.typeIDtoBuilder)-1 <= int(arrow.MaxUnionTypeCode), "too many typeids") copy(b.childFields, typ.Fields()) for i, c := range children { c.Retain() typeID := typ.TypeCodes()[i] b.typeIDtoChildID[typeID] = i b.typeIDtoBuilder[typeID] = c } return &b } func (b *unionBuilder) NumChildren() int { return len(b.children) } func (b *unionBuilder) Child(idx int) Builder { if idx < 0 || idx > len(b.children) { panic("arrow/array: invalid child index for union builder") } return b.children[idx] } // Len returns the current number of elements in the builder. 
func (b *unionBuilder) Len() int { return b.typesBuilder.Len() } func (b *unionBuilder) Mode() arrow.UnionMode { return b.mode } func (b *unionBuilder) reserve(elements int, resize func(int)) { // union has no null bitmap, ever so we can skip that handling if b.length+elements > b.capacity { b.capacity = bitutil.NextPowerOf2(b.length + elements) resize(b.capacity) } } func (b *unionBuilder) Release() { debug.Assert(b.refCount.Load() > 0, "too many releases") if b.refCount.Add(-1) == 0 { for _, c := range b.children { c.Release() } b.typesBuilder.Release() } } func (b *unionBuilder) Type() arrow.DataType { fields := make([]arrow.Field, len(b.childFields)) for i, f := range b.childFields { fields[i] = f fields[i].Type = b.children[i].Type() } switch b.mode { case arrow.SparseMode: return arrow.SparseUnionOf(fields, b.codes) case arrow.DenseMode: return arrow.DenseUnionOf(fields, b.codes) default: panic("invalid union builder mode") } } func (b *unionBuilder) AppendChild(newChild Builder, fieldName string) arrow.UnionTypeCode { newChild.Retain() b.children = append(b.children, newChild) newType := b.nextTypeID() b.typeIDtoChildID[newType] = len(b.children) - 1 b.typeIDtoBuilder[newType] = newChild b.childFields = append(b.childFields, arrow.Field{Name: fieldName, Nullable: true}) b.codes = append(b.codes, newType) return newType } func (b *unionBuilder) nextTypeID() arrow.UnionTypeCode { // find typeID such that typeIDtoBuilder[typeID] == nil // use that for the new child. 
Start searching at denseTypeID // since typeIDtoBuilder is densely packed up at least to denseTypeID for ; int(b.denseTypeID) < len(b.typeIDtoBuilder); b.denseTypeID++ { if b.typeIDtoBuilder[b.denseTypeID] == nil { id := b.denseTypeID b.denseTypeID++ return id } } debug.Assert(len(b.typeIDtoBuilder) < int(arrow.MaxUnionTypeCode), "too many children typeids") // typeIDtoBuilder is already densely packed, so just append the new child b.typeIDtoBuilder = append(b.typeIDtoBuilder, nil) b.typeIDtoChildID = append(b.typeIDtoChildID, arrow.InvalidUnionChildID) id := b.denseTypeID b.denseTypeID++ return id } func (b *unionBuilder) newData() *Data { length := b.typesBuilder.Len() typesBuffer := b.typesBuilder.Finish() defer typesBuffer.Release() childData := make([]arrow.ArrayData, len(b.children)) for i, b := range b.children { childData[i] = b.newData() defer childData[i].Release() } return NewData(b.Type(), length, []*memory.Buffer{nil, typesBuffer}, childData, 0, 0) } // SparseUnionBuilder is used to build a Sparse Union array using the Append // methods. You can also add new types to the union on the fly by using // AppendChild. // // Keep in mind: All children of a SparseUnion should be the same length // as the union itself. If you add new children with AppendChild, ensure // that they have the correct number of preceding elements that have been // added to the builder beforehand. type SparseUnionBuilder struct { *unionBuilder } // NewEmptySparseUnionBuilder is a helper to construct a SparseUnionBuilder // without having to predefine the union types. It creates a builder with no // children and AppendChild will have to be called before appending any // elements to this builder. 
func NewEmptySparseUnionBuilder(mem memory.Allocator) *SparseUnionBuilder {
	emptyType := arrow.SparseUnionOf([]arrow.Field{}, []arrow.UnionTypeCode{})
	base := newUnionBuilder(mem, nil, emptyType)
	return &SparseUnionBuilder{unionBuilder: base}
}

// NewSparseUnionBuilder creates a SparseUnionBuilder for typ. One child
// builder is created per field of typ, using that field's type.
func NewSparseUnionBuilder(mem memory.Allocator, typ *arrow.SparseUnionType) *SparseUnionBuilder {
	childBuilders := make([]Builder, typ.NumFields())
	for idx, field := range typ.Fields() {
		bldr := NewBuilder(mem, field.Type)
		childBuilders[idx] = bldr
		// release this function's reference once the union builder exists
		defer bldr.Release()
	}
	return NewSparseUnionBuilderWithBuilders(mem, typ, childBuilders)
}

// NewSparseUnionBuilderWithBuilders creates a SparseUnionBuilder for typ
// that uses the supplied child builders.
func NewSparseUnionBuilderWithBuilders(mem memory.Allocator, typ *arrow.SparseUnionType, children []Builder) *SparseUnionBuilder {
	base := newUnionBuilder(mem, children, typ)
	return &SparseUnionBuilder{unionBuilder: base}
}

// Reserve makes room for n more type ids, resizing through Resize if needed.
func (b *SparseUnionBuilder) Reserve(n int) {
	b.reserve(n, b.Resize)
}

// Resize resizes the type id builder to n.
func (b *SparseUnionBuilder) Resize(n int) {
	b.typesBuilder.resize(n)
}

// AppendNull records the first type code, appends a null to that first
// child and an empty value (implementation-defined) to all other children.
func (b *SparseUnionBuilder) AppendNull() {
	first := b.codes[0]
	b.typesBuilder.AppendValue(first)
	b.typeIDtoBuilder[first].AppendNull()

	others := b.codes[1:]
	for i := range others {
		b.typeIDtoBuilder[others[i]].AppendEmptyValue()
	}
}

// AppendNulls is identical to calling AppendNull() n times, except
// it will pre-allocate with reserve for all the nulls beforehand.
func (b *SparseUnionBuilder) AppendNulls(n int) { firstChildCode := b.codes[0] b.Reserve(n) for _, c := range b.codes { b.typeIDtoBuilder[c].Reserve(n) } for i := 0; i < n; i++ { b.typesBuilder.AppendValue(firstChildCode) b.typeIDtoBuilder[firstChildCode].AppendNull() for _, c := range b.codes[1:] { b.typeIDtoBuilder[c].AppendEmptyValue() } } } // AppendEmptyValue appends an empty value (implementation defined) // to each child, and appends the type of the first typecode to the typeid // buffer. func (b *SparseUnionBuilder) AppendEmptyValue() { b.typesBuilder.AppendValue(b.codes[0]) for _, c := range b.codes { b.typeIDtoBuilder[c].AppendEmptyValue() } } // AppendEmptyValues is identical to calling AppendEmptyValue() n times, // except it pre-allocates first so it is more efficient. func (b *SparseUnionBuilder) AppendEmptyValues(n int) { b.Reserve(n) firstChildCode := b.codes[0] for _, c := range b.codes { b.typeIDtoBuilder[c].Reserve(n) } for i := 0; i < n; i++ { b.typesBuilder.AppendValue(firstChildCode) for _, c := range b.codes { b.typeIDtoBuilder[c].AppendEmptyValue() } } } // Append appends an element to the UnionArray and must be followed up // by an append to the appropriate child builder. The parameter should // be the type id of the child to which the next value will be appended. // // After appending to the corresponding child builder, all other child // builders should have a null or empty value appended to them (although // this is not enforced and any value is theoretically allowed and will be // ignored). 
func (b *SparseUnionBuilder) Append(nextType arrow.UnionTypeCode) { b.typesBuilder.AppendValue(nextType) } func (b *SparseUnionBuilder) NewArray() arrow.Array { return b.NewSparseUnionArray() } func (b *SparseUnionBuilder) NewSparseUnionArray() (a *SparseUnion) { data := b.newData() a = NewSparseUnionData(data) data.Release() return } func (b *SparseUnionBuilder) UnmarshalJSON(data []byte) (err error) { dec := json.NewDecoder(bytes.NewReader(data)) t, err := dec.Token() if err != nil { return err } if delim, ok := t.(json.Delim); !ok || delim != '[' { return fmt.Errorf("sparse union builder must unpack from json array, found %s", t) } return b.Unmarshal(dec) } func (b *SparseUnionBuilder) Unmarshal(dec *json.Decoder) error { for dec.More() { if err := b.UnmarshalOne(dec); err != nil { return err } } return nil } func (b *SparseUnionBuilder) AppendValueFromString(s string) error { if s == NullValueStr { b.AppendNull() return nil } dec := json.NewDecoder(strings.NewReader(s)) return b.UnmarshalOne(dec) } func (b *SparseUnionBuilder) UnmarshalOne(dec *json.Decoder) error { t, err := dec.Token() if err != nil { return err } switch t { case json.Delim('['): // should be [type_id, Value] typeID, err := dec.Token() if err != nil { return err } var typeCode int8 switch tid := typeID.(type) { case json.Number: id, err := tid.Int64() if err != nil { return err } typeCode = int8(id) case float64: if tid != float64(int64(tid)) { return &json.UnmarshalTypeError{ Offset: dec.InputOffset(), Type: reflect.TypeOf(int8(0)), Struct: fmt.Sprint(b.Type()), Value: "float", } } typeCode = int8(tid) } childNum := b.typeIDtoChildID[typeCode] if childNum == arrow.InvalidUnionChildID { return &json.UnmarshalTypeError{ Offset: dec.InputOffset(), Value: "invalid type code", } } for i, c := range b.children { if i != childNum { c.AppendNull() } } b.Append(typeCode) if err := b.children[childNum].UnmarshalOne(dec); err != nil { return err } endArr, err := dec.Token() if err != nil { return err } 
if endArr != json.Delim(']') { return &json.UnmarshalTypeError{ Offset: dec.InputOffset(), Value: "union value array should have exactly 2 elements", } } case nil: b.AppendNull() default: return &json.UnmarshalTypeError{ Offset: dec.InputOffset(), Value: fmt.Sprint(t), Struct: fmt.Sprint(b.Type()), } } return nil } // DenseUnionBuilder is used to build a Dense Union array using the Append // methods. You can also add new types to the union on the fly by using // AppendChild. type DenseUnionBuilder struct { *unionBuilder offsetsBuilder *int32BufferBuilder } // NewEmptyDenseUnionBuilder is a helper to construct a DenseUnionBuilder // without having to predefine the union types. It creates a builder with no // children and AppendChild will have to be called before appending any // elements to this builder. func NewEmptyDenseUnionBuilder(mem memory.Allocator) *DenseUnionBuilder { return &DenseUnionBuilder{ unionBuilder: newUnionBuilder(mem, nil, arrow.DenseUnionOf([]arrow.Field{}, []arrow.UnionTypeCode{})), offsetsBuilder: newInt32BufferBuilder(mem), } } // NewDenseUnionBuilder constructs a new DenseUnionBuilder with the provided // children and type codes. Builders will be constructed for each child // using the fields in typ func NewDenseUnionBuilder(mem memory.Allocator, typ *arrow.DenseUnionType) *DenseUnionBuilder { children := make([]Builder, 0, typ.NumFields()) defer func() { for _, child := range children { child.Release() } }() for _, f := range typ.Fields() { children = append(children, NewBuilder(mem, f.Type)) } return NewDenseUnionBuilderWithBuilders(mem, typ, children) } // NewDenseUnionWithBuilders returns a new DenseUnionBuilder using the // provided type and builders. 
func NewDenseUnionBuilderWithBuilders(mem memory.Allocator, typ *arrow.DenseUnionType, children []Builder) *DenseUnionBuilder { return &DenseUnionBuilder{ unionBuilder: newUnionBuilder(mem, children, typ), offsetsBuilder: newInt32BufferBuilder(mem), } } func (b *DenseUnionBuilder) Reserve(n int) { b.reserve(n, b.Resize) } func (b *DenseUnionBuilder) Resize(n int) { b.typesBuilder.resize(n) b.offsetsBuilder.resize(n * arrow.Int32SizeBytes) } // AppendNull will only append a null value arbitrarily to the first child // and use that offset for this element of the array. func (b *DenseUnionBuilder) AppendNull() { firstChildCode := b.codes[0] childBuilder := b.typeIDtoBuilder[firstChildCode] b.typesBuilder.AppendValue(firstChildCode) b.offsetsBuilder.AppendValue(int32(childBuilder.Len())) childBuilder.AppendNull() } // AppendNulls will only append a single null arbitrarily to the first child // and use the same offset multiple times to point to it. The result is that // for a DenseUnion this is more efficient than calling AppendNull multiple // times in a loop func (b *DenseUnionBuilder) AppendNulls(n int) { // only append 1 null to the child builder, use the same offset twice firstChildCode := b.codes[0] childBuilder := b.typeIDtoBuilder[firstChildCode] b.Reserve(n) for i := 0; i < n; i++ { b.typesBuilder.AppendValue(firstChildCode) b.offsetsBuilder.AppendValue(int32(childBuilder.Len())) } // only append a single null to the child builder, the offsets all refer to the same value childBuilder.AppendNull() } // AppendEmptyValue only appends an empty value arbitrarily to the first child, // and then uses that offset to identify the value. 
func (b *DenseUnionBuilder) AppendEmptyValue() { firstChildCode := b.codes[0] childBuilder := b.typeIDtoBuilder[firstChildCode] b.typesBuilder.AppendValue(firstChildCode) b.offsetsBuilder.AppendValue(int32(childBuilder.Len())) childBuilder.AppendEmptyValue() } // AppendEmptyValues, like AppendNulls, will only append a single empty value // (implementation defined) to the first child arbitrarily, and then point // at that value using the offsets n times. That makes this more efficient // than calling AppendEmptyValue multiple times. func (b *DenseUnionBuilder) AppendEmptyValues(n int) { // only append 1 null to the child builder, use the same offset twice firstChildCode := b.codes[0] childBuilder := b.typeIDtoBuilder[firstChildCode] b.Reserve(n) for i := 0; i < n; i++ { b.typesBuilder.AppendValue(firstChildCode) b.offsetsBuilder.AppendValue(int32(childBuilder.Len())) } // only append a single empty value to the child builder, the offsets all // refer to the same value childBuilder.AppendEmptyValue() } // Append appends the necessary offset and type code to the builder // and must be followed up with an append to the appropriate child builder func (b *DenseUnionBuilder) Append(nextType arrow.UnionTypeCode) { b.typesBuilder.AppendValue(nextType) bldr := b.typeIDtoBuilder[nextType] if bldr.Len() == kMaxElems { panic("a dense UnionArray cannot contain more than 2^31 - 1 elements from a single child") } b.offsetsBuilder.AppendValue(int32(bldr.Len())) } func (b *DenseUnionBuilder) Release() { debug.Assert(b.refCount.Load() > 0, "too many releases") if b.refCount.Add(-1) == 0 { for _, c := range b.children { c.Release() } b.typesBuilder.Release() b.offsetsBuilder.Release() } } func (b *DenseUnionBuilder) newData() *Data { data := b.unionBuilder.newData() data.buffers = append(data.buffers, b.offsetsBuilder.Finish()) return data } func (b *DenseUnionBuilder) NewArray() arrow.Array { return b.NewDenseUnionArray() } func (b *DenseUnionBuilder) NewDenseUnionArray() (a 
*DenseUnion) { data := b.newData() a = NewDenseUnionData(data) data.Release() return } func (b *DenseUnionBuilder) UnmarshalJSON(data []byte) (err error) { dec := json.NewDecoder(bytes.NewReader(data)) t, err := dec.Token() if err != nil { return err } if delim, ok := t.(json.Delim); !ok || delim != '[' { return fmt.Errorf("dense union builder must unpack from json array, found %s", t) } return b.Unmarshal(dec) } func (b *DenseUnionBuilder) Unmarshal(dec *json.Decoder) error { for dec.More() { if err := b.UnmarshalOne(dec); err != nil { return err } } return nil } func (d *DenseUnionBuilder) AppendValueFromString(s string) error { if s == NullValueStr { d.AppendNull() return nil } dec := json.NewDecoder(strings.NewReader(s)) return d.UnmarshalOne(dec) } func (b *DenseUnionBuilder) UnmarshalOne(dec *json.Decoder) error { t, err := dec.Token() if err != nil { return err } switch t { case json.Delim('['): // should be [type_id, Value] typeID, err := dec.Token() if err != nil { return err } var typeCode int8 switch tid := typeID.(type) { case json.Number: id, err := tid.Int64() if err != nil { return err } typeCode = int8(id) case float64: if tid != float64(int64(tid)) { return &json.UnmarshalTypeError{ Offset: dec.InputOffset(), Type: reflect.TypeOf(int8(0)), Struct: fmt.Sprint(b.Type()), Value: "float", } } typeCode = int8(tid) } childNum := b.typeIDtoChildID[typeCode] if childNum == arrow.InvalidUnionChildID { return &json.UnmarshalTypeError{ Offset: dec.InputOffset(), Value: "invalid type code", } } b.Append(typeCode) if err := b.children[childNum].UnmarshalOne(dec); err != nil { return err } endArr, err := dec.Token() if err != nil { return err } if endArr != json.Delim(']') { return &json.UnmarshalTypeError{ Offset: dec.InputOffset(), Value: "union value array should have exactly 2 elements", } } case nil: b.AppendNull() default: return &json.UnmarshalTypeError{ Offset: dec.InputOffset(), Value: fmt.Sprint(t), Struct: fmt.Sprint(b.Type()), } } return nil } var ( 
_ arrow.Array = (*SparseUnion)(nil) _ arrow.Array = (*DenseUnion)(nil) _ Union = (*SparseUnion)(nil) _ Union = (*DenseUnion)(nil) _ Builder = (*SparseUnionBuilder)(nil) _ Builder = (*DenseUnionBuilder)(nil) _ UnionBuilder = (*SparseUnionBuilder)(nil) _ UnionBuilder = (*DenseUnionBuilder)(nil) )