in arrow/array/concat.go [519:773]
// concat concatenates the array data in data into a single newly built
// arrow.ArrayData whose data type is data[0].DataType(). New buffers are
// allocated from mem. Only data[0]'s type is consulted, so callers are
// expected to pass inputs that all share the same data type. The returned
// data starts with a reference count of 1 and is owned by the caller.
//
// Panics raised while concatenating are recovered and reported as errors.
// On any error the partially built output is released and nil is returned.
func concat(data []arrow.ArrayData, mem memory.Allocator) (arr arrow.ArrayData, err error) {
	// data[0] is dereferenced below before the recovering defer is installed,
	// so reject empty input explicitly instead of panicking.
	if len(data) == 0 {
		return nil, fmt.Errorf("%w: must pass at least one array to concatenate", arrow.ErrInvalid)
	}

	out := &Data{dtype: data[0].DataType(), nulls: 0}
	out.refCount.Add(1)
	defer func() {
		if pErr := recover(); pErr != nil {
			err = utils.FormatRecoveredError("arrow/concat", pErr)
		}
		if err != nil {
			// A child concatenation that failed (or never ran because an
			// earlier step failed or panicked) leaves a nil entry in
			// out.childData. Drop those entries so releasing out can never
			// invoke Release on a nil arrow.ArrayData interface, which would
			// panic and mask the original error.
			kept := out.childData[:0]
			for _, c := range out.childData {
				if c != nil {
					kept = append(kept, c)
				}
			}
			out.childData = kept
			out.Release()
		}
	}()

	// Total length is the sum of input lengths; the null count becomes
	// UnknownNullCount as soon as any input's null count is unknown.
	for _, d := range data {
		out.length += d.Len()
		if out.nulls == UnknownNullCount || d.NullN() == UnknownNullCount {
			out.nulls = UnknownNullCount
			continue
		}
		out.nulls += d.NullN()
	}

	out.buffers = make([]*memory.Buffer, len(data[0].Buffers()))
	// Concatenate the validity bitmaps (buffer 0) only when there may be
	// nulls; the NULL type is skipped.
	if out.nulls != 0 && out.dtype.ID() != arrow.NULL {
		bm, err := concatBitmaps(gatherBitmaps(data, 0), mem)
		if err != nil {
			return nil, err
		}
		out.buffers[0] = bm
	}

	// Extension types are concatenated through their storage type.
	dt := out.dtype
	if dt.ID() == arrow.EXTENSION {
		dt = dt.(arrow.ExtensionType).StorageType()
	}

	switch dt := dt.(type) {
	case *arrow.NullType:
	case *arrow.BooleanType:
		bm, err := concatBitmaps(gatherBitmaps(data, 1), mem)
		if err != nil {
			return nil, err
		}
		out.buffers[1] = bm
	case *arrow.DictionaryType:
		idxType := dt.IndexType.(arrow.FixedWidthDataType)
		// two cases: all dictionaries are the same or we need to unify them
		dictsSame := true
		dict0 := MakeFromData(data[0].Dictionary())
		defer dict0.Release()
		// data[0]'s own dictionary is dict0, so only the remaining inputs
		// need comparing.
		for _, d := range data[1:] {
			dict := MakeFromData(d.Dictionary())
			same := Equal(dict0, dict)
			dict.Release()
			if !same {
				dictsSame = false
				break
			}
		}
		if dictsSame {
			// Share the common dictionary and simply concatenate the indices.
			out.dictionary = dict0.Data().(*Data)
			out.dictionary.Retain()
			out.buffers[1] = concatBuffers(gatherBuffersFixedWidthType(data, 1, idxType), mem)
			break
		}
		indexLookup, unifiedDict, err := unifyDictionaries(mem, data, dt)
		if err != nil {
			return nil, err
		}
		defer unifiedDict.Release()
		out.dictionary = unifiedDict.Data().(*Data)
		out.dictionary.Retain()
		out.buffers[1], err = concatDictIndices(mem, data, idxType, indexLookup)
		if err != nil {
			return nil, err
		}
	case arrow.FixedWidthDataType:
		out.buffers[1] = concatBuffers(gatherBuffersFixedWidthType(data, 1, dt), mem)
	case arrow.BinaryViewDataType:
		// Keep the validity and view-header slots, then append (retained)
		// every input's variadic data buffers in input order.
		out.buffers = out.buffers[:2]
		for _, d := range data {
			for _, buf := range d.Buffers()[2:] {
				buf.Retain()
				out.buffers = append(out.buffers, buf)
			}
		}
		out.buffers[1] = concatBuffers(gatherFixedBuffers(data, 1, arrow.ViewHeaderSizeBytes), mem)
		// Non-inline views store a buffer index relative to their own input's
		// variadic buffers; shift it by the number of variadic buffers
		// contributed by all preceding inputs.
		var (
			s                  = arrow.ViewHeaderTraits.CastFromBytes(out.buffers[1].Bytes())
			i                  = data[0].Len()
			precedingBufsCount int
		)
		for idx := 1; idx < len(data); idx++ {
			precedingBufsCount += len(data[idx-1].Buffers()) - 2
			for end := i + data[idx].Len(); i < end; i++ {
				if s[i].IsInline() {
					continue
				}
				bufIndex := s[i].BufferIndex() + int32(precedingBufsCount)
				s[i].SetIndexOffset(bufIndex, s[i].BufferOffset())
			}
		}
	case arrow.BinaryDataType:
		offsetWidth := dt.Layout().Buffers[1].ByteWidth
		offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem)
		if err != nil {
			return nil, err
		}
		out.buffers[1] = offsetBuffer
		out.buffers[2] = concatBuffers(gatherBufferRanges(data, 2, valueRanges), mem)
	case *arrow.ListType, *arrow.LargeListType, *arrow.MapType:
		// All three keep offsets in buffer 1 and their values in child 0;
		// the offset width is taken from the type's layout.
		offsetWidth := dt.Layout().Buffers[1].ByteWidth
		offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem)
		if err != nil {
			return nil, err
		}
		childData := gatherChildrenRanges(data, 0, valueRanges)
		for _, c := range childData {
			defer c.Release()
		}
		out.buffers[1] = offsetBuffer
		out.childData = make([]arrow.ArrayData, 1)
		out.childData[0], err = concat(childData, mem)
		if err != nil {
			return nil, err
		}
	case *arrow.ListViewType:
		offsetType := arrow.PrimitiveTypes.Int32.(arrow.FixedWidthDataType)
		if err := concatListView(data, offsetType, out, mem); err != nil {
			return nil, err
		}
	case *arrow.LargeListViewType:
		offsetType := arrow.PrimitiveTypes.Int64.(arrow.FixedWidthDataType)
		if err := concatListView(data, offsetType, out, mem); err != nil {
			return nil, err
		}
	case *arrow.FixedSizeListType:
		childData := gatherChildrenMultiplier(data, 0, int(dt.Len()))
		for _, c := range childData {
			defer c.Release()
		}
		children, err := concat(childData, mem)
		if err != nil {
			return nil, err
		}
		out.childData = []arrow.ArrayData{children}
	case *arrow.StructType:
		out.childData = make([]arrow.ArrayData, dt.NumFields())
		for i := range dt.Fields() {
			children := gatherChildren(data, i)
			for _, c := range children {
				defer c.Release()
			}
			childData, err := concat(children, mem)
			if err != nil {
				return nil, err
			}
			out.childData[i] = childData
		}
	case *arrow.RunEndEncodedType:
		physicalLength, overflow := 0, false
		// we can't use gatherChildren because the Offset and Len of
		// data doesn't correspond to the physical length or offset
		runs := make([]arrow.ArrayData, len(data))
		values := make([]arrow.ArrayData, len(data))
		for i, d := range data {
			plen := encoded.GetPhysicalLength(d)
			off := encoded.FindPhysicalOffset(d)
			runs[i] = NewSliceData(d.Children()[0], int64(off), int64(off+plen))
			defer runs[i].Release()
			values[i] = NewSliceData(d.Children()[1], int64(off), int64(off+plen))
			defer values[i].Release()
			physicalLength, overflow = addOvf(physicalLength, plen)
			if overflow {
				return nil, fmt.Errorf("%w: run end encoded array length must fit into a 32-bit signed integer",
					arrow.ErrInvalid)
			}
		}
		runEndsByteWidth := runs[0].DataType().(arrow.FixedWidthDataType).Bytes()
		runEndsBuffers := gatherFixedBuffers(runs, 1, runEndsByteWidth)
		outRunEndsLen := physicalLength * runEndsByteWidth
		outRunEndsBuf := memory.NewResizableBuffer(mem)
		outRunEndsBuf.Resize(outRunEndsLen)
		defer outRunEndsBuf.Release()
		if err := updateRunEnds(runEndsByteWidth, data, runEndsBuffers, outRunEndsBuf); err != nil {
			return nil, err
		}
		out.childData = make([]arrow.ArrayData, 2)
		out.childData[0] = NewData(data[0].Children()[0].DataType(), physicalLength,
			[]*memory.Buffer{nil, outRunEndsBuf}, nil, 0, 0)
		var err error
		out.childData[1], err = concat(values, mem)
		if err != nil {
			// out.childData[0] is released together with out by the deferred
			// cleanup; releasing it here as well would be a double release.
			return nil, err
		}
	default:
		return nil, fmt.Errorf("concatenate not implemented for type %s", dt)
	}
	return out, nil
}