arrow/array/concat.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package array

import (
"math"
"math/bits"
"github.com/aliyun/aliyun-odps-go-sdk/arrow"
"github.com/aliyun/aliyun-odps-go-sdk/arrow/bitutil"
"github.com/aliyun/aliyun-odps-go-sdk/arrow/internal/debug"
"github.com/aliyun/aliyun-odps-go-sdk/arrow/memory"
"golang.org/x/xerrors"
)
// Concatenate creates a new array.Interface which is the concatenation of the
// passed in arrays. Returns nil if an error is encountered.
//
// The passed in arrays still need to be released manually, and will not be
// released by this function.
func Concatenate(arrs []Interface, mem memory.Allocator) (Interface, error) {
if len(arrs) == 0 {
return nil, xerrors.New("array/concat: must pass at least one array")
}
// gather Data of inputs
data := make([]*Data, len(arrs))
for i, ar := range arrs {
if !arrow.TypeEqual(ar.DataType(), arrs[0].DataType()) {
return nil, xerrors.Errorf("arrays to be concatenated must be identically typed, but %s and %s were encountered",
arrs[0].DataType(), ar.DataType())
}
data[i] = ar.Data()
}
out, err := concat(data, mem)
if err != nil {
return nil, err
}
defer out.Release()
return MakeFromData(out), nil
}
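
// Illustrative usage sketch (the builder, values and allocator below are
// hypothetical, chosen only to show the call shape):
//
//	bldr := array.NewInt64Builder(memory.DefaultAllocator)
//	bldr.AppendValues([]int64{1, 2, 3}, nil)
//	a1 := bldr.NewInt64Array()
//	bldr.AppendValues([]int64{4, 5}, nil)
//	a2 := bldr.NewInt64Array()
//	out, err := array.Concatenate([]array.Interface{a1, a2}, memory.DefaultAllocator)
//	// out has length 5 on success; a1, a2 and out must still be Released by the caller.
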
// simple struct to hold ranges
type rng struct {
offset, len int
}
// simple bitmap struct to reference a specific slice of a bitmap where the range
// offset and length are in bits
type bitmap struct {
data []byte
rng rng
}
// gather up the bitmaps from the passed in data objects
func gatherBitmaps(data []*Data, idx int) []bitmap {
out := make([]bitmap, len(data))
for i, d := range data {
if d.buffers[idx] != nil {
out[i].data = d.buffers[idx].Bytes()
}
out[i].rng.offset = d.offset
out[i].rng.len = d.length
}
return out
}
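
// For example, an input array sliced to offset 5 and length 10 contributes a
// bitmap whose range covers bits [5, 15) of its underlying buffer.
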
// gatherFixedBuffers gathers up the buffer objects at the given index, returning only
// the byte slices of those buffers that are relevant to the passed in arrays, in case
// the arrays are themselves slices of other arrays. nil buffers are ignored and omitted
// from the output slice.
func gatherFixedBuffers(data []*Data, idx, byteWidth int) []*memory.Buffer {
out := make([]*memory.Buffer, 0, len(data))
for _, d := range data {
buf := d.buffers[idx]
if buf == nil {
continue
}
out = append(out, memory.NewBufferBytes(buf.Bytes()[d.offset*byteWidth:(d.offset+d.length)*byteWidth]))
}
return out
}
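
// For example, with byteWidth 4 (e.g. int32 offsets), a Data with offset 2 and
// length 3 contributes bytes [8:20) of its buffer, i.e. exactly its 3 relevant
// 4-byte slots.
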
// gatherBuffersFixedWidthType is like gatherFixedBuffers, but uses the data type's bit
// width to determine the byte width of each value rather than taking a byte width directly.
func gatherBuffersFixedWidthType(data []*Data, idx int, fixed arrow.FixedWidthDataType) []*memory.Buffer {
return gatherFixedBuffers(data, idx, fixed.BitWidth()/8)
}
// gatherBufferRanges requires that len(ranges) == len(data) and returns a list of buffers
// which represent the corresponding range of each buffer in the specified index of each
// data object.
func gatherBufferRanges(data []*Data, idx int, ranges []rng) []*memory.Buffer {
out := make([]*memory.Buffer, 0, len(data))
for i, d := range data {
buf := d.buffers[idx]
if buf == nil {
debug.Assert(ranges[i].len == 0, "misaligned buffer value ranges")
continue
}
out = append(out, memory.NewBufferBytes(buf.Bytes()[ranges[i].offset:ranges[i].offset+ranges[i].len]))
}
return out
}
// gatherChildren gathers the child data objects at index idx from each of the data objects.
func gatherChildren(data []*Data, idx int) []*Data {
return gatherChildrenMultiplier(data, idx, 1)
}
// gatherChildrenMultiplier gathers slices of the child data at index idx, scaling each
// parent's offset and length by multiplier. This is used when each parent element maps to
// a fixed number of child values, e.g. the values data of a fixed size list array.
func gatherChildrenMultiplier(data []*Data, idx, multiplier int) []*Data {
out := make([]*Data, len(data))
for i, d := range data {
out[i] = NewSliceData(d.childData[idx], int64(d.offset*multiplier), int64(d.offset+d.length)*int64(multiplier))
}
return out
}
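
// For example, for a fixed size list of 3 values per element whose parent Data is
// sliced to offset 2 and length 4, the multiplier is 3 and the child values are
// sliced to [6, 18), i.e. 12 values.
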
// gatherChildrenRanges returns a slice of Data objects which each represent slices of the given ranges from the
// child in the specified index from each data object.
func gatherChildrenRanges(data []*Data, idx int, ranges []rng) []*Data {
debug.Assert(len(data) == len(ranges), "mismatched children ranges for concat")
out := make([]*Data, len(data))
for i, d := range data {
out[i] = NewSliceData(d.childData[idx], int64(ranges[i].offset), int64(ranges[i].offset+ranges[i].len))
}
return out
}
// creates a single contiguous buffer which contains the concatenation of all of the passed
// in buffer objects.
func concatBuffers(bufs []*memory.Buffer, mem memory.Allocator) *memory.Buffer {
outLen := 0
for _, b := range bufs {
outLen += b.Len()
}
out := memory.NewResizableBuffer(mem)
out.Resize(outLen)
data := out.Bytes()
for _, b := range bufs {
copy(data, b.Bytes())
data = data[b.Len():]
}
return out
}
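
// For example, concatenating buffers of 12, 0 and 8 bytes yields a single 20 byte
// buffer containing the bytes of each non-empty input in order.
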
// concatOffsets creates a single offset buffer which represents the concatenation of all of the
// offsets buffers, adjusting the offsets appropriately to their new relative locations.
//
// It also returns the list of ranges that need to be fetched for the corresponding value buffers
// to construct the final concatenated value buffer.
func concatOffsets(buffers []*memory.Buffer, mem memory.Allocator) (*memory.Buffer, []rng, error) {
outLen := 0
for _, b := range buffers {
outLen += b.Len() / arrow.Int32SizeBytes
}
out := memory.NewResizableBuffer(mem)
out.Resize(arrow.Int32Traits.BytesRequired(outLen + 1))
dst := arrow.Int32Traits.CastFromBytes(out.Bytes())
valuesRanges := make([]rng, len(buffers))
nextOffset := int32(0)
nextElem := int(0)
for i, b := range buffers {
if b.Len() == 0 {
valuesRanges[i].offset = 0
valuesRanges[i].len = 0
continue
}
// when we gather our buffers, we sliced off the last offset from the buffer
// so that we could count the lengths accurately
src := arrow.Int32Traits.CastFromBytes(b.Bytes())
valuesRanges[i].offset = int(src[0])
// expand our slice to see that final offset
expand := src[:len(src)+1]
// compute the length of this range by taking the final offset and subtracting where we started.
valuesRanges[i].len = int(expand[len(src)]) - valuesRanges[i].offset
if nextOffset > math.MaxInt32-int32(valuesRanges[i].len) {
return nil, nil, xerrors.New("offset overflow while concatenating arrays")
}
// adjust each offset by the difference between our last ending point and our starting point
adj := nextOffset - src[0]
for j, o := range src {
dst[nextElem+j] = adj + o
}
// the next index for an element in the output buffer
nextElem += b.Len() / arrow.Int32SizeBytes
// update our offset counter to be the total current length of our output
nextOffset += int32(valuesRanges[i].len)
}
// final offset should point to the end of the data
dst[outLen] = nextOffset
return out, valuesRanges, nil
}
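
// Worked example: concatenating two string arrays with value lengths {3,2,2} and
// {2,4} gives gathered offset buffers [0,3,5] and [0,2] (the final offsets 7 and 6
// sit just past each slice and are read back via the expand reslice above). The
// output offsets become [0,3,5,7,9,13] and the value ranges are {offset:0, len:7}
// and {offset:0, len:6}.
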
// concat performs the actual concatenation of the *array.Data objects; it is called
// recursively for the child data of nested types.
func concat(data []*Data, mem memory.Allocator) (*Data, error) {
out := &Data{refCount: 1, dtype: data[0].dtype, nulls: 0}
for _, d := range data {
out.length += d.length
if out.nulls == UnknownNullCount || d.nulls == UnknownNullCount {
out.nulls = UnknownNullCount
continue
}
out.nulls += d.nulls
}
out.buffers = make([]*memory.Buffer, len(data[0].buffers))
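// if any of the inputs has (or may have) nulls and this is not a Null array,
// concatenate the validity bitmaps into buffer 0.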
if out.nulls != 0 && out.dtype.ID() != arrow.NULL {
bm, err := concatBitmaps(gatherBitmaps(data, 0), mem)
if err != nil {
return nil, err
}
out.buffers[0] = bm
}
switch dt := out.dtype.(type) {
case *arrow.NullType:
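// nothing further to do for null arrays: only the length and null count
// accumulated above matter.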
case *arrow.BooleanType:
bm, err := concatBitmaps(gatherBitmaps(data, 1), mem)
if err != nil {
return nil, err
}
out.buffers[1] = bm
case arrow.FixedWidthDataType:
out.buffers[1] = concatBuffers(gatherBuffersFixedWidthType(data, 1, dt), mem)
case arrow.BinaryDataType:
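// binary/string layout: buffer 1 holds the int32 offsets and buffer 2 holds the
// value bytes; concatenating the offsets also yields the ranges of value bytes
// to copy from each input.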
offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, arrow.Int32SizeBytes), mem)
if err != nil {
return nil, err
}
out.buffers[2] = concatBuffers(gatherBufferRanges(data, 2, valueRanges), mem)
out.buffers[1] = offsetBuffer
case *arrow.ListType:
offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, arrow.Int32SizeBytes), mem)
if err != nil {
return nil, err
}
childData := gatherChildrenRanges(data, 0, valueRanges)
for _, c := range childData {
defer c.Release()
}
out.buffers[1] = offsetBuffer
out.childData = make([]*Data, 1)
out.childData[0], err = concat(childData, mem)
if err != nil {
return nil, err
}
case *arrow.FixedSizeListType:
childData := gatherChildrenMultiplier(data, 0, int(dt.Len()))
for _, c := range childData {
defer c.Release()
}
children, err := concat(childData, mem)
if err != nil {
return nil, err
}
out.childData = []*Data{children}
case *arrow.StructType:
out.childData = make([]*Data, len(dt.Fields()))
for i := range dt.Fields() {
children := gatherChildren(data, i)
for _, c := range children {
defer c.Release()
}
childData, err := concat(children, mem)
if err != nil {
return nil, err
}
out.childData[i] = childData
}
case *arrow.MapType:
offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, arrow.Int32SizeBytes), mem)
if err != nil {
return nil, err
}
childData := gatherChildrenRanges(data, 0, valueRanges)
for _, c := range childData {
defer c.Release()
}
out.buffers[1] = offsetBuffer
out.childData = make([]*Data, 1)
out.childData[0], err = concat(childData, mem)
if err != nil {
return nil, err
}
default:
return nil, xerrors.Errorf("concatenate not implemented for type %s", dt)
}
return out, nil
}
// check overflow in the addition, taken from bits.Add but adapted for signed integers
// rather than unsigned integers. bits.UintSize will be either 32 or 64 depending on
// whether the architecture is 32 bit or 64 bit. The operation is the same in both cases;
// the only difference is how much we need to shift by: 30 for 32 bit and 62 for 64 bit.
// Thus, bits.UintSize - 2 is how much we shift right by to check whether the signed
// addition overflowed.
//
// The first return value is the sum; the second is true if there was an overflow.
func addOvf(x, y int) (int, bool) {
sum := x + y
return sum, ((x&y)|((x|y)&^sum))>>(bits.UintSize-2) == 1
}
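
// For example, on a 64-bit platform addOvf(math.MaxInt64, 1) wraps to
// math.MinInt64 and reports overflow, while addOvf(2, 3) returns (5, false).
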
// concatenate bitmaps together and return a buffer with the combined bitmaps
func concatBitmaps(bitmaps []bitmap, mem memory.Allocator) (*memory.Buffer, error) {
var (
outlen int
overflow bool
)
for _, bm := range bitmaps {
if outlen, overflow = addOvf(outlen, bm.rng.len); overflow {
return nil, xerrors.New("length overflow when concatenating arrays")
}
}
out := memory.NewResizableBuffer(mem)
out.Resize(int(bitutil.BytesForBits(int64(outlen))))
dst := out.Bytes()
offset := 0
for _, bm := range bitmaps {
if bm.data == nil { // a nil bitmap implies every bit in the range is set (i.e. all values valid)
bitutil.SetBitsTo(out.Bytes(), int64(offset), int64(bm.rng.len), true)
} else {
bitutil.CopyBitmap(bm.data, bm.rng.offset, bm.rng.len, dst, offset)
}
offset += bm.rng.len
}
return out, nil
}