arrow/array/concat.go (707 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package array
import (
"errors"
"fmt"
"math"
"math/bits"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/encoded"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/bitutils"
"github.com/apache/arrow-go/v18/internal/utils"
)
// Concatenate builds one new arrow.Array holding the values of every input
// array, in the order given. All inputs must have the same data type as the
// first one. A nil array and a non-nil error are returned when arrs is empty,
// when the types differ, or when the underlying concatenation fails.
//
// The input arrays are not released by this function; the caller still has to
// release them.
func Concatenate(arrs []arrow.Array, mem memory.Allocator) (result arrow.Array, err error) {
	if len(arrs) == 0 {
		return nil, errors.New("array/concat: must pass at least one array")
	}

	// collect the ArrayData of each input, validating its type on the way
	inputs := make([]arrow.ArrayData, 0, len(arrs))
	for _, a := range arrs {
		if !arrow.TypeEqual(a.DataType(), arrs[0].DataType()) {
			return nil, fmt.Errorf("arrays to be concatenated must be identically typed, but %s and %s were encountered",
				arrs[0].DataType(), a.DataType())
		}
		inputs = append(inputs, a.Data())
	}

	merged, err := concat(inputs, mem)
	if err != nil {
		return nil, err
	}
	// drop our reference once the array wrapping merged has been built
	defer merged.Release()

	return MakeFromData(merged), nil
}
// rng is a plain span described by where it starts and how long it is.
type rng struct {
	offset int
	len    int
}
// bitmap refers to a window of a validity/boolean bitmap. data is the raw
// backing bytes (nil when there is no bitmap buffer) and rng gives the
// window's offset and length, both measured in bits.
type bitmap struct {
	data []byte
	rng  rng
}
// gatherBitmaps returns one bitmap per input, taken from buffer index idx.
// The bit range of each entry is the input's own offset and length. When an
// input has no buffer at idx its entry keeps a nil data slice.
func gatherBitmaps(data []arrow.ArrayData, idx int) []bitmap {
	out := make([]bitmap, 0, len(data))
	for _, d := range data {
		bm := bitmap{rng: rng{offset: d.Offset(), len: d.Len()}}
		if buf := d.Buffers()[idx]; buf != nil {
			bm.data = buf.Bytes()
		}
		out = append(out, bm)
	}
	return out
}
// gatherFixedBuffers collects, for every input, the part of buffer idx that
// the input actually covers: byteWidth bytes per element, starting at the
// input's offset and spanning its length. Inputs whose buffer at idx is nil
// contribute nothing, so the result may be shorter than data.
func gatherFixedBuffers(data []arrow.ArrayData, idx, byteWidth int) []*memory.Buffer {
	out := make([]*memory.Buffer, 0, len(data))
	for i := range data {
		d := data[i]
		src := d.Buffers()[idx]
		if src == nil {
			continue
		}

		start := d.Offset() * byteWidth
		end := (d.Offset() + d.Len()) * byteWidth
		out = append(out, memory.NewBufferBytes(src.Bytes()[start:end]))
	}
	return out
}
// gatherBuffersFixedWidthType behaves like gatherFixedBuffers, except that the
// per-element byte width is derived from the bit width of the given type
// instead of being passed explicitly.
func gatherBuffersFixedWidthType(data []arrow.ArrayData, idx int, fixed arrow.FixedWidthDataType) []*memory.Buffer {
	byteWidth := fixed.BitWidth() / 8
	return gatherFixedBuffers(data, idx, byteWidth)
}
// gatherBufferRanges slices buffer idx of every input by the byte range found
// at the same position in ranges, so len(ranges) must equal len(data). Inputs
// with a nil buffer at idx are skipped (their range is expected to be empty),
// which means the result may be shorter than data.
func gatherBufferRanges(data []arrow.ArrayData, idx int, ranges []rng) []*memory.Buffer {
	out := make([]*memory.Buffer, 0, len(data))
	for i := range data {
		r := ranges[i]
		src := data[i].Buffers()[idx]
		if src == nil {
			debug.Assert(r.len == 0, "misaligned buffer value ranges")
			continue
		}

		out = append(out, memory.NewBufferBytes(src.Bytes()[r.offset:r.offset+r.len]))
	}
	return out
}
// gatherChildren slices child idx of every input to the input's own offset and
// length, i.e. it is gatherChildrenMultiplier with a multiplier of one.
func gatherChildren(data []arrow.ArrayData, idx int) []arrow.ArrayData {
	// one child element per parent element
	const perElement = 1
	return gatherChildrenMultiplier(data, idx, perElement)
}
// gatherChildrenMultiplier slices child idx of every input so that it covers
// exactly the child elements belonging to the input's own window. Each parent
// element owns multiplier consecutive child elements (for example the list
// size of a fixed-size list), so the child window is
// [Offset*multiplier, (Offset+Len)*multiplier).
//
// The result has one entry per input, each created with NewSliceData; the
// callers in this file release every entry once they are done with it.
func gatherChildrenMultiplier(data []arrow.ArrayData, idx, multiplier int) []arrow.ArrayData {
	out := make([]arrow.ArrayData, len(data))
	// widen before multiplying so that both bounds are computed in 64 bits,
	// even on platforms where int is 32 bits wide
	mult := int64(multiplier)
	for i, d := range data {
		start := int64(d.Offset()) * mult
		end := int64(d.Offset()+d.Len()) * mult
		out[i] = NewSliceData(d.Children()[idx], start, end)
	}
	return out
}
// gatherChildrenRanges slices child idx of every input by the range stored at
// the same position in ranges, producing one NewSliceData result per input.
// data and ranges are expected to have the same length.
func gatherChildrenRanges(data []arrow.ArrayData, idx int, ranges []rng) []arrow.ArrayData {
	debug.Assert(len(data) == len(ranges), "mismatched children ranges for concat")

	out := make([]arrow.ArrayData, 0, len(data))
	for i, d := range data {
		begin := ranges[i].offset
		end := begin + ranges[i].len
		out = append(out, NewSliceData(d.Children()[idx], int64(begin), int64(end)))
	}
	return out
}
// concatBuffers allocates a new buffer from mem and fills it with the bytes of
// every buffer in bufs, back to back and in order.
func concatBuffers(bufs []*memory.Buffer, mem memory.Allocator) *memory.Buffer {
	total := 0
	for i := range bufs {
		total += bufs[i].Len()
	}

	out := memory.NewResizableBuffer(mem)
	out.Resize(total)

	dst := out.Bytes()
	written := 0
	for i := range bufs {
		copy(dst[written:], bufs[i].Bytes())
		written += bufs[i].Len()
	}
	return out
}
// handle32BitOffsets writes the rebased 32-bit offsets of every input buffer
// into out and returns out together with, per input, the range of values
// (offset and length) that those offsets reference.
//
// Each input buffer is expected to hold all offsets of its array except the
// final one, which must still be reachable directly behind the slice. outLen
// is the total number of offsets across the inputs; the closing offset is
// stored at index outLen. An error is returned when the accumulated length no
// longer fits in an int32.
func handle32BitOffsets(outLen int, buffers []*memory.Buffer, out *memory.Buffer) (*memory.Buffer, []rng, error) {
	var (
		dst     = arrow.Int32Traits.CastFromBytes(out.Bytes())
		ranges  = make([]rng, len(buffers))
		written int   // index of the next offset slot in dst
		total   int32 // number of values covered so far
	)

	for i, buf := range buffers {
		if buf.Len() == 0 {
			// nothing to copy; the freshly made range is already {0, 0}
			continue
		}

		src := arrow.Int32Traits.CastFromBytes(buf.Bytes())
		n := len(src)
		first := src[0]
		// the trailing offset was sliced off when the buffers were gathered,
		// re-extend the slice by one element to read it
		last := src[:n+1][n]

		span := int(last) - int(first)
		ranges[i] = rng{offset: int(first), len: span}

		if total > math.MaxInt32-int32(span) {
			return nil, nil, errors.New("offset overflow while concatenating arrays")
		}

		// shift every offset so this input starts where the previous one ended
		shift := total - first
		for j := range src {
			dst[written+j] = shift + src[j]
		}

		written += buf.Len() / arrow.Int32SizeBytes
		total += int32(span)
	}

	// the closing offset marks the end of all values
	dst[outLen] = total
	return out, ranges, nil
}
// unifyDictionaries merges the dictionaries of all inputs into one unified
// dictionary of value type dt.ValueType with indices of type dt.IndexType.
//
// On success it returns one transposition buffer per input (as produced by
// UnifyAndTranspose for that input's dictionary) and the unified dictionary
// array; the caller owns all of them. On failure every transposition buffer
// created so far is released and only the error is returned.
func unifyDictionaries(mem memory.Allocator, data []arrow.ArrayData, dt *arrow.DictionaryType) ([]*memory.Buffer, arrow.Array, error) {
	unifier, err := NewDictionaryUnifier(mem, dt.ValueType)
	if err != nil {
		return nil, nil, err
	}
	defer unifier.Release()

	newLookup := make([]*memory.Buffer, len(data))
	// releaseLookups frees the transposition buffers produced so far; entries
	// that were never filled in are still nil and are skipped.
	releaseLookups := func() {
		for _, b := range newLookup {
			if b != nil {
				b.Release()
			}
		}
	}

	for i, d := range data {
		dictArr := MakeFromData(d.Dictionary())
		defer dictArr.Release()
		newLookup[i], err = unifier.UnifyAndTranspose(dictArr)
		if err != nil {
			// do not leak the buffers of the inputs handled before this one
			releaseLookups()
			return nil, nil, err
		}
	}

	unified, err := unifier.GetResultWithIndexType(dt.IndexType)
	if err != nil {
		releaseLookups()
		return nil, nil, err
	}
	return newLookup, unified, nil
}
// concatDictIndices builds the index buffer of a concatenated dictionary array
// whose dictionaries had to be unified. For every input, the indices in its
// buffer 1 are remapped through the matching transposition buffer (read as an
// int32 map) and written, in order, to a newly allocated buffer.
//
// Inputs without a validity bitmap are transposed in one go. Inputs with a
// validity bitmap are processed run by run: valid runs are transposed, null
// runs are written as zeros so that no stale index is carried over.
//
// All transposition buffers are released before returning, whether or not an
// error occurred. On error the partially filled output buffer is released and
// a nil buffer is returned.
func concatDictIndices(mem memory.Allocator, data []arrow.ArrayData, idxType arrow.FixedWidthDataType, transpositions []*memory.Buffer) (out *memory.Buffer, err error) {
	defer func() {
		if err != nil && out != nil {
			out.Release()
			out = nil
		}
	}()

	idxWidth := idxType.BitWidth() / 8
	outLen := 0
	for i, d := range data {
		outLen += d.Len()
		defer transpositions[i].Release()
	}

	out = memory.NewResizableBuffer(mem)
	out.Resize(outLen * idxWidth)

	// outData always starts at the first output byte of the current input
	outData := out.Bytes()
	for i, d := range data {
		transposeMap := arrow.Int32Traits.CastFromBytes(transpositions[i].Bytes())
		src := d.Buffers()[1].Bytes()
		if d.Buffers()[0] == nil {
			if err = utils.TransposeIntsBuffers(idxType, idxType, src, outData, d.Offset(), 0, d.Len(), transposeMap); err != nil {
				return
			}
		} else {
			rdr := bitutils.NewBitRunReader(d.Buffers()[0].Bytes(), int64(d.Offset()), int64(d.Len()))
			// pos counts elements (not bytes) already handled for this input
			pos := 0
			for {
				run := rdr.NextRun()
				if run.Len == 0 {
					break
				}

				if run.Set {
					err = utils.TransposeIntsBuffers(idxType, idxType, src, outData, d.Offset()+pos, pos, int(run.Len), transposeMap)
					if err != nil {
						return
					}
				} else {
					// outData is a byte slice, so the element position has to be
					// scaled by the index width before it is used to slice it
					start := pos * idxWidth
					memory.Set(outData[start:start+int(run.Len)*idxWidth], 0x00)
				}

				pos += int(run.Len)
			}
		}
		outData = outData[d.Len()*idxWidth:]
	}
	return
}
// handle64BitOffsets writes the rebased 64-bit offsets of every input buffer
// into out and returns out together with, per input, the range of values
// (offset and length) that those offsets reference.
//
// Each input buffer is expected to hold all offsets of its array except the
// final one, which must still be reachable directly behind the slice. outLen
// is the total number of offsets across the inputs; the closing offset is
// stored at index outLen. An error is returned when the accumulated length no
// longer fits in an int64.
func handle64BitOffsets(outLen int, buffers []*memory.Buffer, out *memory.Buffer) (*memory.Buffer, []rng, error) {
	var (
		dst     = arrow.Int64Traits.CastFromBytes(out.Bytes())
		ranges  = make([]rng, len(buffers))
		written int   // index of the next offset slot in dst
		total   int64 // number of values covered so far
	)

	for i, buf := range buffers {
		if buf.Len() == 0 {
			// nothing to copy; the freshly made range is already {0, 0}
			continue
		}

		src := arrow.Int64Traits.CastFromBytes(buf.Bytes())
		n := len(src)
		first := src[0]
		// the trailing offset was sliced off when the buffers were gathered,
		// re-extend the slice by one element to read it
		last := src[:n+1][n]

		span := int(last) - int(first)
		ranges[i] = rng{offset: int(first), len: span}

		if total > math.MaxInt64-int64(span) {
			return nil, nil, errors.New("offset overflow while concatenating arrays")
		}

		// shift every offset so this input starts where the previous one ended
		shift := total - first
		for j := range src {
			dst[written+j] = shift + src[j]
		}

		written += buf.Len() / arrow.Int64SizeBytes
		total += int64(span)
	}

	// the closing offset marks the end of all values
	dst[outLen] = total
	return out, ranges, nil
}
// concatOffsets creates a single offsets buffer representing the concatenation
// of all the given offsets buffers, rebasing each input's offsets onto the end
// of the previous input.
//
// buffers holds, per input, its offsets without the trailing one, so the
// output is sized for the total number of those offsets plus one closing
// offset. byteWidth selects the offset type: arrow.Int64SizeBytes means 64-bit
// offsets, anything else is treated as 32-bit offsets.
//
// Besides the new buffer it returns the list of value ranges that have to be
// fetched from the corresponding value buffers or children to build the
// concatenated values. When the offsets overflow, the newly allocated buffer
// is released and only the error is returned.
func concatOffsets(buffers []*memory.Buffer, byteWidth int, mem memory.Allocator) (*memory.Buffer, []rng, error) {
	outLen := 0
	for _, b := range buffers {
		outLen += b.Len() / byteWidth
	}

	out := memory.NewResizableBuffer(mem)
	out.Resize(byteWidth * (outLen + 1))

	var (
		offsets *memory.Buffer
		ranges  []rng
		err     error
	)
	switch byteWidth {
	case arrow.Int64SizeBytes:
		offsets, ranges, err = handle64BitOffsets(outLen, buffers, out)
	default:
		offsets, ranges, err = handle32BitOffsets(outLen, buffers, out)
	}
	if err != nil {
		// the handlers hand back a nil buffer on failure, so nobody else would
		// ever release the allocation made above
		out.Release()
		return nil, nil, err
	}
	return offsets, ranges, nil
}
// sumArraySizes returns the combined length of all the given data objects.
func sumArraySizes(data []arrow.ArrayData) int {
	total := 0
	for i := range data {
		total += data[i].Len()
	}
	return total
}
// getListViewBufferValues reinterprets buffer i of data as a slice of T and
// returns the part that belongs to data's window: data.Len() elements starting
// at data.Offset(). The returned slice aliases the buffer's memory.
//
// The buffer must be non-empty, because the address of its first byte is
// taken.
func getListViewBufferValues[T int32 | int64](data arrow.ArrayData, i int) []T {
	raw := data.Buffers()[i].Bytes()
	first := (*T)(unsafe.Pointer(&raw[0]))

	start := data.Offset()
	end := start + data.Len()
	return unsafe.Slice(first, end)[start:]
}
// putListViewOffsets32 copies the 32-bit list-view offsets of in into out,
// starting at element index outOff. The offset of every view that is valid and
// has a positive size is shifted by displacement; null or empty views get the
// offset 0. Nothing is written when in has no elements.
func putListViewOffsets32(in arrow.ArrayData, displacement int32, out *memory.Buffer, outOff int) {
	debug.Assert(in.DataType().ID() == arrow.LIST_VIEW, "putListViewOffsets32: expected LIST_VIEW data")
	if in.Len() == 0 {
		return
	}

	var (
		base     = in.Offset()
		validity = in.Buffers()[0]
		offsets  = getListViewBufferValues[int32](in, 1)
		sizes    = getListViewBufferValues[int32](in, 2)
		dst      = arrow.Int32Traits.CastFromBytes(out.Bytes())
	)

	for i := range offsets {
		isNull := validity != nil && !bitutil.BitIsSet(validity.Bytes(), base+i)
		if isNull || sizes[i] <= 0 {
			dst[outOff+i] = 0
			continue
		}

		// This is guaranteed by RangeOfValuesUsed returning the smallest offset
		// of valid and non-empty list-views.
		debug.Assert(offsets[i]+displacement >= 0, "putListViewOffsets32: offset underflow while concatenating arrays")
		dst[outOff+i] = offsets[i] + displacement
	}
}
// putListViewOffsets64 copies the 64-bit list-view offsets of in into out,
// starting at element index outOff. The offset of every view that is valid and
// has a positive size is shifted by displacement; null or empty views get the
// offset 0. Nothing is written when in has no elements.
func putListViewOffsets64(in arrow.ArrayData, displacement int64, out *memory.Buffer, outOff int) {
	debug.Assert(in.DataType().ID() == arrow.LARGE_LIST_VIEW, "putListViewOffsets64: expected LARGE_LIST_VIEW data")
	if in.Len() == 0 {
		return
	}

	var (
		base     = in.Offset()
		validity = in.Buffers()[0]
		offsets  = getListViewBufferValues[int64](in, 1)
		sizes    = getListViewBufferValues[int64](in, 2)
		dst      = arrow.Int64Traits.CastFromBytes(out.Bytes())
	)

	for i := range offsets {
		isNull := validity != nil && !bitutil.BitIsSet(validity.Bytes(), base+i)
		if isNull || sizes[i] <= 0 {
			dst[outOff+i] = 0
			continue
		}

		// This is guaranteed by RangeOfValuesUsed returning the smallest offset
		// of valid and non-empty list-views.
		debug.Assert(offsets[i]+displacement >= 0, "putListViewOffsets64: offset underflow while concatenating arrays")
		dst[outOff+i] = offsets[i] + displacement
	}
}
// concatListViewOffsets builds one offsets buffer for the concatenation of the
// given list-view arrays.
//
// valueRanges holds, per input, the range of child values that its views
// actually reference. The concatenated child array only contains those ranges,
// laid out one after another, so the offsets of each input are displaced by
// the number of child values placed before it minus the start of its own
// range. byteWidth 4 selects 32-bit offsets, any other value 64-bit offsets.
//
// An error is returned when 32-bit offsets are requested and the combined
// array length exceeds math.MaxInt32.
func concatListViewOffsets(data []arrow.ArrayData, byteWidth int, valueRanges []rng, mem memory.Allocator) (*memory.Buffer, error) {
	total := sumArraySizes(data)
	if byteWidth == 4 && total > math.MaxInt32 {
		return nil, fmt.Errorf("%w: offset overflow while concatenating arrays", arrow.ErrInvalid)
	}

	out := memory.NewResizableBuffer(mem)
	out.Resize(byteWidth * total)

	var (
		written     int // offsets emitted so far
		childValues int // child values laid out so far
	)
	for i := range data {
		shift := childValues - valueRanges[i].offset
		switch byteWidth {
		case 4:
			putListViewOffsets32(data[i], int32(shift), out, written)
		default:
			putListViewOffsets64(data[i], int64(shift), out, written)
		}
		written += data[i].Len()
		childValues += valueRanges[i].len
	}
	debug.Assert(written == total, "implementation error")

	return out, nil
}
// zeroNullListViewSizes overwrites, in place, the size (buffer 2) of every
// null list-view in data with zero. It does nothing when data is empty or has
// no validity bitmap.
func zeroNullListViewSizes[T int32 | int64](data arrow.ArrayData) {
	if data.Len() == 0 {
		return
	}
	validityBuf := data.Buffers()[0]
	if validityBuf == nil {
		return
	}

	valid := validityBuf.Bytes()
	sizes := getListViewBufferValues[T](data, 2)
	for i, n := 0, data.Len(); i < n; i++ {
		if bitutil.BitIsSet(valid, data.Offset()+i) {
			continue
		}
		sizes[i] = 0
	}
}
// concatListView fills out with the concatenation of the given list-view
// arrays. offsetType is the type of the offsets and sizes (Int32 for list-view,
// Int64 for large list-view).
//
// It concatenates only the child value ranges that the views actually use,
// rewrites the offsets relative to that compacted child, copies the sizes and
// finally zeroes the sizes of null views. On success out.childData and
// out.buffers[1] / out.buffers[2] are set; on error out is left without them
// and everything allocated here is released again.
func concatListView(data []arrow.ArrayData, offsetType arrow.FixedWidthDataType, out *Data, mem memory.Allocator) (err error) {
	// Calculate the ranges of values that each list-view array uses
	valueRanges := make([]rng, len(data))
	for i, input := range data {
		valueRanges[i].offset, valueRanges[i].len = rangeOfValuesUsed(input)
	}

	// Gather the children ranges of each input array
	childData := gatherChildrenRanges(data, 0, valueRanges)
	for _, c := range childData {
		defer c.Release()
	}

	// Concatenate the values
	values, err := concat(childData, mem)
	if err != nil {
		return err
	}

	// Concatenate the offsets
	offsetBuffer, err := concatListViewOffsets(data, offsetType.Bytes(), valueRanges, mem)
	if err != nil {
		// values has not been attached to out yet, so it would otherwise leak
		values.Release()
		return err
	}

	// Concatenate the sizes
	sizeBuffers := gatherBuffersFixedWidthType(data, 2, offsetType)
	sizeBuffer := concatBuffers(sizeBuffers, mem)

	out.childData = []arrow.ArrayData{values}
	out.buffers[1] = offsetBuffer
	out.buffers[2] = sizeBuffer

	// To make sure the sizes don't reference values that are not in the new
	// concatenated values array, we zero the sizes of null list-view values.
	if offsetType.ID() == arrow.INT32 {
		zeroNullListViewSizes[int32](out)
	} else {
		zeroNullListViewSizes[int64](out)
	}

	return nil
}
// concat is the implementation for actually performing the concatenation of the arrow.ArrayData
// objects that we can call internally for nested types.
//
// data must contain at least one element (data[0] is read unconditionally) and
// all elements are treated as having the type of data[0]. The result is a new
// *Data with a reference count of one whose length is the sum of the input
// lengths. Extension types are concatenated according to their storage type.
//
// Any panic raised while concatenating is recovered and reported as an error.
// Whenever an error is returned, the partially built output is released.
func concat(data []arrow.ArrayData, mem memory.Allocator) (arr arrow.ArrayData, err error) {
	out := &Data{dtype: data[0].DataType(), nulls: 0}
	out.refCount.Add(1)
	// Runs on every exit path: turns panics into errors and drops the
	// half-built output if anything went wrong.
	// NOTE(review): when a nested concat fails, out.childData can contain nil
	// entries at this point — confirm that Data.Release tolerates them.
	defer func() {
		if pErr := recover(); pErr != nil {
			err = utils.FormatRecoveredError("arrow/concat", pErr)
		}
		if err != nil {
			out.Release()
		}
	}()
	// total length and null count; the null count becomes unknown as soon as
	// any input does not know its own
	for _, d := range data {
		out.length += d.Len()
		if out.nulls == UnknownNullCount || d.NullN() == UnknownNullCount {
			out.nulls = UnknownNullCount
			continue
		}
		out.nulls += d.NullN()
	}
	out.buffers = make([]*memory.Buffer, len(data[0].Buffers()))
	// a validity bitmap is only built when there may be nulls; an unknown null
	// count is non-zero and therefore also takes this branch
	if out.nulls != 0 && out.dtype.ID() != arrow.NULL {
		bm, err := concatBitmaps(gatherBitmaps(data, 0), mem)
		if err != nil {
			return nil, err
		}
		out.buffers[0] = bm
	}
	dt := out.dtype
	if dt.ID() == arrow.EXTENSION {
		dt = dt.(arrow.ExtensionType).StorageType()
	}
	switch dt := dt.(type) {
	case *arrow.NullType:
		// nothing besides the length needs to be combined
	case *arrow.BooleanType:
		// boolean values are bit-packed, so they are concatenated like bitmaps
		bm, err := concatBitmaps(gatherBitmaps(data, 1), mem)
		if err != nil {
			return nil, err
		}
		out.buffers[1] = bm
	case *arrow.DictionaryType:
		idxType := dt.IndexType.(arrow.FixedWidthDataType)
		// two cases: all dictionaries are the same or we need to unify them
		dictsSame := true
		dict0 := MakeFromData(data[0].Dictionary())
		defer dict0.Release()
		for _, d := range data {
			dict := MakeFromData(d.Dictionary())
			if !Equal(dict0, dict) {
				dict.Release()
				dictsSame = false
				break
			}
			dict.Release()
		}
		indexBuffers := gatherBuffersFixedWidthType(data, 1, idxType)
		if dictsSame {
			// identical dictionaries: reuse the first one and copy the indices as-is
			out.dictionary = dict0.Data().(*Data)
			out.dictionary.Retain()
			out.buffers[1] = concatBuffers(indexBuffers, mem)
			break
		}
		// differing dictionaries: unify them and remap every index
		indexLookup, unifiedDict, err := unifyDictionaries(mem, data, dt)
		if err != nil {
			return nil, err
		}
		defer unifiedDict.Release()
		out.dictionary = unifiedDict.Data().(*Data)
		out.dictionary.Retain()
		// concatDictIndices releases the buffers in indexLookup
		out.buffers[1], err = concatDictIndices(mem, data, idxType, indexLookup)
		if err != nil {
			return nil, err
		}
	case arrow.FixedWidthDataType:
		out.buffers[1] = concatBuffers(gatherBuffersFixedWidthType(data, 1, dt), mem)
	case arrow.BinaryViewDataType:
		// keep validity and view headers, then append (and retain) the data
		// buffers of every input in order
		out.buffers = out.buffers[:2]
		for _, d := range data {
			for _, buf := range d.Buffers()[2:] {
				buf.Retain()
				out.buffers = append(out.buffers, buf)
			}
		}
		out.buffers[1] = concatBuffers(gatherFixedBuffers(data, 1, arrow.ViewHeaderSizeBytes), mem)
		var (
			s                  = arrow.ViewHeaderTraits.CastFromBytes(out.buffers[1].Bytes())
			i                  = data[0].Len()
			precedingBufsCount int
		)
		// headers of the first input already point at the right buffers; for
		// every later input the buffer index of non-inline views is shifted by
		// the number of data buffers contributed by the inputs before it
		for idx := 1; idx < len(data); idx++ {
			precedingBufsCount += len(data[idx-1].Buffers()) - 2
			for end := i + data[idx].Len(); i < end; i++ {
				if s[i].IsInline() {
					continue
				}
				bufIndex := s[i].BufferIndex() + int32(precedingBufsCount)
				s[i].SetIndexOffset(bufIndex, s[i].BufferOffset())
			}
		}
	case arrow.BinaryDataType:
		// rebase the offsets, then copy only the referenced value bytes
		offsetWidth := dt.Layout().Buffers[1].ByteWidth
		offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem)
		if err != nil {
			return nil, err
		}
		out.buffers[1] = offsetBuffer
		out.buffers[2] = concatBuffers(gatherBufferRanges(data, 2, valueRanges), mem)
	case *arrow.ListType:
		// rebase the offsets, then concatenate the referenced child ranges
		offsetWidth := dt.Layout().Buffers[1].ByteWidth
		offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem)
		if err != nil {
			return nil, err
		}
		childData := gatherChildrenRanges(data, 0, valueRanges)
		for _, c := range childData {
			defer c.Release()
		}
		out.buffers[1] = offsetBuffer
		out.childData = make([]arrow.ArrayData, 1)
		out.childData[0], err = concat(childData, mem)
		if err != nil {
			return nil, err
		}
	case *arrow.LargeListType:
		// same as the list case, the offset width comes from the type layout
		offsetWidth := dt.Layout().Buffers[1].ByteWidth
		offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem)
		if err != nil {
			return nil, err
		}
		childData := gatherChildrenRanges(data, 0, valueRanges)
		for _, c := range childData {
			defer c.Release()
		}
		out.buffers[1] = offsetBuffer
		out.childData = make([]arrow.ArrayData, 1)
		out.childData[0], err = concat(childData, mem)
		if err != nil {
			return nil, err
		}
	case *arrow.ListViewType:
		offsetType := arrow.PrimitiveTypes.Int32.(arrow.FixedWidthDataType)
		err := concatListView(data, offsetType, out, mem)
		if err != nil {
			return nil, err
		}
	case *arrow.LargeListViewType:
		offsetType := arrow.PrimitiveTypes.Int64.(arrow.FixedWidthDataType)
		err := concatListView(data, offsetType, out, mem)
		if err != nil {
			return nil, err
		}
	case *arrow.FixedSizeListType:
		// every parent element owns dt.Len() child elements
		childData := gatherChildrenMultiplier(data, 0, int(dt.Len()))
		for _, c := range childData {
			defer c.Release()
		}
		children, err := concat(childData, mem)
		if err != nil {
			return nil, err
		}
		out.childData = []arrow.ArrayData{children}
	case *arrow.StructType:
		// concatenate each field independently
		out.childData = make([]arrow.ArrayData, dt.NumFields())
		for i := range dt.Fields() {
			children := gatherChildren(data, i)
			for _, c := range children {
				defer c.Release()
			}
			childData, err := concat(children, mem)
			if err != nil {
				return nil, err
			}
			out.childData[i] = childData
		}
	case *arrow.MapType:
		// handled like a list of its single child
		offsetWidth := dt.Layout().Buffers[1].ByteWidth
		offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem)
		if err != nil {
			return nil, err
		}
		childData := gatherChildrenRanges(data, 0, valueRanges)
		for _, c := range childData {
			defer c.Release()
		}
		out.buffers[1] = offsetBuffer
		out.childData = make([]arrow.ArrayData, 1)
		out.childData[0], err = concat(childData, mem)
		if err != nil {
			return nil, err
		}
	case *arrow.RunEndEncodedType:
		physicalLength, overflow := int(0), false
		// we can't use gatherChildren because the Offset and Len of
		// data doesn't correspond to the physical length or offset
		runs := make([]arrow.ArrayData, len(data))
		values := make([]arrow.ArrayData, len(data))
		for i, d := range data {
			plen := encoded.GetPhysicalLength(d)
			off := encoded.FindPhysicalOffset(d)
			runs[i] = NewSliceData(d.Children()[0], int64(off), int64(off+plen))
			defer runs[i].Release()
			values[i] = NewSliceData(d.Children()[1], int64(off), int64(off+plen))
			defer values[i].Release()
			// NOTE(review): addOvf detects overflow of int, which is wider than
			// the 32 bits named in the message on 64-bit platforms — confirm intent.
			physicalLength, overflow = addOvf(physicalLength, plen)
			if overflow {
				return nil, fmt.Errorf("%w: run end encoded array length must fit into a 32-bit signed integer",
					arrow.ErrInvalid)
			}
		}
		runEndsByteWidth := runs[0].DataType().(arrow.FixedWidthDataType).Bytes()
		runEndsBuffers := gatherFixedBuffers(runs, 1, runEndsByteWidth)
		outRunEndsLen := physicalLength * runEndsByteWidth
		outRunEndsBuf := memory.NewResizableBuffer(mem)
		outRunEndsBuf.Resize(outRunEndsLen)
		// presumably NewData below retains the buffer, so this drops only the
		// local reference — TODO confirm
		defer outRunEndsBuf.Release()
		if err := updateRunEnds(runEndsByteWidth, data, runEndsBuffers, outRunEndsBuf); err != nil {
			return nil, err
		}
		out.childData = make([]arrow.ArrayData, 2)
		out.childData[0] = NewData(data[0].Children()[0].DataType(), int(physicalLength),
			[]*memory.Buffer{nil, outRunEndsBuf}, nil, 0, 0)
		var err error
		out.childData[1], err = concat(values, mem)
		if err != nil {
			// NOTE(review): out.childData[0] stays referenced by out, which the
			// deferred cleanup releases as well — verify this is not a double release.
			out.childData[0].Release()
			return nil, err
		}
	default:
		return nil, fmt.Errorf("concatenate not implemented for type %s", dt)
	}
	return out, nil
}
// addOvf adds two signed integers and reports whether the addition overflowed.
//
// A signed addition overflows exactly when both operands have the same sign
// and the sign of the sum differs from it. In that case the sign bit of both
// x^sum and y^sum is set, so the top bit of their AND is the overflow flag.
// bits.UintSize is 32 or 64 depending on the architecture, so shifting right by
// bits.UintSize-1 isolates that top bit on both.
//
// The first return value is the (possibly wrapped) sum, the second one is true
// if there was an overflow in either the positive or the negative direction.
func addOvf(x, y int) (int, bool) {
	sum := x + y
	return sum, uint((x^sum)&(y^sum))>>(bits.UintSize-1) == 1
}
// concatBitmaps allocates a buffer large enough for the bits of all the given
// bitmaps and fills it with their ranges, one after another. A bitmap without
// data stands for "all bits set" and is written as a run of ones. An error is
// returned if the combined bit length overflows an int.
func concatBitmaps(bitmaps []bitmap, mem memory.Allocator) (*memory.Buffer, error) {
	totalBits := 0
	for i := range bitmaps {
		sum, overflowed := addOvf(totalBits, bitmaps[i].rng.len)
		if overflowed {
			return nil, errors.New("length overflow when concatenating arrays")
		}
		totalBits = sum
	}

	out := memory.NewResizableBuffer(mem)
	out.Resize(int(bitutil.BytesForBits(int64(totalBits))))

	dst := out.Bytes()
	pos := 0
	for i := range bitmaps {
		bm := &bitmaps[i]
		switch {
		case bm.data == nil:
			// a missing bitmap means every element is valid/true
			bitutil.SetBitsTo(dst, int64(pos), int64(bm.rng.len), true)
		default:
			bitutil.CopyBitmap(bm.data, bm.rng.offset, bm.rng.len, dst, pos)
		}
		pos += bm.rng.len
	}
	return out, nil
}
// updateRunEnds dispatches to the run-end updater matching byteWidth (2, 4 or
// 8 bytes), handing it outputBuffer reinterpreted as the corresponding integer
// slice. Any other width yields an arrow.ErrInvalid error.
func updateRunEnds(byteWidth int, inputData []arrow.ArrayData, inputBuffers []*memory.Buffer, outputBuffer *memory.Buffer) error {
	if byteWidth == 2 {
		return updateRunsInt16(inputData, inputBuffers, arrow.Int16Traits.CastFromBytes(outputBuffer.Bytes()))
	}
	if byteWidth == 4 {
		return updateRunsInt32(inputData, inputBuffers, arrow.Int32Traits.CastFromBytes(outputBuffer.Bytes()))
	}
	if byteWidth == 8 {
		return updateRunsInt64(inputData, inputBuffers, arrow.Int64Traits.CastFromBytes(outputBuffer.Bytes()))
	}
	return fmt.Errorf("%w: invalid dataType for RLE runEnds", arrow.ErrInvalid)
}
// updateRunsInt16 writes the run ends of the concatenated run-end encoded
// array into output.
//
// inputBuffers[i] holds the physical run ends relevant to inputData[i]. Every
// run end is first normalized to the input's own window by subtracting the
// input's logical offset and clamping it to the input's logical length (the
// last physical run may extend past the end of a sliced input), and is then
// shifted by the combined logical length of all inputs before it. This is
// applied to every input, including the first one, so sliced inputs are
// handled correctly.
//
// For now no attempt is made to fold the last run of one input and the first
// run of the next into a single run. An arrow.ErrInvalid error is returned
// when the combined logical length does not fit in an int16.
func updateRunsInt16(inputData []arrow.ArrayData, inputBuffers []*memory.Buffer, output []int16) error {
	var (
		pos        int   // next physical slot in output
		logicalEnd int64 // combined logical length of the inputs handled so far
	)
	for i, buf := range inputBuffers {
		if buf.Len() == 0 {
			continue
		}
		src := arrow.Int16Traits.CastFromBytes(buf.Bytes())
		offset, length := int64(inputData[i].Offset()), int64(inputData[i].Len())

		// the largest value written for this input is logicalEnd+length
		if logicalEnd+length > math.MaxInt16 {
			return fmt.Errorf("%w: overflow in run-length-encoded run ends concat", arrow.ErrInvalid)
		}

		for j, e := range src {
			end := int64(e) - offset
			if end > length {
				end = length
			}
			output[pos+j] = int16(logicalEnd + end)
		}
		pos += len(src)
		logicalEnd += length
	}
	return nil
}
// updateRunsInt32 writes the run ends of the concatenated run-end encoded
// array into output.
//
// inputBuffers[i] holds the physical run ends relevant to inputData[i]. Every
// run end is first normalized to the input's own window by subtracting the
// input's logical offset and clamping it to the input's logical length (the
// last physical run may extend past the end of a sliced input), and is then
// shifted by the combined logical length of all inputs before it. This is
// applied to every input, including the first one, so sliced inputs are
// handled correctly.
//
// For now no attempt is made to fold the last run of one input and the first
// run of the next into a single run. An arrow.ErrInvalid error is returned
// when the combined logical length does not fit in an int32.
func updateRunsInt32(inputData []arrow.ArrayData, inputBuffers []*memory.Buffer, output []int32) error {
	var (
		pos        int   // next physical slot in output
		logicalEnd int64 // combined logical length of the inputs handled so far
	)
	for i, buf := range inputBuffers {
		if buf.Len() == 0 {
			continue
		}
		src := arrow.Int32Traits.CastFromBytes(buf.Bytes())
		offset, length := int64(inputData[i].Offset()), int64(inputData[i].Len())

		// the largest value written for this input is logicalEnd+length
		if logicalEnd+length > math.MaxInt32 {
			return fmt.Errorf("%w: overflow in run-length-encoded run ends concat", arrow.ErrInvalid)
		}

		for j, e := range src {
			end := int64(e) - offset
			if end > length {
				end = length
			}
			output[pos+j] = int32(logicalEnd + end)
		}
		pos += len(src)
		logicalEnd += length
	}
	return nil
}
// updateRunsInt64 writes the run ends of the concatenated run-end encoded
// array into output.
//
// inputBuffers[i] holds the physical run ends relevant to inputData[i]. Every
// run end is first normalized to the input's own window by subtracting the
// input's logical offset and clamping it to the input's logical length (the
// last physical run may extend past the end of a sliced input), and is then
// shifted by the combined logical length of all inputs before it. This is
// applied to every input, including the first one, so sliced inputs are
// handled correctly.
//
// For now no attempt is made to fold the last run of one input and the first
// run of the next into a single run. An arrow.ErrInvalid error is returned
// when the combined logical length does not fit in an int64.
func updateRunsInt64(inputData []arrow.ArrayData, inputBuffers []*memory.Buffer, output []int64) error {
	var (
		pos        int   // next physical slot in output
		logicalEnd int64 // combined logical length of the inputs handled so far
	)
	for i, buf := range inputBuffers {
		if buf.Len() == 0 {
			continue
		}
		src := arrow.Int64Traits.CastFromBytes(buf.Bytes())
		offset, length := int64(inputData[i].Offset()), int64(inputData[i].Len())

		// the largest value written for this input is logicalEnd+length;
		// compare without computing the sum so the check itself cannot wrap
		if logicalEnd > math.MaxInt64-length {
			return fmt.Errorf("%w: overflow in run-length-encoded run ends concat", arrow.ErrInvalid)
		}

		for j, e := range src {
			end := e - offset
			if end > length {
				end = length
			}
			output[pos+j] = logicalEnd + end
		}
		pos += len(src)
		logicalEnd += length
	}
	return nil
}