arrow/compute/internal/kernels/vector_selection.go (1,444 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //go:build go1.18 package kernels import ( "fmt" "math" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/bitutil" "github.com/apache/arrow-go/v18/arrow/compute/exec" "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/bitutils" ) type NullSelectionBehavior int8 const ( DropNulls NullSelectionBehavior = iota EmitNulls ) type FilterOptions struct { NullSelection NullSelectionBehavior `compute:"null_selection_behavior"` } func (FilterOptions) TypeName() string { return "FilterOptions" } type FilterState = FilterOptions type TakeOptions struct { BoundsCheck bool } func (TakeOptions) TypeName() string { return "TakeOptions" } type TakeState = TakeOptions func getFilterOutputSize(filter *exec.ArraySpan, nullSelection NullSelectionBehavior) (size int64) { if filter.MayHaveNulls() { counter := bitutils.NewBinaryBitBlockCounter(filter.Buffers[1].Buf, filter.Buffers[0].Buf, filter.Offset, filter.Offset, filter.Len) pos := int64(0) if nullSelection == EmitNulls { for pos < filter.Len { block := counter.NextOrNotWord() size += int64(block.Popcnt) pos += int64(block.Len) } } else { for pos < filter.Len { block := counter.NextAndWord() size += int64(block.Popcnt) pos += int64(block.Len) } } return } // filter has no nulls, so we can just use CountSetBits return int64(bitutil.CountSetBits(filter.Buffers[1].Buf, int(filter.Offset), int(filter.Len))) } func preallocateData(ctx *exec.KernelCtx, length int64, bitWidth int, allocateValidity bool, out *exec.ExecResult) { out.Len = length if allocateValidity { out.Buffers[0].WrapBuffer(ctx.AllocateBitmap(length)) } if bitWidth == 1 { out.Buffers[1].WrapBuffer(ctx.AllocateBitmap(length)) } else { out.Buffers[1].WrapBuffer(ctx.Allocate(int(length) * (bitWidth / 8))) } } type builder[T any] interface { array.Builder Append(T) UnsafeAppend(T) UnsafeAppendBoolToBitmap(bool) } func getTakeIndices[T arrow.IntType | arrow.UintType](mem memory.Allocator, filter *exec.ArraySpan, nullSelect NullSelectionBehavior) arrow.ArrayData { var ( filterData = filter.Buffers[1].Buf haveFilterNulls = filter.MayHaveNulls() filterIsValid = filter.Buffers[0].Buf idxType = arrow.GetDataType[T]() ) if haveFilterNulls && nullSelect == EmitNulls { // Most complex case: the filter may have nulls and we don't drop them. // The logic is ternary: // - filter is null: emit null // - filter is valid and true: emit index // - filter is valid and false: don't emit anything bldr := array.NewBuilder(mem, idxType).(builder[T]) defer bldr.Release() // position relative to start of filter var pos T // current position taking the filter offset into account posWithOffset := filter.Offset // to count blocks where filterData[i] || !filterIsValid[i] filterCounter := bitutils.NewBinaryBitBlockCounter(filterData, filterIsValid, filter.Offset, filter.Offset, filter.Len) isValidCounter := bitutils.NewBitBlockCounter(filterIsValid, filter.Offset, filter.Len) for int64(pos) < filter.Len { // true OR NOT valid selectedOrNullBlock := filterCounter.NextOrNotWord() if selectedOrNullBlock.NoneSet() { pos += T(selectedOrNullBlock.Len) posWithOffset += int64(selectedOrNullBlock.Len) continue } bldr.Reserve(int(selectedOrNullBlock.Popcnt)) // if the values are all valid and the selectedOrNullBlock // is full, then we can infer that all the values are true // and skip the bit checking isValidBlock := isValidCounter.NextWord() if selectedOrNullBlock.AllSet() && isValidBlock.AllSet() { // all the values are selected and non-null for i := 0; i < int(selectedOrNullBlock.Len); i++ { bldr.UnsafeAppend(pos) pos++ } posWithOffset += int64(selectedOrNullBlock.Len) } else { // some of the values are false or null for i := 0; i < int(selectedOrNullBlock.Len); i++ { if bitutil.BitIsSet(filterIsValid, int(posWithOffset)) { if bitutil.BitIsSet(filterData, int(posWithOffset)) { bldr.UnsafeAppend(pos) } } else { // null slot, append null bldr.UnsafeAppendBoolToBitmap(false) } pos++ posWithOffset++ } } } result := bldr.NewArray() defer result.Release() result.Data().Retain() return result.Data() } bldr := newBufferBuilder[T](mem) if haveFilterNulls { // the filter may have nulls, so we scan the validity bitmap // and the filter data bitmap together debug.Assert(nullSelect == DropNulls, "incorrect nullselect logic") // position relative to start of the filter var pos T // current position taking the filter offset into account posWithOffset := filter.Offset filterCounter := bitutils.NewBinaryBitBlockCounter(filterData, filterIsValid, filter.Offset, filter.Offset, filter.Len) for int64(pos) < filter.Len { andBlock := filterCounter.NextAndWord() bldr.reserve(int(andBlock.Popcnt)) if andBlock.AllSet() { // all the values are selected and non-null for i := 0; i < int(andBlock.Len); i++ { bldr.unsafeAppend(pos) pos++ } posWithOffset += int64(andBlock.Len) } else if !andBlock.NoneSet() { // some values are false or null for i := 0; i < int(andBlock.Len); i++ { if bitutil.BitIsSet(filterIsValid, int(posWithOffset)) && bitutil.BitIsSet(filterData, int(posWithOffset)) { bldr.unsafeAppend(pos) } pos++ posWithOffset++ } } else { pos += T(andBlock.Len) posWithOffset += int64(andBlock.Len) } } } else { // filter has no nulls, so we only need to look for true values bitutils.VisitSetBitRuns(filterData, filter.Offset, filter.Len, func(pos, length int64) error { // append consecutive run of indices bldr.reserve(int(length)) for i := int64(0); i < length; i++ { bldr.unsafeAppend(T(pos + i)) } return nil }) } length := bldr.len() outBuf := bldr.finish() defer outBuf.Release() return array.NewData(idxType, length, []*memory.Buffer{nil, outBuf}, nil, 0, 0) } func GetTakeIndices(mem memory.Allocator, filter *exec.ArraySpan, nullSelect NullSelectionBehavior) (arrow.ArrayData, error) { debug.Assert(filter.Type.ID() == arrow.BOOL, "filter should be a boolean array") if filter.Len < math.MaxUint16 { return getTakeIndices[uint16](mem, filter, nullSelect), nil } else if filter.Len < math.MaxUint32 { return getTakeIndices[uint32](mem, filter, nullSelect), nil } return nil, fmt.Errorf("%w: filter length exceeds UINT32_MAX, consider a different strategy for selecting elements", arrow.ErrNotImplemented) } type writeFiltered interface { OutPos() int WriteValue(int64) WriteValueSegment(int64, int64) WriteNull() } type dropNullCounter struct { dataCounter bitutils.BitBlockCounter dataValidityCounter bitutils.BinaryBitBlockCounter hasValidity bool } func newDropNullCounter(validity []byte, data []byte, offset int64, length int64) *dropNullCounter { return &dropNullCounter{ dataCounter: *bitutils.NewBitBlockCounter(data, offset, length), dataValidityCounter: *bitutils.NewBinaryBitBlockCounter(data, validity, offset, offset, length), hasValidity: len(validity) > 0, } } func (n *dropNullCounter) NextBlock() bitutils.BitBlockCount { if n.hasValidity { // filter is true AND not null return n.dataValidityCounter.NextAndWord() } return n.dataCounter.NextWord() } func primitiveFilterImpl(wr writeFiltered, values *exec.ArraySpan, filter *exec.ArraySpan, nullSelection NullSelectionBehavior, out *exec.ExecResult) { var ( valuesIsValid = values.Buffers[0].Buf filterIsValid = filter.Buffers[0].Buf filterData = filter.Buffers[1].Buf outIsValid = out.Buffers[0].Buf ) if filter.Nulls == 0 && values.Nulls == 0 { // fast filter path when values and filters have no nulls bitutils.VisitSetBitRuns(filterData, filter.Offset, values.Len, func(pos, length int64) error { wr.WriteValueSegment(pos, length) return nil }) return } var ( dropNulls = newDropNullCounter(filterIsValid, filterData, filter.Offset, values.Len) dataCounter = bitutils.NewOptionalBitBlockCounter(valuesIsValid, values.Offset, values.Len) filterValidCounter = bitutils.NewOptionalBitBlockCounter(filterIsValid, filter.Offset, values.Len) writeNotNull = func(idx int64) { bitutil.SetBit(outIsValid, int(out.Offset)+wr.OutPos()) wr.WriteValue(idx) } writeMaybeNull = func(idx int64) { bitutil.SetBitTo(outIsValid, int(out.Offset)+wr.OutPos(), bitutil.BitIsSet(valuesIsValid, int(values.Offset+idx))) wr.WriteValue(idx) } inPos int64 ) for inPos < values.Len { filterBlock := dropNulls.NextBlock() filterValidBlock := filterValidCounter.NextWord() dataBlock := dataCounter.NextWord() switch { case filterBlock.AllSet() && dataBlock.AllSet(): // faster path: all values in block are included and not null bitutil.SetBitsTo(outIsValid, out.Offset+int64(wr.OutPos()), int64(filterBlock.Len), true) wr.WriteValueSegment(inPos, int64(filterBlock.Len)) inPos += int64(filterBlock.Len) case filterBlock.AllSet(): // faster: all values are selected, but some are null // batch copy bits from values validity bitmap to output validity bitmap bitutil.CopyBitmap(valuesIsValid, int(values.Offset+inPos), int(filterBlock.Len), outIsValid, int(out.Offset)+wr.OutPos()) wr.WriteValueSegment(inPos, int64(filterBlock.Len)) inPos += int64(filterBlock.Len) case filterBlock.NoneSet() && nullSelection == DropNulls: // for this exceedingly common case in low-selectivity filters // we can skip further analysis of the data and move onto the next block inPos += int64(filterBlock.Len) default: // some filter values are false or null if dataBlock.AllSet() { // no values are null if filterValidBlock.AllSet() { // filter is non-null but some values are false for i := 0; i < int(filterBlock.Len); i++ { if bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) { writeNotNull(inPos) } inPos++ } } else if nullSelection == DropNulls { // if any values are selected, they ARE NOT null for i := 0; i < int(filterBlock.Len); i++ { if bitutil.BitIsSet(filterIsValid, int(filter.Offset+inPos)) && bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) { writeNotNull(inPos) } inPos++ } } else { // nullselect == EmitNulls // data values in this block are not null for i := 0; i < int(filterBlock.Len); i++ { isValid := bitutil.BitIsSet(filterIsValid, int(filter.Offset+inPos)) if isValid && bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) { // filter slot is non-null and set writeNotNull(inPos) } else if !isValid { // filter slot is null, so we have a null in the output bitutil.ClearBit(outIsValid, int(out.Offset)+wr.OutPos()) wr.WriteNull() } inPos++ } } } else { // !dataBlock.AllSet() // some values are null if filterValidBlock.AllSet() { // filter is non-null but some values are false for i := 0; i < int(filterBlock.Len); i++ { if bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) { writeMaybeNull(inPos) } inPos++ } } else if nullSelection == DropNulls { // if any values are selected they ARE NOT null for i := 0; i < int(filterBlock.Len); i++ { if bitutil.BitIsSet(filterIsValid, int(filter.Offset+inPos)) && bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) { writeMaybeNull(inPos) } inPos++ } } else { // nullselect == emitnulls // Data values in this block are not null for i := 0; i < int(filterBlock.Len); i++ { isValid := bitutil.BitIsSet(filterIsValid, int(filter.Offset+inPos)) if isValid && bitutil.BitIsSet(filterData, int(filter.Offset+inPos)) { // filter slot is non-null and set writeMaybeNull(inPos) } else if !isValid { // filter slot is null, so we have a null in the output bitutil.ClearBit(outIsValid, int(out.Offset)+wr.OutPos()) wr.WriteNull() } inPos++ } } } } } } type filterWriter[T arrow.UintType] struct { outPosition int outOffset int valuesOffset int valuesData []T outData []T } func (f *filterWriter[T]) OutPos() int { return f.outPosition } func (f *filterWriter[T]) WriteValue(inPos int64) { f.outData[f.outPosition] = f.valuesData[inPos] f.outPosition++ } func (f *filterWriter[T]) WriteValueSegment(inStart, length int64) { copy(f.outData[f.outPosition:], f.valuesData[inStart:inStart+length]) f.outPosition += int(length) } func (f *filterWriter[T]) WriteNull() { var z T f.outData[f.outPosition] = z f.outPosition++ } type boolFilterWriter struct { outPosition int outOffset int valuesOffset int valuesData []byte outData []byte } func (b *boolFilterWriter) OutPos() int { return b.outPosition } func (b *boolFilterWriter) WriteValue(inPos int64) { bitutil.SetBitTo(b.outData, b.outOffset+b.outPosition, bitutil.BitIsSet(b.valuesData, b.valuesOffset+int(inPos))) } func (b *boolFilterWriter) WriteValueSegment(inStart, length int64) { bitutil.CopyBitmap(b.valuesData, b.valuesOffset+int(inStart), int(length), b.outData, b.outOffset+b.outPosition) b.outPosition += int(length) } func (b *boolFilterWriter) WriteNull() { bitutil.ClearBit(b.outData, b.outOffset+b.outPosition) b.outPosition++ } func PrimitiveFilter(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { var ( values = &batch.Values[0].Array filter = &batch.Values[1].Array nullSelection = ctx.State.(FilterState).NullSelection ) values.UpdateNullCount() filter.UpdateNullCount() outputLength := getFilterOutputSize(filter, nullSelection) // the output precomputed null count is unknown except in the narrow // condition that all the values are non-null and the filter will not // cause any new nulls to be created if values.Nulls == 0 && (nullSelection == DropNulls || filter.Nulls == 0) { out.Nulls = 0 } else { out.Nulls = array.UnknownNullCount } // when neither the values nor filter is known to have any nulls, // we will elect the optimized ExecNonNull path where there is no // need to populate a validity bitmap. allocateValidity := values.Nulls != 0 || filter.Nulls != 0 bitWidth := values.Type.(arrow.FixedWidthDataType).BitWidth() preallocateData(ctx, outputLength, bitWidth, allocateValidity, out) var wr writeFiltered switch bitWidth { case 1: wr = &boolFilterWriter{ outOffset: int(out.Offset), valuesOffset: int(values.Offset), outData: out.Buffers[1].Buf, valuesData: values.Buffers[1].Buf, } case 8: wr = &filterWriter[uint8]{ outOffset: int(out.Offset), valuesOffset: int(values.Offset), outData: exec.GetSpanValues[uint8](out, 1), valuesData: exec.GetSpanValues[uint8](values, 1), } case 16: wr = &filterWriter[uint16]{ outOffset: int(out.Offset), valuesOffset: int(values.Offset), outData: exec.GetSpanValues[uint16](out, 1), valuesData: exec.GetSpanValues[uint16](values, 1), } case 32: wr = &filterWriter[uint32]{ outOffset: int(out.Offset), valuesOffset: int(values.Offset), outData: exec.GetSpanValues[uint32](out, 1), valuesData: exec.GetSpanValues[uint32](values, 1), } case 64: wr = &filterWriter[uint64]{ outOffset: int(out.Offset), valuesOffset: int(values.Offset), outData: exec.GetSpanValues[uint64](out, 1), valuesData: exec.GetSpanValues[uint64](values, 1), } default: return fmt.Errorf("%w: invalid values bit width", arrow.ErrType) } primitiveFilterImpl(wr, values, filter, nullSelection, out) return nil } type primitiveGetter[T arrow.IntType | bool] interface { IsValid(int64) bool GetValue(int64) T NullCount() int64 Len() int64 } type boolGetter struct { inner *exec.ArraySpan values []byte } func (b *boolGetter) IsValid(i int64) bool { return bitutil.BitIsSet(b.inner.Buffers[0].Buf, int(b.inner.Offset+i)) } func (b *boolGetter) GetValue(i int64) bool { return bitutil.BitIsSet(b.values, int(b.inner.Offset+i)) } func (b *boolGetter) NullCount() int64 { return b.inner.Nulls } func (b *boolGetter) Len() int64 { return b.inner.Len } type primitiveGetterImpl[T arrow.IntType] struct { inner *exec.ArraySpan values []T } func (p *primitiveGetterImpl[T]) IsValid(i int64) bool { return bitutil.BitIsSet(p.inner.Buffers[0].Buf, int(p.inner.Offset+i)) } func (p *primitiveGetterImpl[T]) GetValue(i int64) T { return p.values[i] } func (p *primitiveGetterImpl[T]) NullCount() int64 { return p.inner.Nulls } func (p *primitiveGetterImpl[T]) Len() int64 { return p.inner.Len } type chunkedBoolGetter struct { inner *arrow.Chunked resolver *exec.ChunkResolver nulls int64 len int64 chunkLengths []int64 valuesData [][]byte valuesIsValid [][]byte valuesOffset []int64 } func newChunkedBoolGetter(arr *arrow.Chunked) *chunkedBoolGetter { nchunks := len(arr.Chunks()) lengths := make([]int64, nchunks) valuesData := make([][]byte, nchunks) valuesIsValid := make([][]byte, nchunks) valuesOffset := make([]int64, nchunks) for i, c := range arr.Chunks() { lengths[i] = int64(c.Len()) valuesOffset[i] = int64(c.Data().Offset()) valuesIsValid[i] = c.NullBitmapBytes() valuesData[i] = c.Data().Buffers()[1].Bytes() } return &chunkedBoolGetter{ inner: arr, resolver: exec.NewChunkResolver(arr.Chunks()), nulls: int64(arr.NullN()), len: int64(arr.Len()), chunkLengths: lengths, valuesData: valuesData, valuesIsValid: valuesIsValid, valuesOffset: valuesOffset, } } func (c *chunkedBoolGetter) IsValid(i int64) bool { chunk, chunkidx := c.resolver.Resolve(i) bm := c.valuesIsValid[chunk] if bm == nil { return true } return bitutil.BitIsSet(bm, int(c.valuesOffset[chunk]+chunkidx)) } func (c *chunkedBoolGetter) GetValue(i int64) bool { chunk, idx := c.resolver.Resolve(i) return bitutil.BitIsSet(c.valuesData[chunk], int(c.valuesOffset[chunk]+idx)) } func (c *chunkedBoolGetter) NullCount() int64 { return c.nulls } func (c *chunkedBoolGetter) Len() int64 { return c.len } type chunkedPrimitiveGetter[T arrow.IntType] struct { inner *arrow.Chunked resolver *exec.ChunkResolver nulls int64 len int64 chunkLengths []int64 valuesData [][]T valuesIsValid [][]byte valuesOffset []int64 } func newChunkedPrimitiveGetter[T arrow.IntType](arr *arrow.Chunked) *chunkedPrimitiveGetter[T] { nchunks := len(arr.Chunks()) lengths := make([]int64, nchunks) valuesData := make([][]T, nchunks) valuesIsValid := make([][]byte, nchunks) valuesOffset := make([]int64, nchunks) for i, c := range arr.Chunks() { lengths[i] = int64(c.Len()) valuesOffset[i] = int64(c.Data().Offset()) valuesIsValid[i] = c.NullBitmapBytes() valuesData[i] = arrow.GetValues[T](c.Data(), 1) } return &chunkedPrimitiveGetter[T]{ inner: arr, resolver: exec.NewChunkResolver(arr.Chunks()), nulls: int64(arr.NullN()), len: int64(arr.Len()), chunkLengths: lengths, valuesData: valuesData, valuesIsValid: valuesIsValid, valuesOffset: valuesOffset, } } func (c *chunkedPrimitiveGetter[T]) IsValid(i int64) bool { chunk, chunkidx := c.resolver.Resolve(i) bm := c.valuesIsValid[chunk] if bm == nil { return true } return bitutil.BitIsSet(bm, int(c.valuesOffset[chunk]+chunkidx)) } func (c *chunkedPrimitiveGetter[T]) GetValue(i int64) T { chunk, idx := c.resolver.Resolve(i) return c.valuesData[chunk][idx] } func (c *chunkedPrimitiveGetter[T]) NullCount() int64 { return c.nulls } func (c *chunkedPrimitiveGetter[T]) Len() int64 { return c.len } func primitiveTakeImpl[IdxT arrow.UintType, ValT arrow.IntType](values primitiveGetter[ValT], indices *exec.ArraySpan, out *exec.ExecResult) { var ( indicesData = exec.GetSpanValues[IdxT](indices, 1) indicesIsValid = indices.Buffers[0].Buf indicesOffset = indices.Offset outData = exec.GetSpanValues[ValT](out, 1) outIsValid = out.Buffers[0].Buf outOffset = out.Offset ) pos, validCount := int64(0), int64(0) if values.NullCount() == 0 && indices.Nulls == 0 { // values and indices are both never null // this means we didn't allocate the validity bitmap // and can simplify everything for i, idx := range indicesData { outData[i] = values.GetValue(int64(idx)) } out.Nulls = 0 return } indicesBitCounter := bitutils.NewOptionalBitBlockCounter(indicesIsValid, indicesOffset, indices.Len) for pos < indices.Len { block := indicesBitCounter.NextBlock() if values.NullCount() == 0 { // values are never null, so things are easier validCount += int64(block.Popcnt) if block.AllSet() { // fastest path: neither values nor index nulls bitutil.SetBitsTo(outIsValid, outOffset+pos, int64(block.Len), true) for i := 0; i < int(block.Len); i++ { outData[pos] = values.GetValue(int64(indicesData[pos])) pos++ } } else if block.Popcnt > 0 { // slow path: some indices but not all are null for i := 0; i < int(block.Len); i++ { if bitutil.BitIsSet(indicesIsValid, int(indicesOffset+pos)) { // index is not null bitutil.SetBit(outIsValid, int(outOffset+pos)) outData[pos] = values.GetValue(int64(indicesData[pos])) } pos++ } } else { pos += int64(block.Len) } } else { // values have nulls, so we must do random access into the values bitmap if block.AllSet() { // faster path: indices are not null but values may be for i := 0; i < int(block.Len); i++ { if values.IsValid(int64(indicesData[pos])) { // value is not null outData[pos] = values.GetValue(int64(indicesData[pos])) bitutil.SetBit(outIsValid, int(outOffset+pos)) validCount++ } pos++ } } else if block.Popcnt > 0 { // slow path: some but not all indices are null. since we // are doing random access in general we have to check the // value nullness one by one for i := 0; i < int(block.Len); i++ { if bitutil.BitIsSet(indicesIsValid, int(indicesOffset+pos)) && values.IsValid(int64(indicesData[pos])) { // index is not null && value is not null outData[pos] = values.GetValue(int64(indicesData[pos])) bitutil.SetBit(outIsValid, int(outOffset+pos)) validCount++ } pos++ } } else { pos += int64(block.Len) } } } out.Nulls = out.Len - validCount } func booleanTakeImpl[IdxT arrow.UintType](values primitiveGetter[bool], indices *exec.ArraySpan, out *exec.ExecResult) { var ( indicesData = exec.GetSpanValues[IdxT](indices, 1) indicesIsValid = indices.Buffers[0].Buf indicesOffset = indices.Offset outData = out.Buffers[1].Buf outIsValid = out.Buffers[0].Buf outOffset = out.Offset ) placeDataBit := func(loc int64, index IdxT) { bitutil.SetBitTo(outData, int(outOffset+loc), values.GetValue(int64(index))) } pos, validCount := int64(0), int64(0) if values.NullCount() == 0 && indices.Nulls == 0 { // values and indices are both never null // this means we didn't allocate the validity bitmap // and can simplify everything for i, idx := range indicesData { placeDataBit(int64(i), idx) } out.Nulls = 0 return } indicesBitCounter := bitutils.NewOptionalBitBlockCounter(indicesIsValid, indicesOffset, indices.Len) for pos < indices.Len { block := indicesBitCounter.NextBlock() if values.NullCount() == 0 { // values are never null so things are easier validCount += int64(block.Popcnt) if block.AllSet() { // fastest path: neither values nor index nulls bitutil.SetBitsTo(outIsValid, outOffset+pos, int64(block.Len), true) for i := 0; i < int(block.Len); i++ { placeDataBit(pos, indicesData[pos]) pos++ } } else if block.Popcnt > 0 { // slow path: some but not all indices are null for i := 0; i < int(block.Len); i++ { if bitutil.BitIsSet(indicesIsValid, int(indicesOffset+pos)) { // index is not null bitutil.SetBit(outIsValid, int(outOffset+pos)) placeDataBit(pos, indicesData[pos]) } pos++ } } else { pos += int64(block.Len) } } else { // values have nulls so we must do random access into the values bitmap if block.AllSet() { // faster path: indices are not null but values may be for i := 0; i < int(block.Len); i++ { if values.IsValid(int64(indicesData[pos])) { // value is not null bitutil.SetBit(outIsValid, int(outOffset+pos)) placeDataBit(pos, indicesData[pos]) validCount++ } pos++ } } else if block.Popcnt > 0 { // slow path: some but not all indices are null. // we have to check the values one by one for i := 0; i < int(block.Len); i++ { if bitutil.BitIsSet(indicesIsValid, int(indicesOffset+pos)) && values.IsValid(int64(indicesData[pos])) { placeDataBit(pos, indicesData[pos]) bitutil.SetBit(outIsValid, int(outOffset+pos)) validCount++ } pos++ } } else { pos += int64(block.Len) } } } out.Nulls = out.Len - validCount } func booleanTakeDispatchChunked(values, indices *arrow.Chunked, out []*exec.ExecResult) error { getter := newChunkedBoolGetter(values) var fn func(primitiveGetter[bool], *exec.ArraySpan, *exec.ExecResult) switch indices.DataType().(arrow.FixedWidthDataType).Bytes() { case 1: fn = booleanTakeImpl[uint8] case 2: fn = booleanTakeImpl[uint16] case 4: fn = booleanTakeImpl[uint32] case 8: fn = booleanTakeImpl[uint64] default: return fmt.Errorf("%w: invalid indices byte width", arrow.ErrIndex) } var indexSpan exec.ArraySpan for i, c := range indices.Chunks() { indexSpan.SetMembers(c.Data()) fn(getter, &indexSpan, out[i]) } return nil } func booleanTakeDispatch(values, indices *exec.ArraySpan, out *exec.ExecResult) error { getter := &boolGetter{inner: values, values: values.Buffers[1].Buf} switch indices.Type.(arrow.FixedWidthDataType).Bytes() { case 1: booleanTakeImpl[uint8](getter, indices, out) case 2: booleanTakeImpl[uint16](getter, indices, out) case 4: booleanTakeImpl[uint32](getter, indices, out) case 8: booleanTakeImpl[uint64](getter, indices, out) default: return fmt.Errorf("%w: invalid indices byte width", arrow.ErrIndex) } return nil } func takeIdxChunkedDispatch[ValT arrow.IntType](values, indices *arrow.Chunked, out []*exec.ExecResult) error { getter := newChunkedPrimitiveGetter[ValT](values) var fn func(primitiveGetter[ValT], *exec.ArraySpan, *exec.ExecResult) switch indices.DataType().(arrow.FixedWidthDataType).Bytes() { case 1: fn = primitiveTakeImpl[uint8, ValT] case 2: fn = primitiveTakeImpl[uint16, ValT] case 4: fn = primitiveTakeImpl[uint32, ValT] case 8: fn = primitiveTakeImpl[uint64, ValT] default: return fmt.Errorf("%w: invalid byte width for indices", arrow.ErrIndex) } var indexSpan exec.ArraySpan for i, c := range indices.Chunks() { indexSpan.SetMembers(c.Data()) fn(getter, &indexSpan, out[i]) } return nil } func takeIdxDispatch[ValT arrow.IntType](values, indices *exec.ArraySpan, out *exec.ExecResult) error { getter := &primitiveGetterImpl[ValT]{inner: values, values: exec.GetSpanValues[ValT](values, 1)} switch indices.Type.(arrow.FixedWidthDataType).Bytes() { case 1: primitiveTakeImpl[uint8](getter, indices, out) case 2: primitiveTakeImpl[uint16](getter, indices, out) case 4: primitiveTakeImpl[uint32](getter, indices, out) case 8: primitiveTakeImpl[uint64](getter, indices, out) default: return fmt.Errorf("%w: invalid indices byte width", arrow.ErrIndex) } return nil } func PrimitiveTake(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { var ( values = &batch.Values[0].Array indices = &batch.Values[1].Array ) if ctx.State.(TakeState).BoundsCheck { if err := checkIndexBounds(indices, uint64(values.Len)); err != nil { return err } } bitWidth := values.Type.(arrow.FixedWidthDataType).BitWidth() allocateValidity := values.Nulls != 0 || indices.Nulls != 0 preallocateData(ctx, indices.Len, bitWidth, allocateValidity, out) switch bitWidth { case 1: return booleanTakeDispatch(values, indices, out) case 8: return takeIdxDispatch[int8](values, indices, out) case 16: return takeIdxDispatch[int16](values, indices, out) case 32: return takeIdxDispatch[int32](values, indices, out) case 64: return takeIdxDispatch[int64](values, indices, out) default: return fmt.Errorf("%w: invalid values byte width for take", arrow.ErrInvalid) } } func ChunkedPrimitiveTake(ctx *exec.KernelCtx, batch []*arrow.Chunked, out *exec.ExecResult) ([]*exec.ExecResult, error) { var ( values = batch[0] indices = batch[1] ) if ctx.State.(TakeState).BoundsCheck { if err := checkIndexBoundsChunked(indices, uint64(values.Len())); err != nil { return nil, err } } bitWidth := values.DataType().(arrow.FixedWidthDataType).BitWidth() allocValidity := values.NullN() != 0 || indices.NullN() != 0 outData := make([]*exec.ExecResult, len(indices.Chunks())) for i, chunk := range indices.Chunks() { outData[i] = &exec.ExecResult{Type: out.Type} preallocateData(ctx, int64(chunk.Len()), bitWidth, allocValidity, outData[i]) } switch bitWidth { case 1: return outData, booleanTakeDispatchChunked(values, indices, outData) case 8: return outData, takeIdxChunkedDispatch[int8](values, indices, outData) case 16: return outData, takeIdxChunkedDispatch[int16](values, indices, outData) case 32: return outData, takeIdxChunkedDispatch[int32](values, indices, outData) case 64: return outData, takeIdxChunkedDispatch[int64](values, indices, outData) default: return nil, fmt.Errorf("%w: invalid values byte width for take", arrow.ErrInvalid) } } func NullTake(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { if ctx.State.(TakeState).BoundsCheck { if err := checkIndexBounds(&batch.Values[1].Array, uint64(batch.Values[0].Array.Len)); err != nil { return err } } // batch.length doesn't take into account the take indices out.Len = batch.Values[1].Array.Len out.Type = arrow.Null return nil } func NullFilter(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { outputLength := getFilterOutputSize(&batch.Values[1].Array, ctx.State.(FilterState).NullSelection) out.Len = outputLength out.Type = arrow.Null return nil } func filterExec(ctx *exec.KernelCtx, outputLen int64, values, selection *exec.ArraySpan, out *exec.ExecResult, visitValid func(idx int64) error, visitNull func() error) error { var ( nullSelection = ctx.State.(FilterState).NullSelection filterData = selection.Buffers[1].Buf filterIsValid = selection.Buffers[0].Buf filterOffset = selection.Offset // we use 3 block counters for fast scanning // // values valid counter: for values null/not-null // filter valid counter: for filter null/not-null // filter counter: for filter true/false valuesIsValid = bitutil.OptionalBitIndexer{Bitmap: values.Buffers[0].Buf, Offset: int(values.Offset)} valuesValidCounter = bitutils.NewOptionalBitBlockCounter(values.Buffers[0].Buf, values.Offset, values.Len) filterValidCounter = bitutils.NewOptionalBitBlockCounter(filterIsValid, filterOffset, selection.Len) filterCounter = bitutils.NewBitBlockCounter(filterData, filterOffset, selection.Len) inPos int64 validityBuilder = validityBuilder{mem: exec.GetAllocator(ctx.Ctx)} ) validityBuilder.Reserve(outputLen) appendNotNull := func(idx int64) error { validityBuilder.UnsafeAppend(true) return visitValid(idx) } appendNull := func() error { validityBuilder.UnsafeAppend(false) return visitNull() } appendMaybeNull := func(idx int64) error { if valuesIsValid.GetBit(int(idx)) { return appendNotNull(idx) } return appendNull() } for inPos < selection.Len { filterValidBlock := filterValidCounter.NextWord() valuesValidBlock := valuesValidCounter.NextWord() filterBlock := filterCounter.NextWord() switch { case filterBlock.NoneSet() && nullSelection == DropNulls: // for this exceedingly common case in low-selectivity filters // we can skip further analysis of the data and move onto the next block inPos += int64(filterBlock.Len) case filterValidBlock.AllSet(): // simpler path: no filter values are null if filterBlock.AllSet() { // fastest path, filter values are all true and not null if valuesValidBlock.AllSet() { // values aren't null either validityBuilder.UnsafeAppendN(int64(filterBlock.Len), true) for i := 0; i < int(filterBlock.Len); i++ { if err := visitValid(inPos); err != nil { return err } inPos++ } } else { // some values are null in this block for i := 0; i < int(filterBlock.Len); i++ { if err := appendMaybeNull(inPos); err != nil { return err } inPos++ } } } else { // !filterBlock.AllSet() // some filter values are false, but all not null if valuesValidBlock.AllSet() { // all the values are not-null, so we can skip null checking for them for i := 0; i < int(filterBlock.Len); i++ { if bitutil.BitIsSet(filterData, int(filterOffset+inPos)) { if err := appendNotNull(inPos); err != nil { return err } } inPos++ } } else { // some of the values in the block are null // gotta check each one :( for i := 0; i < int(filterBlock.Len); i++ { if bitutil.BitIsSet(filterData, int(filterOffset+inPos)) { if err := appendMaybeNull(inPos); err != nil { return err } } inPos++ } } } default: // !filterValidBlock.AllSet() // some filter values are null, so we have to handle drop // versus emit null if nullSelection == DropNulls { // filter null values are treated as false for i := 0; i < int(filterBlock.Len); i++ { if bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos)) && bitutil.BitIsSet(filterData, int(filterOffset+inPos)) { if err := appendMaybeNull(inPos); err != nil { return err } } inPos++ } } else { // filter null values are appended to output as null // whether the value in the corresponding slot is valid // or not var err error for i := 0; i < int(filterBlock.Len); i++ { filterNotNull := bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos)) if filterNotNull && bitutil.BitIsSet(filterData, int(filterOffset+inPos)) { err = appendMaybeNull(inPos) } else if !filterNotNull { // emit null case err = appendNull() } if err != nil { return err } inPos++ } } } } out.Len = int64(validityBuilder.bitLength) out.Nulls = int64(validityBuilder.falseCount) out.Buffers[0].WrapBuffer(validityBuilder.Finish()) return nil } func binaryFilterNonNull[OffsetT int32 | int64](ctx *exec.KernelCtx, values, filter *exec.ArraySpan, outputLen int64, _ NullSelectionBehavior, out *exec.ExecResult) error { var ( offsetBuilder = newBufferBuilder[OffsetT](exec.GetAllocator(ctx.Ctx)) dataBuilder = newBufferBuilder[uint8](exec.GetAllocator(ctx.Ctx)) rawOffsets = exec.GetSpanOffsets[OffsetT](values, 1) rawData = values.Buffers[2].Buf ) offsetBuilder.reserve(int(outputLen) + 1) // get a rough estimate and pre-size the data builder if values.Len > 0 { meanValueLength := float64(rawOffsets[values.Len]-rawOffsets[0]) / float64(values.Len) dataBuilder.reserve(int(meanValueLength * float64(outputLen))) } spaceAvail := dataBuilder.cap() var offset OffsetT filterData := filter.Buffers[1].Buf err := bitutils.VisitSetBitRuns(filterData, filter.Offset, filter.Len, func(pos, length int64) error { start, end := rawOffsets[pos], rawOffsets[pos+length] // bulk-append raw data runDataBytes := (end - start) if runDataBytes > OffsetT(spaceAvail) { dataBuilder.reserve(int(runDataBytes)) spaceAvail = dataBuilder.cap() - dataBuilder.len() } dataBuilder.unsafeAppendSlice(rawData[start:end]) spaceAvail -= int(runDataBytes) curOffset := start for i := int64(0); i < length; i++ { offsetBuilder.unsafeAppend(offset) offset += rawOffsets[i+pos+1] - curOffset curOffset = rawOffsets[i+pos+1] } return nil }) if err != nil { return err } offsetBuilder.unsafeAppend(offset) out.Len = outputLen out.Buffers[1].WrapBuffer(offsetBuilder.finish()) out.Buffers[2].WrapBuffer(dataBuilder.finish()) return nil } func binaryFilterImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, values, filter *exec.ArraySpan, outputLen int64, nullSelection NullSelectionBehavior, out *exec.ExecResult) error { var ( filterData = filter.Buffers[1].Buf filterIsValid = filter.Buffers[0].Buf filterOffset = filter.Offset valuesIsValid = values.Buffers[0].Buf valuesOffset = values.Offset // output bitmap should already be zero'd out so we just // have to set valid bits to true outIsValid = out.Buffers[0].Buf rawOffsets = exec.GetSpanOffsets[OffsetT](values, 1) rawData = values.Buffers[2].Buf offsetBuilder = newBufferBuilder[OffsetT](exec.GetAllocator(ctx.Ctx)) dataBuilder = newBufferBuilder[uint8](exec.GetAllocator(ctx.Ctx)) ) offsetBuilder.reserve(int(outputLen) + 1) if values.Len > 0 { meanValueLength := float64(rawOffsets[values.Len]-rawOffsets[0]) / float64(values.Len) dataBuilder.reserve(int(meanValueLength * float64(outputLen))) } spaceAvail := dataBuilder.cap() var offset OffsetT // we use 3 block counters for fast scanning of the filter // // * valuesValidCounter: for values null/not-null // * filterValidCounter: for filter null/not-null // * filterCounter: for filter true/false valuesValidCounter := bitutils.NewOptionalBitBlockCounter(values.Buffers[0].Buf, values.Offset, values.Len) filterValidCounter := bitutils.NewOptionalBitBlockCounter(filterIsValid, filterOffset, filter.Len) filterCounter := bitutils.NewBitBlockCounter(filterData, filterOffset, filter.Len) inPos, outPos := int64(0), int64(0) appendRaw := func(data []byte) { if len(data) > spaceAvail { dataBuilder.reserve(len(data)) spaceAvail = dataBuilder.cap() - dataBuilder.len() } dataBuilder.unsafeAppendSlice(data) spaceAvail -= len(data) } appendSingle := func() { data := rawData[rawOffsets[inPos]:rawOffsets[inPos+1]] appendRaw(data) offset += OffsetT(len(data)) } for inPos < filter.Len { filterValidBlock, valuesValidBlock := filterValidCounter.NextWord(), valuesValidCounter.NextWord() filterBlock := filterCounter.NextWord() switch { case filterBlock.NoneSet() && nullSelection == DropNulls: // for this exceedingly common case in low-selectivity filters // we can skip further analysis of the data and move on to the // next block inPos += int64(filterBlock.Len) case filterValidBlock.AllSet(): // simpler path: no filter values are null if filterBlock.AllSet() { // fastest path: filter values are all true and not null if valuesValidBlock.AllSet() { // the values aren't null either bitutil.SetBitsTo(outIsValid, outPos, int64(filterBlock.Len), true) // bulk-append raw data start, end := rawOffsets[inPos], rawOffsets[inPos+int64(filterBlock.Len)] appendRaw(rawData[start:end]) // append offsets for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 { offsetBuilder.unsafeAppend(offset) offset += rawOffsets[inPos+1] - rawOffsets[inPos] } outPos += int64(filterBlock.Len) } else { // some of the values in this block are null for i := 0; i < int(filterBlock.Len); i, inPos, outPos = i+1, inPos+1, outPos+1 { offsetBuilder.unsafeAppend(offset) if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) { bitutil.SetBit(outIsValid, int(outPos)) appendSingle() } } } continue } // !filterBlock.AllSet() // some of the filter values are false, but all not null if valuesValidBlock.AllSet() { // all the values are non-null, so we can skip null checking for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 { if bitutil.BitIsSet(filterData, int(filterOffset+inPos)) { offsetBuilder.unsafeAppend(offset) bitutil.SetBit(outIsValid, int(outPos)) outPos++ appendSingle() } } } else { // some of the values in the block are null, so we have to check for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 { if bitutil.BitIsSet(filterData, int(filterOffset+inPos)) { offsetBuilder.unsafeAppend(offset) if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) { bitutil.SetBit(outIsValid, int(outPos)) appendSingle() } outPos++ } } } default: // !filterValidBlock.AllSet() // some of the filter values are null, so we have to handle // the DROP vs EMIT_NULL null selection behavior if nullSelection == DropNulls { // filter null values are treated as false if valuesValidBlock.AllSet() { for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 { if bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos)) && bitutil.BitIsSet(filterData, int(filterOffset+inPos)) { offsetBuilder.unsafeAppend(offset) bitutil.SetBit(outIsValid, int(outPos)) outPos++ appendSingle() } } } else { for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 { if bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos)) && bitutil.BitIsSet(filterData, int(filterOffset+inPos)) { offsetBuilder.unsafeAppend(offset) if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) { bitutil.SetBit(outIsValid, int(outPos)) appendSingle() } outPos++ } } } } else { for i := 0; i < int(filterBlock.Len); i, inPos = i+1, inPos+1 { filterNotNull := bitutil.BitIsSet(filterIsValid, int(filterOffset+inPos)) if filterNotNull && bitutil.BitIsSet(filterData, int(filterOffset+inPos)) { offsetBuilder.unsafeAppend(offset) if bitutil.BitIsSet(valuesIsValid, int(valuesOffset+inPos)) { bitutil.SetBit(outIsValid, int(outPos)) appendSingle() } outPos++ } else if !filterNotNull { offsetBuilder.unsafeAppend(offset) outPos++ } } } } } offsetBuilder.unsafeAppend(offset) out.Len = outputLen out.Buffers[1].WrapBuffer(offsetBuilder.finish()) out.Buffers[2].WrapBuffer(dataBuilder.finish()) return nil } func takeExecImpl[T arrow.UintType](ctx *exec.KernelCtx, outputLen int64, values, indices *exec.ArraySpan, out *exec.ExecResult, visitValid func(int64) error, visitNull func() error) error { var ( validityBuilder = validityBuilder{mem: exec.GetAllocator(ctx.Ctx)} indicesValues = exec.GetSpanValues[T](indices, 1) isValid = indices.Buffers[0].Buf valuesHaveNulls = values.MayHaveNulls() indicesIsValid = bitutil.OptionalBitIndexer{Bitmap: isValid, Offset: int(indices.Offset)} valuesIsValid = bitutil.OptionalBitIndexer{Bitmap: values.Buffers[0].Buf, Offset: int(values.Offset)} bitCounter = bitutils.NewOptionalBitBlockCounter(isValid, indices.Offset, indices.Len) pos int64 ) validityBuilder.Reserve(outputLen) for pos < indices.Len { block := bitCounter.NextBlock() indicesHaveNulls := block.Popcnt < block.Len if !indicesHaveNulls && !valuesHaveNulls { // fastest path, neither indices nor values have nulls validityBuilder.UnsafeAppendN(int64(block.Len), true) for i := 0; i < int(block.Len); i++ { if err := visitValid(int64(indicesValues[pos])); err != nil { return err } pos++ } } else if block.Popcnt > 0 { // since we have to branch on whether indices are null or not, // we combine the "non-null indices block but some values null" // and "some null indices block but values non-null" into single loop for i := 0; i < int(block.Len); i++ { if (!indicesHaveNulls || indicesIsValid.GetBit(int(pos))) && valuesIsValid.GetBit(int(indicesValues[pos])) { validityBuilder.UnsafeAppend(true) if err := visitValid(int64(indicesValues[pos])); err != nil { return err } } else { validityBuilder.UnsafeAppend(false) if err := visitNull(); err != nil { return err } } pos++ } } else { // the whole block is null validityBuilder.UnsafeAppendN(int64(block.Len), false) for i := 0; i < int(block.Len); i++ { if err := visitNull(); err != nil { return err } } pos += int64(block.Len) } } out.Len = int64(validityBuilder.bitLength) out.Nulls = int64(validityBuilder.falseCount) out.Buffers[0].WrapBuffer(validityBuilder.Finish()) return nil } func takeExec(ctx *exec.KernelCtx, outputLen int64, values, indices *exec.ArraySpan, out *exec.ExecResult, visitValid func(int64) error, visitNull func() error) error { indexWidth := indices.Type.(arrow.FixedWidthDataType).Bytes() switch indexWidth { case 1: return takeExecImpl[uint8](ctx, outputLen, values, indices, out, visitValid, visitNull) case 2: return takeExecImpl[uint16](ctx, outputLen, values, indices, out, visitValid, visitNull) case 4: return takeExecImpl[uint32](ctx, outputLen, values, indices, out, visitValid, visitNull) case 8: return takeExecImpl[uint64](ctx, outputLen, values, indices, out, visitValid, visitNull) default: return fmt.Errorf("%w: invalid index width", arrow.ErrInvalid) } } type selectionOutputFn func(*exec.KernelCtx, int64, *exec.ArraySpan, *exec.ArraySpan, *exec.ExecResult, func(int64) error, func() error) error type selectionImplFn func(*exec.KernelCtx, *exec.ExecSpan, int64, *exec.ExecResult, selectionOutputFn) error func FilterExec(impl selectionImplFn) exec.ArrayKernelExec { return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { var ( selection = &batch.Values[1].Array outputLength = getFilterOutputSize(selection, ctx.State.(FilterState).NullSelection) ) return impl(ctx, batch, outputLength, out, filterExec) } } func TakeExec(impl selectionImplFn) exec.ArrayKernelExec { return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { if ctx.State.(TakeState).BoundsCheck { if err := checkIndexBounds(&batch.Values[1].Array, uint64(batch.Values[0].Array.Len)); err != nil { return err } } return impl(ctx, batch, batch.Values[1].Array.Len, out, takeExec) } } func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error { var ( values = &batch.Values[0].Array selection = &batch.Values[1].Array rawOffsets = exec.GetSpanOffsets[OffsetT](values, 1) rawData = values.Buffers[2].Buf offsetBuilder = newBufferBuilder[OffsetT](exec.GetAllocator(ctx.Ctx)) dataBuilder = newBufferBuilder[uint8](exec.GetAllocator(ctx.Ctx)) ) // presize the data builder with a rough estimate of the required data size if values.Len > 0 { dataLength := rawOffsets[values.Len] - rawOffsets[0] meanValueLen := float64(dataLength) / float64(values.Len) dataBuilder.reserve(int(meanValueLen)) } offsetBuilder.reserve(int(outputLength) + 1) spaceAvail := dataBuilder.cap() var offset OffsetT err := fn(ctx, outputLength, values, selection, out, func(idx int64) error { offsetBuilder.unsafeAppend(offset) valOffset := rawOffsets[idx] valSize := rawOffsets[idx+1] - valOffset if valSize == 0 { return nil } offset += valSize if valSize > OffsetT(spaceAvail) { dataBuilder.reserve(int(valSize)) spaceAvail = dataBuilder.cap() - dataBuilder.len() } dataBuilder.unsafeAppendSlice(rawData[valOffset : valOffset+valSize]) spaceAvail -= int(valSize) return nil }, func() error { offsetBuilder.unsafeAppend(offset) return nil }) if err != nil { return err } offsetBuilder.unsafeAppend(offset) out.Buffers[1].WrapBuffer(offsetBuilder.finish()) out.Buffers[2].WrapBuffer(dataBuilder.finish()) return nil } func FSBImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error { var ( values = &batch.Values[0].Array selection = &batch.Values[1].Array valueSize = int64(values.Type.(arrow.FixedWidthDataType).Bytes()) valueData = values.Buffers[1].Buf[values.Offset*valueSize:] ) out.Buffers[1].WrapBuffer(ctx.Allocate(int(valueSize * outputLength))) buf := out.Buffers[1].Buf err := fn(ctx, outputLength, values, selection, out, func(idx int64) error { start := idx * int64(valueSize) copy(buf, valueData[start:start+valueSize]) buf = buf[valueSize:] return nil }, func() error { buf = buf[valueSize:] return nil }) if err != nil { out.Buffers[1].Buf = nil out.Buffers[1].Owner.Release() out.Buffers[1].Owner = nil return err } return nil } func ListImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error { var ( values = &batch.Values[0].Array selection = &batch.Values[1].Array rawOffsets = exec.GetSpanOffsets[OffsetT](values, 1) mem = exec.GetAllocator(ctx.Ctx) offsetBuilder = newBufferBuilder[OffsetT](mem) childIdxBuilder = newBufferBuilder[OffsetT](mem) ) if values.Len > 0 { dataLength := rawOffsets[values.Len] - rawOffsets[0] meanListLen := float64(dataLength) / float64(values.Len) childIdxBuilder.reserve(int(meanListLen)) } offsetBuilder.reserve(int(outputLength) + 1) var offset OffsetT err := fn(ctx, outputLength, values, selection, out, func(idx int64) error { offsetBuilder.unsafeAppend(offset) valueOffset := rawOffsets[idx] valueLength := rawOffsets[idx+1] - valueOffset offset += valueLength childIdxBuilder.reserve(int(valueLength)) for j := valueOffset; j < valueOffset+valueLength; j++ { childIdxBuilder.unsafeAppend(j) } return nil }, func() error { offsetBuilder.unsafeAppend(offset) return nil }) if err != nil { return err } offsetBuilder.unsafeAppend(offset) out.Buffers[1].WrapBuffer(offsetBuilder.finish()) out.Children = make([]exec.ArraySpan, 1) out.Children[0].Type = arrow.GetDataType[OffsetT]() out.Children[0].Len = int64(childIdxBuilder.len()) out.Children[0].Buffers[1].WrapBuffer(childIdxBuilder.finish()) return nil } func FSLImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error { var ( values = &batch.Values[0].Array selection = &batch.Values[1].Array listSize = values.Type.(*arrow.FixedSizeListType).Len() baseOffset = values.Offset childIdxBuilder = array.NewInt64Builder(exec.GetAllocator(ctx.Ctx)) ) // we need to take listSize elements even for null elements of indices childIdxBuilder.Reserve(int(outputLength) * int(listSize)) err := fn(ctx, outputLength, values, selection, out, func(idx int64) error { offset := (baseOffset + idx) * int64(listSize) for j := offset; j < (offset + int64(listSize)); j++ { childIdxBuilder.UnsafeAppend(j) } return nil }, func() error { for n := int32(0); n < listSize; n++ { childIdxBuilder.AppendNull() } return nil }) if err != nil { return err } arr := childIdxBuilder.NewArray() defer arr.Release() out.Children = make([]exec.ArraySpan, 1) out.Children[0].TakeOwnership(arr.Data()) return nil } func DenseUnionImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error { var ( values = &batch.Values[0].Array selection = &batch.Values[1].Array mem = exec.GetAllocator(ctx.Ctx) valueOffsetBldr = newBufferBuilder[int32](mem) childIdBldr = newBufferBuilder[int8](mem) typeCodes = values.Type.(arrow.UnionType).TypeCodes() childIndicesBldrs = make([]*array.Int32Builder, len(typeCodes)) ) for i := range childIndicesBldrs { childIndicesBldrs[i] = array.NewInt32Builder(mem) } childIdBldr.reserve(int(outputLength)) valueOffsetBldr.reserve(int(outputLength)) typedValues := values.MakeArray().(*array.DenseUnion) defer typedValues.Release() err := fn(ctx, outputLength, values, selection, out, func(idx int64) error { childID := typedValues.ChildID(int(idx)) childIdBldr.unsafeAppend(typeCodes[childID]) valueOffset := typedValues.ValueOffset(int(idx)) valueOffsetBldr.unsafeAppend(int32(childIndicesBldrs[childID].Len())) childIndicesBldrs[childID].Append(valueOffset) return nil }, func() error { childID := 0 childIdBldr.unsafeAppend(typeCodes[childID]) valueOffsetBldr.unsafeAppend(int32(childIndicesBldrs[childID].Len())) childIndicesBldrs[childID].AppendNull() return nil }) if err != nil { return err } out.Type = typedValues.DataType() out.Buffers[1].WrapBuffer(childIdBldr.finish()) out.Buffers[2].WrapBuffer(valueOffsetBldr.finish()) out.Children = make([]exec.ArraySpan, len(childIndicesBldrs)) for i, b := range childIndicesBldrs { arr := b.NewArray() out.Children[i].TakeOwnership(arr.Data()) arr.Release() b.Release() } return nil } func FilterBinary(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { var ( nullSelect = ctx.State.(FilterState).NullSelection values = &batch.Values[0].Array filter = &batch.Values[1].Array outputLen = getFilterOutputSize(filter, nullSelect) ) // the output precomputed null count is unknown except in the // narrow condition that all the values are non-null and the filter // will not cause any new nulls to be created if values.Nulls == 0 && (nullSelect == DropNulls || filter.Nulls == 0) { out.Nulls = 0 } else { out.Nulls = array.UnknownNullCount } typeID := values.Type.ID() if values.Nulls == 0 && filter.Nulls == 0 { // faster no nulls case switch { case arrow.IsBinaryLike(typeID): return binaryFilterNonNull[int32](ctx, values, filter, outputLen, nullSelect, out) case arrow.IsLargeBinaryLike(typeID): return binaryFilterNonNull[int64](ctx, values, filter, outputLen, nullSelect, out) default: return fmt.Errorf("%w: invalid type for binary filter", arrow.ErrInvalid) } } // output may have nulls out.Buffers[0].WrapBuffer(ctx.AllocateBitmap(outputLen)) switch { case arrow.IsBinaryLike(typeID): return binaryFilterImpl[int32](ctx, values, filter, outputLen, nullSelect, out) case arrow.IsLargeBinaryLike(typeID): return binaryFilterImpl[int64](ctx, values, filter, outputLen, nullSelect, out) } return fmt.Errorf("%w: invalid type for binary filter", arrow.ErrInvalid) } func visitNoop() error { return nil } func visitIdxNoop(int64) error { return nil } func StructImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error { var ( values = &batch.Values[0].Array selection = &batch.Values[1].Array ) // nothing we need to do other than generate the validity bitmap return fn(ctx, outputLength, values, selection, out, visitIdxNoop, visitNoop) } type SelectionKernelData struct { In exec.InputType Exec exec.ArrayKernelExec Chunked exec.ChunkedExec } func ChunkedTakeSupported(dt arrow.DataType) bool { return arrow.IsPrimitive(dt.ID()) } func GetVectorSelectionKernels() (filterkernels, takeKernels []SelectionKernelData) { filterkernels = []SelectionKernelData{ {In: exec.NewMatchedInput(exec.Primitive()), Exec: PrimitiveFilter}, {In: exec.NewExactInput(arrow.Null), Exec: NullFilter}, {In: exec.NewIDInput(arrow.DECIMAL128), Exec: FilterExec(FSBImpl)}, {In: exec.NewIDInput(arrow.DECIMAL256), Exec: FilterExec(FSBImpl)}, {In: exec.NewIDInput(arrow.FIXED_SIZE_BINARY), Exec: FilterExec(FSBImpl)}, {In: exec.NewMatchedInput(exec.BinaryLike()), Exec: FilterBinary}, {In: exec.NewMatchedInput(exec.LargeBinaryLike()), Exec: FilterBinary}, } takeKernels = []SelectionKernelData{ {In: exec.NewExactInput(arrow.Null), Exec: NullTake}, {In: exec.NewMatchedInput(exec.Primitive()), Exec: PrimitiveTake, Chunked: ChunkedPrimitiveTake}, {In: exec.NewIDInput(arrow.DECIMAL128), Exec: TakeExec(FSBImpl)}, {In: exec.NewIDInput(arrow.DECIMAL256), Exec: TakeExec(FSBImpl)}, {In: exec.NewIDInput(arrow.FIXED_SIZE_BINARY), Exec: TakeExec(FSBImpl)}, {In: exec.NewMatchedInput(exec.BinaryLike()), Exec: TakeExec(VarBinaryImpl[int32])}, {In: exec.NewMatchedInput(exec.LargeBinaryLike()), Exec: TakeExec(VarBinaryImpl[int64])}, } return }