arrow/array/binarybuilder.go (543 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package array import ( "bytes" "encoding/base64" "fmt" "math" "reflect" "unsafe" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/json" ) // A BinaryBuilder is used to build a Binary array using the Append methods. type BinaryBuilder struct { builder dtype arrow.BinaryDataType offsets bufBuilder values *byteBufferBuilder appendOffsetVal func(int) getOffsetVal func(int) int maxCapacity uint64 offsetByteWidth int } // NewBinaryBuilder can be used for any of the variable length binary types, // Binary, LargeBinary, String, LargeString by passing the appropriate data type func NewBinaryBuilder(mem memory.Allocator, dtype arrow.BinaryDataType) *BinaryBuilder { var ( offsets bufBuilder offsetValFn func(int) maxCapacity uint64 offsetByteWidth int getOffsetVal func(int) int ) switch dtype.Layout().Buffers[1].ByteWidth { case 4: b := newInt32BufferBuilder(mem) offsetValFn = func(v int) { b.AppendValue(int32(v)) } getOffsetVal = func(i int) int { return int(b.Value(i)) } offsets = b maxCapacity = math.MaxInt32 offsetByteWidth = arrow.Int32SizeBytes case 8: b := newInt64BufferBuilder(mem) offsetValFn = func(v int) { b.AppendValue(int64(v)) } getOffsetVal = func(i int) int { return int(b.Value(i)) } offsets = b maxCapacity = math.MaxInt64 offsetByteWidth = arrow.Int64SizeBytes } bb := &BinaryBuilder{ builder: builder{mem: mem}, dtype: dtype, offsets: offsets, values: newByteBufferBuilder(mem), appendOffsetVal: offsetValFn, maxCapacity: maxCapacity, offsetByteWidth: offsetByteWidth, getOffsetVal: getOffsetVal, } bb.builder.refCount.Add(1) return bb } func (b *BinaryBuilder) Type() arrow.DataType { return b.dtype } // Release decreases the reference count by 1. // When the reference count goes to zero, the memory is freed. // Release may be called simultaneously from multiple goroutines. func (b *BinaryBuilder) Release() { debug.Assert(b.refCount.Load() > 0, "too many releases") if b.refCount.Add(-1) == 0 { if b.nullBitmap != nil { b.nullBitmap.Release() b.nullBitmap = nil } if b.offsets != nil { b.offsets.Release() b.offsets = nil } if b.values != nil { b.values.Release() b.values = nil } } } func (b *BinaryBuilder) Append(v []byte) { b.Reserve(1) b.appendNextOffset() b.values.Append(v) b.UnsafeAppendBoolToBitmap(true) } func (b *BinaryBuilder) AppendString(v string) { b.Append([]byte(v)) } func (b *BinaryBuilder) AppendNull() { b.Reserve(1) b.appendNextOffset() b.UnsafeAppendBoolToBitmap(false) } func (b *BinaryBuilder) AppendNulls(n int) { for i := 0; i < n; i++ { b.AppendNull() } } func (b *BinaryBuilder) AppendEmptyValue() { b.Reserve(1) b.appendNextOffset() b.UnsafeAppendBoolToBitmap(true) } func (b *BinaryBuilder) AppendEmptyValues(n int) { for i := 0; i < n; i++ { b.AppendEmptyValue() } } // AppendValues will append the values in the v slice. The valid slice determines which values // in v are valid (not null). The valid slice must either be empty or be equal in length to v. If empty, // all values in v are appended and considered valid. func (b *BinaryBuilder) AppendValues(v [][]byte, valid []bool) { if len(v) != len(valid) && len(valid) != 0 { panic("len(v) != len(valid) && len(valid) != 0") } if len(v) == 0 { return } b.Reserve(len(v)) for _, vv := range v { b.appendNextOffset() b.values.Append(vv) } b.builder.unsafeAppendBoolsToBitmap(valid, len(v)) } // AppendStringValues will append the values in the v slice. The valid slice determines which values // in v are valid (not null). The valid slice must either be empty or be equal in length to v. If empty, // all values in v are appended and considered valid. func (b *BinaryBuilder) AppendStringValues(v []string, valid []bool) { if len(v) != len(valid) && len(valid) != 0 { panic("len(v) != len(valid) && len(valid) != 0") } if len(v) == 0 { return } b.Reserve(len(v)) for _, vv := range v { b.appendNextOffset() b.values.Append([]byte(vv)) } b.builder.unsafeAppendBoolsToBitmap(valid, len(v)) } func (b *BinaryBuilder) UnsafeAppend(v []byte) { b.appendNextOffset() b.values.unsafeAppend(v) b.UnsafeAppendBoolToBitmap(true) } func (b *BinaryBuilder) Value(i int) []byte { start := b.getOffsetVal(i) var end int if i == (b.length - 1) { end = b.values.Len() } else { end = b.getOffsetVal(i + 1) } return b.values.Bytes()[start:end] } func (b *BinaryBuilder) init(capacity int) { b.builder.init(capacity) b.offsets.resize((capacity + 1) * b.offsetByteWidth) } // DataLen returns the number of bytes in the data array. func (b *BinaryBuilder) DataLen() int { return b.values.length } // DataCap returns the total number of bytes that can be stored // without allocating additional memory. func (b *BinaryBuilder) DataCap() int { return b.values.capacity } // Reserve ensures there is enough space for appending n elements // by checking the capacity and calling Resize if necessary. func (b *BinaryBuilder) Reserve(n int) { b.builder.reserve(n, b.Resize) } // ReserveData ensures there is enough space for appending n bytes // by checking the capacity and resizing the data buffer if necessary. func (b *BinaryBuilder) ReserveData(n int) { if b.values.capacity < b.values.length+n { b.values.resize(b.values.Len() + n) } } // Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(), // additional memory will be allocated. If n is smaller, the allocated memory may be reduced. func (b *BinaryBuilder) Resize(n int) { b.offsets.resize((n + 1) * b.offsetByteWidth) if (n * b.offsetByteWidth) < b.offsets.Len() { b.offsets.SetLength(n * b.offsetByteWidth) } b.builder.resize(n, b.init) } func (b *BinaryBuilder) ResizeData(n int) { b.values.length = n } // NewArray creates a Binary array from the memory buffers used by the builder and resets the BinaryBuilder // so it can be used to build a new array. // // Builds the appropriate Binary or LargeBinary array based on the datatype // it was initialized with. func (b *BinaryBuilder) NewArray() arrow.Array { if b.offsetByteWidth == arrow.Int32SizeBytes { return b.NewBinaryArray() } return b.NewLargeBinaryArray() } // NewBinaryArray creates a Binary array from the memory buffers used by the builder and resets the BinaryBuilder // so it can be used to build a new array. func (b *BinaryBuilder) NewBinaryArray() (a *Binary) { if b.offsetByteWidth != arrow.Int32SizeBytes { panic("arrow/array: invalid call to NewBinaryArray when building a LargeBinary array") } data := b.newData() a = NewBinaryData(data) data.Release() return } func (b *BinaryBuilder) NewLargeBinaryArray() (a *LargeBinary) { if b.offsetByteWidth != arrow.Int64SizeBytes { panic("arrow/array: invalid call to NewLargeBinaryArray when building a Binary array") } data := b.newData() a = NewLargeBinaryData(data) data.Release() return } func (b *BinaryBuilder) newData() (data *Data) { b.appendNextOffset() offsets, values := b.offsets.Finish(), b.values.Finish() data = NewData(b.dtype, b.length, []*memory.Buffer{b.nullBitmap, offsets, values}, nil, b.nulls, 0) if offsets != nil { offsets.Release() } if values != nil { values.Release() } b.builder.reset() return } func (b *BinaryBuilder) appendNextOffset() { numBytes := b.values.Len() debug.Assert(uint64(numBytes) <= b.maxCapacity, "exceeded maximum capacity of binary array") b.appendOffsetVal(numBytes) } func (b *BinaryBuilder) AppendValueFromString(s string) error { if s == NullValueStr { b.AppendNull() return nil } if b.dtype.IsUtf8() { b.Append([]byte(s)) return nil } decodedVal, err := base64.StdEncoding.DecodeString(s) if err != nil { return fmt.Errorf("could not decode base64 string: %w", err) } b.Append(decodedVal) return nil } func (b *BinaryBuilder) UnmarshalOne(dec *json.Decoder) error { t, err := dec.Token() if err != nil { return err } switch v := t.(type) { case string: data, err := base64.StdEncoding.DecodeString(v) if err != nil { return err } b.Append(data) case []byte: b.Append(v) case nil: b.AppendNull() default: return &json.UnmarshalTypeError{ Value: fmt.Sprint(t), Type: reflect.TypeOf([]byte{}), Offset: dec.InputOffset(), } } return nil } func (b *BinaryBuilder) Unmarshal(dec *json.Decoder) error { for dec.More() { if err := b.UnmarshalOne(dec); err != nil { return err } } return nil } func (b *BinaryBuilder) UnmarshalJSON(data []byte) error { dec := json.NewDecoder(bytes.NewReader(data)) t, err := dec.Token() if err != nil { return err } if delim, ok := t.(json.Delim); !ok || delim != '[' { return fmt.Errorf("binary builder must unpack from json array, found %s", delim) } return b.Unmarshal(dec) } const ( dfltBlockSize = 32 << 10 // 32 KB viewValueSizeLimit int32 = math.MaxInt32 ) type BinaryViewBuilder struct { builder dtype arrow.BinaryDataType data *memory.Buffer rawData []arrow.ViewHeader blockBuilder multiBufferBuilder } func NewBinaryViewBuilder(mem memory.Allocator) *BinaryViewBuilder { bvb := &BinaryViewBuilder{ dtype: arrow.BinaryTypes.BinaryView, builder: builder{ mem: mem, }, blockBuilder: multiBufferBuilder{ blockSize: dfltBlockSize, mem: mem, }, } bvb.builder.refCount.Add(1) bvb.blockBuilder.refCount.Add(1) return bvb } func (b *BinaryViewBuilder) SetBlockSize(sz uint) { b.blockBuilder.blockSize = int(sz) } func (b *BinaryViewBuilder) Type() arrow.DataType { return b.dtype } func (b *BinaryViewBuilder) Release() { debug.Assert(b.refCount.Load() > 0, "too many releases") if b.refCount.Add(-1) != 0 { return } if b.nullBitmap != nil { b.nullBitmap.Release() b.nullBitmap = nil } if b.data != nil { b.data.Release() b.data = nil b.rawData = nil } } func (b *BinaryViewBuilder) init(capacity int) { b.builder.init(capacity) b.data = memory.NewResizableBuffer(b.mem) bytesN := arrow.ViewHeaderTraits.BytesRequired(capacity) b.data.Resize(bytesN) b.rawData = arrow.ViewHeaderTraits.CastFromBytes(b.data.Bytes()) } func (b *BinaryViewBuilder) Resize(n int) { nbuild := n if n < minBuilderCapacity { n = minBuilderCapacity } if b.capacity == 0 { b.init(n) return } b.builder.resize(nbuild, b.init) b.data.Resize(arrow.ViewHeaderTraits.BytesRequired(n)) b.rawData = arrow.ViewHeaderTraits.CastFromBytes(b.data.Bytes()) } func (b *BinaryViewBuilder) ReserveData(length int) { if int32(length) > viewValueSizeLimit { panic(fmt.Errorf("%w: BinaryView or StringView elements cannot reference strings larger than 2GB", arrow.ErrInvalid)) } b.blockBuilder.Reserve(int(length)) } func (b *BinaryViewBuilder) Reserve(n int) { b.builder.reserve(n, b.Resize) } func (b *BinaryViewBuilder) Append(v []byte) { if int32(len(v)) > viewValueSizeLimit { panic(fmt.Errorf("%w: BinaryView or StringView elements cannot reference strings larger than 2GB", arrow.ErrInvalid)) } if !arrow.IsViewInline(len(v)) { b.ReserveData(len(v)) } b.Reserve(1) b.UnsafeAppend(v) } // AppendString is identical to Append, only accepting a string instead // of a byte slice, avoiding the extra copy that would occur if you simply // did []byte(v). // // This is different than AppendValueFromString which exists for the // Builder interface, in that this expects raw binary data which is // appended unmodified. AppendValueFromString expects base64 encoded binary // data instead. func (b *BinaryViewBuilder) AppendString(v string) { // create a []byte without copying the bytes // in go1.20 this would be unsafe.StringData val := *(*[]byte)(unsafe.Pointer(&struct { string int }{v, len(v)})) b.Append(val) } func (b *BinaryViewBuilder) AppendNull() { b.Reserve(1) b.UnsafeAppendBoolToBitmap(false) } func (b *BinaryViewBuilder) AppendNulls(n int) { b.Reserve(n) for i := 0; i < n; i++ { b.UnsafeAppendBoolToBitmap(false) } } func (b *BinaryViewBuilder) AppendEmptyValue() { b.Reserve(1) b.UnsafeAppendBoolToBitmap(true) } func (b *BinaryViewBuilder) AppendEmptyValues(n int) { b.Reserve(n) b.unsafeAppendBoolsToBitmap(nil, n) } func (b *BinaryViewBuilder) UnsafeAppend(v []byte) { hdr := &b.rawData[b.length] hdr.SetBytes(v) if !hdr.IsInline() { b.blockBuilder.UnsafeAppend(hdr, v) } b.UnsafeAppendBoolToBitmap(true) } func (b *BinaryViewBuilder) AppendValues(v [][]byte, valid []bool) { if len(v) != len(valid) && len(valid) != 0 { panic("len(v) != len(valid) && len(valid) != 0") } if len(v) == 0 { return } b.Reserve(len(v)) outOfLineTotal := 0 for i, vv := range v { if len(valid) == 0 || valid[i] { if !arrow.IsViewInline(len(vv)) { outOfLineTotal += len(vv) } } } b.ReserveData(outOfLineTotal) for i, vv := range v { if len(valid) == 0 || valid[i] { hdr := &b.rawData[b.length+i] hdr.SetBytes(vv) if !hdr.IsInline() { b.blockBuilder.UnsafeAppend(hdr, vv) } } } b.builder.unsafeAppendBoolsToBitmap(valid, len(v)) } func (b *BinaryViewBuilder) AppendStringValues(v []string, valid []bool) { if len(v) != len(valid) && len(valid) != 0 { panic("len(v) != len(valid) && len(valid) != 0") } if len(v) == 0 { return } b.Reserve(len(v)) outOfLineTotal := 0 for i, vv := range v { if len(valid) == 0 || valid[i] { if !arrow.IsViewInline(len(vv)) { outOfLineTotal += len(vv) } } } b.ReserveData(outOfLineTotal) for i, vv := range v { if len(valid) == 0 || valid[i] { hdr := &b.rawData[b.length+i] hdr.SetString(vv) if !hdr.IsInline() { b.blockBuilder.UnsafeAppendString(hdr, vv) } } } b.builder.unsafeAppendBoolsToBitmap(valid, len(v)) } // AppendValueFromString is paired with ValueStr for fulfilling the // base Builder interface. This is intended to read in a human-readable // string such as from CSV or JSON and append it to the array. // // For Binary values are expected to be base64 encoded (and will be // decoded as such before being appended). func (b *BinaryViewBuilder) AppendValueFromString(s string) error { if s == NullValueStr { b.AppendNull() return nil } if b.dtype.IsUtf8() { b.Append([]byte(s)) return nil } decodedVal, err := base64.StdEncoding.DecodeString(s) if err != nil { return fmt.Errorf("could not decode base64 string: %w", err) } b.Append(decodedVal) return nil } func (b *BinaryViewBuilder) UnmarshalOne(dec *json.Decoder) error { t, err := dec.Token() if err != nil { return err } switch v := t.(type) { case string: data, err := base64.StdEncoding.DecodeString(v) if err != nil { return err } b.Append(data) case []byte: b.Append(v) case nil: b.AppendNull() default: return &json.UnmarshalTypeError{ Value: fmt.Sprint(t), Type: reflect.TypeOf([]byte{}), Offset: dec.InputOffset(), } } return nil } func (b *BinaryViewBuilder) Unmarshal(dec *json.Decoder) error { for dec.More() { if err := b.UnmarshalOne(dec); err != nil { return err } } return nil } func (b *BinaryViewBuilder) UnmarshalJSON(data []byte) error { dec := json.NewDecoder(bytes.NewReader(data)) t, err := dec.Token() if err != nil { return err } if delim, ok := t.(json.Delim); !ok || delim != '[' { return fmt.Errorf("binary view builder must unpack from json array, found %s", delim) } return b.Unmarshal(dec) } func (b *BinaryViewBuilder) newData() (data *Data) { bytesRequired := arrow.ViewHeaderTraits.BytesRequired(b.length) if bytesRequired > 0 && bytesRequired < b.data.Len() { // trim buffers b.data.Resize(bytesRequired) } dataBuffers := b.blockBuilder.Finish() data = NewData(b.dtype, b.length, append([]*memory.Buffer{ b.nullBitmap, b.data, }, dataBuffers...), nil, b.nulls, 0) b.reset() if b.data != nil { b.data.Release() b.data = nil b.rawData = nil for _, buf := range dataBuffers { buf.Release() } } return } func (b *BinaryViewBuilder) NewBinaryViewArray() (a *BinaryView) { data := b.newData() a = NewBinaryViewData(data) data.Release() return } func (b *BinaryViewBuilder) NewArray() arrow.Array { return b.NewBinaryViewArray() } var ( _ Builder = (*BinaryBuilder)(nil) _ Builder = (*BinaryViewBuilder)(nil) )