func concat()

in arrow/array/concat.go [519:773]


// concat joins the arrays described by data, in order, into a single newly
// built ArrayData whose type is data[0].DataType(). The returned data starts
// with a reference count of 1; the caller owns it and must Release it.
//
// The remaining inputs are not type-checked here; callers are expected to pass
// inputs that all share data[0]'s type (NOTE(review): confirm every caller
// guarantees this). Extension types are concatenated through their storage
// type, and nested types recurse into concat for their children.
//
// A panic raised while concatenating is recovered and returned as an error.
// Whenever an error is returned, the partially built output is released.
// Children are attached to the output only after they have been built
// successfully, so that cleanup never sees a nil or already-released child.
func concat(data []arrow.ArrayData, mem memory.Allocator) (arr arrow.ArrayData, err error) {
	// data[0] is dereferenced below before the recover handler is installed,
	// so reject empty input explicitly rather than panicking.
	if len(data) == 0 {
		return nil, fmt.Errorf("%w: concat requires at least one array", arrow.ErrInvalid)
	}

	out := &Data{dtype: data[0].DataType(), nulls: 0}
	out.refCount.Add(1)

	defer func() {
		if pErr := recover(); pErr != nil {
			err = utils.FormatRecoveredError("arrow/concat", pErr)
		}
		if err != nil {
			// out owns everything attached to it so far; releasing it frees
			// those resources on the failure path.
			out.Release()
		}
	}()

	for _, d := range data {
		out.length += d.Len()
		// One input with an unknown null count makes the total unknown.
		if out.nulls == UnknownNullCount || d.NullN() == UnknownNullCount {
			out.nulls = UnknownNullCount
			continue
		}
		out.nulls += d.NullN()
	}

	out.buffers = make([]*memory.Buffer, len(data[0].Buffers()))
	// Only build a validity bitmap when there may be nulls; NULL-typed arrays
	// carry no bitmap at all.
	if out.nulls != 0 && out.dtype.ID() != arrow.NULL {
		bm, err := concatBitmaps(gatherBitmaps(data, 0), mem)
		if err != nil {
			return nil, err
		}
		out.buffers[0] = bm
	}

	dt := out.dtype
	if dt.ID() == arrow.EXTENSION {
		dt = dt.(arrow.ExtensionType).StorageType()
	}

	switch dt := dt.(type) {
	case *arrow.NullType:
		// Nothing beyond length and null count to concatenate.
	case *arrow.BooleanType:
		bm, err := concatBitmaps(gatherBitmaps(data, 1), mem)
		if err != nil {
			return nil, err
		}
		out.buffers[1] = bm
	case *arrow.DictionaryType:
		idxType := dt.IndexType.(arrow.FixedWidthDataType)
		// Two cases: every input shares an identical dictionary (reuse it and
		// concatenate the indices), or the dictionaries must be unified and the
		// indices remapped.
		dict0 := MakeFromData(data[0].Dictionary())
		defer dict0.Release()
		dictsSame := true
		for _, d := range data[1:] {
			dict := MakeFromData(d.Dictionary())
			same := Equal(dict0, dict)
			dict.Release()
			if !same {
				dictsSame = false
				break
			}
		}

		if dictsSame {
			out.dictionary = dict0.Data().(*Data)
			out.dictionary.Retain()
			out.buffers[1] = concatBuffers(gatherBuffersFixedWidthType(data, 1, idxType), mem)
			break
		}

		indexLookup, unifiedDict, err := unifyDictionaries(mem, data, dt)
		if err != nil {
			return nil, err
		}
		defer unifiedDict.Release()
		out.dictionary = unifiedDict.Data().(*Data)
		out.dictionary.Retain()

		out.buffers[1], err = concatDictIndices(mem, data, idxType, indexLookup)
		if err != nil {
			return nil, err
		}
	case arrow.FixedWidthDataType:
		out.buffers[1] = concatBuffers(gatherBuffersFixedWidthType(data, 1, dt), mem)
	case arrow.BinaryViewDataType:
		// Buffers 0 and 1 are the validity bitmap and the view headers; every
		// input's variadic data buffers are carried over (retained) as-is.
		out.buffers = out.buffers[:2]
		for _, d := range data {
			for _, buf := range d.Buffers()[2:] {
				buf.Retain()
				out.buffers = append(out.buffers, buf)
			}
		}

		out.buffers[1] = concatBuffers(gatherFixedBuffers(data, 1, arrow.ViewHeaderSizeBytes), mem)

		// Non-inline views from input idx point at that input's own variadic
		// buffers, which now sit after all buffers of the preceding inputs, so
		// their buffer index is shifted by that count.
		var (
			s                  = arrow.ViewHeaderTraits.CastFromBytes(out.buffers[1].Bytes())
			i                  = data[0].Len()
			precedingBufsCount int
		)

		for idx := 1; idx < len(data); idx++ {
			precedingBufsCount += len(data[idx-1].Buffers()) - 2

			for end := i + data[idx].Len(); i < end; i++ {
				if s[i].IsInline() {
					continue
				}

				bufIndex := s[i].BufferIndex() + int32(precedingBufsCount)
				s[i].SetIndexOffset(bufIndex, s[i].BufferOffset())
			}
		}
	case arrow.BinaryDataType:
		offsetWidth := dt.Layout().Buffers[1].ByteWidth
		offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem)
		if err != nil {
			return nil, err
		}
		out.buffers[1] = offsetBuffer
		out.buffers[2] = concatBuffers(gatherBufferRanges(data, 2, valueRanges), mem)
	case *arrow.ListType, *arrow.LargeListType, *arrow.MapType:
		// Offset-based nested types: concatenate (rebased) offsets, then the
		// referenced ranges of the single child.
		offsetWidth := dt.Layout().Buffers[1].ByteWidth
		offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem)
		if err != nil {
			return nil, err
		}
		// Hand the offsets to out immediately so a later failure frees them.
		out.buffers[1] = offsetBuffer

		childData := gatherChildrenRanges(data, 0, valueRanges)
		for _, c := range childData {
			defer c.Release()
		}

		child, err := concat(childData, mem)
		if err != nil {
			return nil, err
		}
		out.childData = []arrow.ArrayData{child}
	case *arrow.ListViewType:
		offsetType := arrow.PrimitiveTypes.Int32.(arrow.FixedWidthDataType)
		err := concatListView(data, offsetType, out, mem)
		if err != nil {
			return nil, err
		}
	case *arrow.LargeListViewType:
		offsetType := arrow.PrimitiveTypes.Int64.(arrow.FixedWidthDataType)
		err := concatListView(data, offsetType, out, mem)
		if err != nil {
			return nil, err
		}
	case *arrow.FixedSizeListType:
		childData := gatherChildrenMultiplier(data, 0, int(dt.Len()))
		for _, c := range childData {
			defer c.Release()
		}

		children, err := concat(childData, mem)
		if err != nil {
			return nil, err
		}
		out.childData = []arrow.ArrayData{children}
	case *arrow.StructType:
		// Append fields one by one so that, if a later field fails, out holds
		// only successfully built children when it is released.
		out.childData = make([]arrow.ArrayData, 0, dt.NumFields())
		for i := range dt.Fields() {
			children := gatherChildren(data, i)
			for _, c := range children {
				defer c.Release()
			}

			childData, err := concat(children, mem)
			if err != nil {
				return nil, err
			}
			out.childData = append(out.childData, childData)
		}
	case *arrow.RunEndEncodedType:
		// gatherChildren cannot be used here: the Offset and Len of run-end
		// encoded data do not correspond to the physical offset and length of
		// its children.
		var physicalLength int
		runs := make([]arrow.ArrayData, len(data))
		values := make([]arrow.ArrayData, len(data))
		for i, d := range data {
			plen := encoded.GetPhysicalLength(d)
			off := encoded.FindPhysicalOffset(d)

			runs[i] = NewSliceData(d.Children()[0], int64(off), int64(off+plen))
			defer runs[i].Release()
			values[i] = NewSliceData(d.Children()[1], int64(off), int64(off+plen))
			defer values[i].Release()

			var overflow bool
			physicalLength, overflow = addOvf(physicalLength, plen)
			if overflow {
				return nil, fmt.Errorf("%w: run end encoded array length must fit into a 32-bit signed integer",
					arrow.ErrInvalid)
			}
		}

		runEndsByteWidth := runs[0].DataType().(arrow.FixedWidthDataType).Bytes()
		runEndsBuffers := gatherFixedBuffers(runs, 1, runEndsByteWidth)
		outRunEndsLen := physicalLength * runEndsByteWidth
		outRunEndsBuf := memory.NewResizableBuffer(mem)
		outRunEndsBuf.Resize(outRunEndsLen)
		defer outRunEndsBuf.Release()

		if err := updateRunEnds(runEndsByteWidth, data, runEndsBuffers, outRunEndsBuf); err != nil {
			return nil, err
		}

		runEnds := NewData(data[0].Children()[0].DataType(), physicalLength,
			[]*memory.Buffer{nil, outRunEndsBuf}, nil, 0, 0)
		// Attach the run ends before concatenating the values. If that fails,
		// the deferred cleanup releases them exactly once, through out.
		out.childData = []arrow.ArrayData{runEnds}

		valuesData, err := concat(values, mem)
		if err != nil {
			return nil, err
		}
		out.childData = append(out.childData, valuesData)
	default:
		return nil, fmt.Errorf("concatenate not implemented for type %s", dt)
	}

	return out, nil
}