arrow/compute/exec/utils.go

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package exec

import (
	"fmt"
	"math"
	"sync/atomic"
	"unsafe"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/bitutil"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"golang.org/x/exp/constraints"
	"golang.org/x/exp/slices"
)

// GetSpanValues returns a properly typed slice by reinterpreting
// the buffer at index i using unsafe.Slice. This will take into account
// the offset of the given ArraySpan.
func GetSpanValues[T arrow.FixedWidthType](span *ArraySpan, i int) []T {
	if len(span.Buffers[i].Buf) == 0 {
		return nil
	}
	ret := unsafe.Slice((*T)(unsafe.Pointer(&span.Buffers[i].Buf[0])), span.Offset+span.Len)
	return ret[span.Offset:]
}

// GetSpanOffsets is like GetSpanValues, except it is only for int32
// or int64 and adds the additional 1 expected value for an offset
// buffer (i.e. len(output) == span.Len+1).
func GetSpanOffsets[T int32 | int64](span *ArraySpan, i int) []T {
	ret := unsafe.Slice((*T)(unsafe.Pointer(&span.Buffers[i].Buf[0])), span.Offset+span.Len+1)
	return ret[span.Offset:]
}

func Min[T constraints.Ordered](a, b T) T {
	if a < b {
		return a
	}
	return b
}

func Max[T constraints.Ordered](a, b T) T {
	if a > b {
		return a
	}
	return b
}
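// exampleSpanValues is an illustrative sketch, not part of the original
// file, showing the intended use of GetSpanValues and GetSpanOffsets.
// It assumes ArraySpan.SetMembers from this package to populate a span
// from existing ArrayData. By Arrow buffer convention, index 0 is the
// validity bitmap and index 1 holds values (or offsets for binary types).
func exampleSpanValues(i32Data, strData arrow.ArrayData) {
	var span ArraySpan

	// primitive array: buffer 1 holds the int32 values
	span.SetMembers(i32Data)
	vals := GetSpanValues[int32](&span, 1)
	_ = vals // vals[i] is the i-th logical value; span.Offset is already applied

	// string array: buffer 1 holds int32 offsets, buffer 2 the raw bytes
	span.SetMembers(strData)
	offsets := GetSpanOffsets[int32](&span, 1)
	_ = offsets[span.Len] // one extra trailing offset, so len(offsets) == span.Len+1
}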
// OptionsInit should be used in the case where a KernelState is simply
// represented with a specific type by value (instead of pointer).
// This will initialize the KernelState as a value-copied instance of
// the passed-in function options argument to ensure separation
// and allow the kernel to manipulate the options if necessary without
// any negative consequences, since it will have its own copy of the options.
func OptionsInit[T any](_ *KernelCtx, args KernelInitArgs) (KernelState, error) {
	if opts, ok := args.Options.(*T); ok {
		return *opts, nil
	}
	return nil, fmt.Errorf("%w: attempted to initialize kernel state from invalid function options", arrow.ErrInvalid)
}

type arrayBuilder[T arrow.NumericType | bool] interface {
	array.Builder
	Append(T)
	AppendValues([]T, []bool)
}

// ArrayFromSlice constructs an Arrow array of the corresponding type
// from a Go slice of numeric or boolean values, with no nulls.
func ArrayFromSlice[T arrow.NumericType | bool](mem memory.Allocator, data []T) arrow.Array {
	bldr := array.NewBuilder(mem, arrow.GetDataType[T]()).(arrayBuilder[T])
	defer bldr.Release()

	bldr.AppendValues(data, nil)
	return bldr.NewArray()
}

// ArrayFromSliceWithValid is like ArrayFromSlice, but uses valid as the
// validity bitmap: data[i] is null wherever valid[i] is false.
func ArrayFromSliceWithValid[T arrow.NumericType | bool](mem memory.Allocator, data []T, valid []bool) arrow.Array {
	bldr := array.NewBuilder(mem, arrow.GetDataType[T]()).(arrayBuilder[T])
	defer bldr.Release()

	bldr.AppendValues(data, valid)
	return bldr.NewArray()
}

// RechunkArraysConsistently rechunks the passed-in groups of arrays so
// that every group ends up with the same sequence of chunk lengths,
// allowing corresponding chunks to be processed in lockstep. All groups
// must have the same total length.
func RechunkArraysConsistently(groups [][]arrow.Array) [][]arrow.Array {
	if len(groups) <= 1 {
		return groups
	}

	var totalLen int
	for _, a := range groups[0] {
		totalLen += a.Len()
	}

	if totalLen == 0 {
		return groups
	}

	rechunked := make([][]arrow.Array, len(groups))
	offsets := make([]int64, len(groups))
	// scan all array vectors at once, rechunking along the way
	var start int64
	for start < int64(totalLen) {
		// first compute max possible length for next chunk
		var chunkLength int64 = math.MaxInt64
		for i, g := range groups {
			offset := offsets[i]
			// skip any done arrays including 0-length
			for offset == int64(g[0].Len()) {
				g = g[1:]
				offset = 0
			}
			arr := g[0]
			chunkLength = Min(chunkLength, int64(arr.Len())-offset)

			offsets[i] = offset
			groups[i] = g
		}

		// now slice all the arrays along this chunk size
		for i, g := range groups {
			offset := offsets[i]
			arr := g[0]
			if offset == 0 && int64(arr.Len()) == chunkLength {
				// slice spans entire array
				arr.Retain()
				rechunked[i] = append(rechunked[i], arr)
			} else {
				rechunked[i] = append(rechunked[i], array.NewSlice(arr, offset, offset+chunkLength))
			}
			offsets[i] += chunkLength
		}
		start += chunkLength
	}

	return rechunked
}
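// exampleRechunk is an illustrative sketch, not part of the original file,
// of RechunkArraysConsistently: given two logical columns whose chunk
// boundaries disagree (2+3 elements vs. 4+1), both columns come back
// re-sliced to the common chunk lengths 2, 2, 1. The returned arrays are
// retained slices, so the caller is responsible for releasing them.
func exampleRechunk(mem memory.Allocator) [][]arrow.Array {
	colA := []arrow.Array{
		ArrayFromSlice(mem, []int32{0, 1}),
		ArrayFromSlice(mem, []int32{2, 3, 4}),
	}
	colB := []arrow.Array{
		ArrayFromSlice(mem, []float64{0, 1, 2, 3}),
		ArrayFromSlice(mem, []float64{4}),
	}
	// every inner slice of the result has chunks of lengths 2, 2, 1
	return RechunkArraysConsistently([][]arrow.Array{colA, colB})
}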
// ChunkResolver resolves a logical index over a list of chunked arrays
// into a (chunk, index-within-chunk) pair, caching the most recently
// resolved chunk to speed up runs of nearby indexes.
type ChunkResolver struct {
	offsets []int64
	cached  atomic.Int64
}

func NewChunkResolver(chunks []arrow.Array) *ChunkResolver {
	offsets := make([]int64, len(chunks)+1)
	var offset int64
	for i, c := range chunks {
		curOffset := offset
		offset += int64(c.Len())
		offsets[i] = curOffset
	}
	offsets[len(chunks)] = offset
	return &ChunkResolver{offsets: offsets}
}

func (c *ChunkResolver) Resolve(idx int64) (chunk, index int64) {
	// some algorithms consecutively access indexes that are a
	// relatively small distance from each other, falling into
	// the same chunk.
	// This is trivial when merging (assuming each side of the
	// merge uses its own resolver), but also in the inner
	// recursive invocations of partitioning.
	if len(c.offsets) <= 1 {
		return 0, idx
	}

	cached := c.cached.Load()
	cacheHit := idx >= c.offsets[cached] && idx < c.offsets[cached+1]
	if cacheHit {
		return cached, idx - c.offsets[cached]
	}

	chkIdx, found := slices.BinarySearch(c.offsets, idx)
	if !found {
		chkIdx--
	}

	chunk, index = int64(chkIdx), idx-c.offsets[chkIdx]
	c.cached.Store(chunk)
	return
}

type arrayTypes interface {
	arrow.FixedWidthType | arrow.TemporalType | bool | string | []byte
}

// ArrayIter is a simple iterator over the logical values of an array span.
type ArrayIter[T arrayTypes] interface {
	Next() T
}

// BoolIter iterates over a boolean array span via a bitmap reader.
type BoolIter struct {
	Rdr *bitutil.BitmapReader
}

func NewBoolIter(arr *ArraySpan) ArrayIter[bool] {
	return &BoolIter{
		Rdr: bitutil.NewBitmapReader(arr.Buffers[1].Buf, int(arr.Offset), int(arr.Len)),
	}
}

func (b *BoolIter) Next() (out bool) {
	out = b.Rdr.Set()
	b.Rdr.Next()
	return
}

// PrimitiveIter iterates over the typed values of a fixed-width array span.
type PrimitiveIter[T arrow.FixedWidthType] struct {
	Values []T
}

func NewPrimitiveIter[T arrow.FixedWidthType](arr *ArraySpan) ArrayIter[T] {
	return &PrimitiveIter[T]{Values: GetSpanValues[T](arr, 1)}
}

func (p *PrimitiveIter[T]) Next() (v T) {
	v = p.Values[0]
	p.Values = p.Values[1:]
	return
}

// VarBinaryIter iterates over a variable-length binary array span
// (string/binary), yielding one byte slice per element.
type VarBinaryIter[OffsetT int32 | int64] struct {
	Offsets []OffsetT
	Data    []byte
	Pos     int64
}

func NewVarBinaryIter[OffsetT int32 | int64](arr *ArraySpan) ArrayIter[[]byte] {
	return &VarBinaryIter[OffsetT]{
		Offsets: GetSpanOffsets[OffsetT](arr, 1),
		Data:    arr.Buffers[2].Buf,
	}
}

func (v *VarBinaryIter[OffsetT]) Next() []byte {
	cur := v.Pos
	v.Pos++
	return v.Data[v.Offsets[cur]:v.Offsets[v.Pos]]
}

// FSBIter iterates over a fixed-size binary array span, yielding one
// Width-byte slice per element.
type FSBIter struct {
	Data  []byte
	Width int
	Pos   int64
}

func NewFSBIter(arr *ArraySpan) ArrayIter[[]byte] {
	return &FSBIter{
		Data:  arr.Buffers[1].Buf,
		Width: arr.Type.(arrow.FixedWidthDataType).Bytes(),
	}
}

func (f *FSBIter) Next() []byte {
	start := f.Width * int(f.Pos)
	f.Pos++
	return f.Data[start : start+f.Width]
}
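// exampleResolveAndIterate is an illustrative sketch, not part of the
// original file, tying together ChunkResolver and the ArrayIter helpers:
// resolve a logical row index into (chunk, index-within-chunk), then walk
// a fixed-width span with NewPrimitiveIter. It assumes the chunks hold at
// least 11 rows in total and that span is an int64-typed ArraySpan.
func exampleResolveAndIterate(chunks []arrow.Array, span *ArraySpan) {
	resolver := NewChunkResolver(chunks)
	// logical row 10 across all chunks; repeated nearby lookups hit the
	// resolver's cached chunk instead of re-running the binary search
	chunk, idx := resolver.Resolve(10)
	_, _ = chunk, idx

	it := NewPrimitiveIter[int64](span)
	for i := int64(0); i < span.Len; i++ {
		_ = it.Next() // values are yielded in logical order
	}
}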