// arrow/compute/executor.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18

package compute

import (
"context"
"errors"
"fmt"
"math"
"runtime"
"sync"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/compute/exec"
"github.com/apache/arrow-go/v18/arrow/internal"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/scalar"
)
// ExecCtx holds simple contextual information for execution
// such as the default ChunkSize for batch iteration, whether or not
// to ensure contiguous preallocations for kernels that want preallocation,
// and a reference to the desired function registry to use.
//
// An ExecCtx should be placed into a context.Context via SetExecCtx,
// and retrieved with GetExecCtx, to pass it along for execution.
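//
// A minimal usage sketch (assuming this package is imported as "compute";
// the chunk size chosen here is arbitrary):
//
//	ectx := compute.DefaultExecCtx() // copy of the default values
//	ectx.ChunkSize = 64 * 1024       // operate on 64Ki elements at a time
//	ctx := compute.SetExecCtx(context.Background(), ectx)
//	ectx = compute.GetExecCtx(ctx) // retrieves the modified copy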
type ExecCtx struct {
	// ChunkSize is the size used when iterating batches for execution.
	// ChunkSize elements will be operated on at a time, unless an argument
	// is a chunked array with a smaller chunk size.
ChunkSize int64
// PreallocContiguous determines whether preallocating memory for
// execution of compute attempts to preallocate a full contiguous
// buffer for all of the chunks beforehand.
PreallocContiguous bool
// Registry allows specifying the Function Registry to utilize
// when searching for kernel implementations.
Registry FunctionRegistry
// ExecChannelSize is the size of the channel used for passing
// exec results to the WrapResults function.
ExecChannelSize int
// NumParallel determines the number of parallel goroutines
// allowed for parallel executions.
NumParallel int
}
type ctxExecKey struct{}
const DefaultMaxChunkSize = math.MaxInt64
var (
// global default ExecCtx object, initialized with the
// default max chunk size, contiguous preallocations, and
// the default function registry.
defaultExecCtx ExecCtx
// WithAllocator returns a new context with the provided allocator
// embedded into the context.
WithAllocator = exec.WithAllocator
// GetAllocator retrieves the allocator from the context, or returns
// memory.DefaultAllocator if there was no allocator in the provided
// context.
GetAllocator = exec.GetAllocator
)
// DefaultExecCtx returns the default exec context which will be used
// if there is no ExecCtx set into the context for execution.
//
// This can be called to get a copy of the default values which can
// then be modified to set into a context.
//
// The default exec context uses the following values:
// - ChunkSize = DefaultMaxChunkSize (MaxInt64)
// - PreallocContiguous = true
// - Registry = GetFunctionRegistry()
// - ExecChannelSize = 10
// - NumParallel = runtime.NumCPU()
func DefaultExecCtx() ExecCtx { return defaultExecCtx }
func init() {
defaultExecCtx.ChunkSize = DefaultMaxChunkSize
defaultExecCtx.PreallocContiguous = true
defaultExecCtx.Registry = GetFunctionRegistry()
defaultExecCtx.ExecChannelSize = 10
// default level of parallelism
// set to 1 to disable parallelization
defaultExecCtx.NumParallel = runtime.NumCPU()
}
// SetExecCtx returns a new child context containing the passed-in ExecCtx.
func SetExecCtx(ctx context.Context, e ExecCtx) context.Context {
return context.WithValue(ctx, ctxExecKey{}, e)
}
// GetExecCtx returns an embedded ExecCtx from the provided context.
// If it does not contain an ExecCtx, then the default one is returned.
func GetExecCtx(ctx context.Context) ExecCtx {
e, ok := ctx.Value(ctxExecKey{}).(ExecCtx)
if ok {
return e
}
return defaultExecCtx
}
// ExecBatch is a unit of work for kernel execution. It contains a collection
// of Array and Scalar values.
//
// ExecBatch is semantically similar to a RecordBatch but for a SQL-style
// execution context. It represents a collection of records, but constant
// "columns" are represented by Scalar values rather than having to be
// converted into arrays with repeated values.
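//
// For example, a batch of five rows pairing an array column with a constant
// column might be built like this (a sketch; "arr" is assumed to be an
// arrow.Array of length 5, and the package to be imported as "compute"):
//
//	batch := compute.ExecBatch{
//		Values: []compute.Datum{
//			compute.NewDatum(arr),                         // varying column
//			compute.NewDatum(scalar.MakeScalar(int64(7))), // constant column
//		},
//		Len: 5,
//	}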
type ExecBatch struct {
Values []Datum
// Guarantee is a predicate Expression guaranteed to evaluate to true for
// all rows in this batch.
// Guarantee Expression
// Len is the semantic length of this ExecBatch. When the values are
// all scalars, the length should be set to 1 for non-aggregate kernels.
// Otherwise the length is taken from the array values. Aggregate kernels
// can have an ExecBatch formed by projecting just the partition columns
	// from a batch, in which case it would have scalar rows with length > 1.
//
// If the array values are of length 0, then the length is 0 regardless of
// whether any values are Scalar.
Len int64
}
func (e ExecBatch) NumValues() int { return len(e.Values) }
// bufferPrealloc is a simple struct defining how to preallocate a particular buffer.
type bufferPrealloc struct {
bitWidth int
addLen int
}
func allocateDataBuffer(ctx *exec.KernelCtx, length, bitWidth int) *memory.Buffer {
switch bitWidth {
case 1:
return ctx.AllocateBitmap(int64(length))
default:
bufsiz := int(bitutil.BytesForBits(int64(length * bitWidth)))
return ctx.Allocate(bufsiz)
}
}
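
// addComputeDataPrealloc appends the preallocation specs for the data
// buffers of dt: a single buffer of the type's bit width for fixed-width
// types, the offsets buffer (with one extra slot for the final offset) for
// binary-like and list-like types, or a buffer of 16-byte view headers for
// the view types. Variadic data buffers cannot be sized up front, so no
// spec is added for them.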
func addComputeDataPrealloc(dt arrow.DataType, widths []bufferPrealloc) []bufferPrealloc {
if typ, ok := dt.(arrow.FixedWidthDataType); ok {
return append(widths, bufferPrealloc{bitWidth: typ.BitWidth()})
}
switch dt.ID() {
case arrow.BINARY, arrow.STRING, arrow.LIST, arrow.MAP:
return append(widths, bufferPrealloc{bitWidth: 32, addLen: 1})
case arrow.LARGE_BINARY, arrow.LARGE_STRING, arrow.LARGE_LIST:
return append(widths, bufferPrealloc{bitWidth: 64, addLen: 1})
case arrow.STRING_VIEW, arrow.BINARY_VIEW:
return append(widths, bufferPrealloc{bitWidth: arrow.ViewHeaderSizeBytes * 8})
}
return widths
}
// nullGeneralization is an enum defining a generalized assumption about
// the nulls in the inputs.
type nullGeneralization int8
const (
nullGenPerhapsNull nullGeneralization = iota
nullGenAllValid
nullGenAllNull
)
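
// getNullGen classifies the nullability of a single input value: all-null
// for the null type, a null scalar, or an array that is entirely null;
// all-valid for types without validity bitmaps, valid scalars, and arrays
// without nulls; and perhaps-null otherwise.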
func getNullGen(val *exec.ExecValue) nullGeneralization {
dtID := val.Type().ID()
switch {
case dtID == arrow.NULL:
return nullGenAllNull
case !internal.DefaultHasValidityBitmap(dtID):
return nullGenAllValid
case val.IsScalar():
if val.Scalar.IsValid() {
return nullGenAllValid
}
return nullGenAllNull
default:
arr := val.Array
		// avoid computing the null count if it hasn't been computed already;
		// a zero null count or a missing validity bitmap means all-valid
if arr.Nulls == 0 || arr.Buffers[0].Buf == nil {
return nullGenAllValid
}
if arr.Nulls == arr.Len {
return nullGenAllNull
}
}
return nullGenPerhapsNull
}
func getNullGenDatum(datum Datum) nullGeneralization {
var val exec.ExecValue
switch datum.Kind() {
case KindArray:
val.Array.SetMembers(datum.(*ArrayDatum).Value)
case KindScalar:
val.Scalar = datum.(*ScalarDatum).Value
case KindChunked:
return nullGenPerhapsNull
default:
debug.Assert(false, "should be array, scalar, or chunked!")
return nullGenPerhapsNull
}
return getNullGen(&val)
}
// propagateNulls populates the output validity bitmap with the intersection
// of the validity of the arguments. If a preallocated bitmap is not provided,
// then one will be allocated if needed (in some cases a bitmap can be
// zero-copied from the arguments). If any Scalar value is null, then the
// entire output is null.
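//
// For example, given two array inputs whose validity bitmaps are 1101 and
// 1011, the output validity bitmap is their bitwise AND: 1001.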
func propagateNulls(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ArraySpan) (err error) {
if out.Type.ID() == arrow.NULL {
// null output type is a no-op (rare but it happens)
return
}
// this function is ONLY able to write into output with non-zero offset
// when the bitmap is preallocated.
if out.Offset != 0 && out.Buffers[0].Buf == nil {
return fmt.Errorf("%w: can only propagate nulls into pre-allocated memory when output offset is non-zero", arrow.ErrInvalid)
}
var (
arrsWithNulls = make([]*exec.ArraySpan, 0, len(batch.Values))
isAllNull bool
prealloc bool = out.Buffers[0].Buf != nil
)
for i := range batch.Values {
v := &batch.Values[i]
nullGen := getNullGen(v)
if nullGen == nullGenAllNull {
isAllNull = true
}
if nullGen != nullGenAllValid && v.IsArray() {
arrsWithNulls = append(arrsWithNulls, &v.Array)
}
}
outBitmap := out.Buffers[0].Buf
if isAllNull {
// an all-null value gives us a short circuit opportunity
// output should all be null
out.Nulls = out.Len
if prealloc {
bitutil.SetBitsTo(outBitmap, out.Offset, out.Len, false)
return
}
// walk all the values with nulls instead of breaking on the first
// in case we find a bitmap that can be reused in the non-preallocated case
for _, arr := range arrsWithNulls {
if arr.Nulls == arr.Len && arr.Buffers[0].Owner != nil {
buf := arr.GetBuffer(0)
buf.Retain()
out.Buffers[0].Buf = buf.Bytes()
out.Buffers[0].Owner = buf
return
}
}
buf := ctx.AllocateBitmap(int64(out.Len))
out.Buffers[0].Owner = buf
out.Buffers[0].Buf = buf.Bytes()
out.Buffers[0].SelfAlloc = true
bitutil.SetBitsTo(out.Buffers[0].Buf, out.Offset, out.Len, false)
return
}
out.Nulls = array.UnknownNullCount
switch len(arrsWithNulls) {
case 0:
out.Nulls = 0
if prealloc {
bitutil.SetBitsTo(outBitmap, out.Offset, out.Len, true)
}
case 1:
arr := arrsWithNulls[0]
out.Nulls = arr.Nulls
if prealloc {
bitutil.CopyBitmap(arr.Buffers[0].Buf, int(arr.Offset), int(arr.Len), outBitmap, int(out.Offset))
return
}
switch {
case arr.Offset == 0:
out.Buffers[0] = arr.Buffers[0]
out.Buffers[0].Owner.Retain()
case arr.Offset%8 == 0:
buf := memory.SliceBuffer(arr.GetBuffer(0), int(arr.Offset)/8, int(bitutil.BytesForBits(arr.Len)))
out.Buffers[0].Buf = buf.Bytes()
out.Buffers[0].Owner = buf
default:
buf := ctx.AllocateBitmap(int64(out.Len))
out.Buffers[0].Owner = buf
out.Buffers[0].Buf = buf.Bytes()
out.Buffers[0].SelfAlloc = true
bitutil.CopyBitmap(arr.Buffers[0].Buf, int(arr.Offset), int(arr.Len), out.Buffers[0].Buf, 0)
}
return
default:
if !prealloc {
buf := ctx.AllocateBitmap(int64(out.Len))
out.Buffers[0].Owner = buf
out.Buffers[0].Buf = buf.Bytes()
out.Buffers[0].SelfAlloc = true
outBitmap = out.Buffers[0].Buf
}
acc := func(left, right *exec.ArraySpan) {
debug.Assert(left.Buffers[0].Buf != nil, "invalid intersection for null propagation")
debug.Assert(right.Buffers[0].Buf != nil, "invalid intersection for null propagation")
bitutil.BitmapAnd(left.Buffers[0].Buf, right.Buffers[0].Buf, left.Offset, right.Offset, outBitmap, out.Offset, out.Len)
}
acc(arrsWithNulls[0], arrsWithNulls[1])
for _, arr := range arrsWithNulls[2:] {
acc(out, arr)
}
}
return
}
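
// inferBatchLength returns the common length of the array-like values, 1 if
// all of the values are scalars (and there is at least one), or 0 for an
// empty slice. allSame is false if two array-like arguments disagree on
// their lengths.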
func inferBatchLength(values []Datum) (length int64, allSame bool) {
length, allSame = -1, true
areAllScalar := true
for _, arg := range values {
		switch arg := arg.(type) {
		case *ArrayDatum:
			argLength := arg.Len()
			if length < 0 {
				length = argLength
			} else if length != argLength {
				allSame = false
				return
			}
			areAllScalar = false
		case *ChunkedDatum:
			argLength := arg.Len()
			if length < 0 {
				length = argLength
			} else if length != argLength {
				allSame = false
				return
			}
			areAllScalar = false
		}
}
if areAllScalar && len(values) > 0 {
length = 1
} else if length < 0 {
length = 0
}
allSame = true
return
}
// KernelExecutor is the interface for all executors to initialize and
// call kernel execution functions on batches.
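//
// A sketch of the expected call sequence (kctx, initArgs, and batch are
// assumed to be prepared elsewhere, and error handling is elided):
//
//	ex := compute.NewScalarExecutor()
//	_ = ex.Init(kctx, initArgs)
//	ch := make(chan compute.Datum, compute.DefaultExecCtx().ExecChannelSize)
//	go func() {
//		defer close(ch)
//		_ = ex.Execute(ctx, &batch, ch)
//	}()
//	result := ex.WrapResults(ctx, ch, false)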
type KernelExecutor interface {
// Init must be called *after* the kernel's init method and any
// KernelState must be set into the KernelCtx *before* calling
// this Init method. This is to facilitate the case where
// Init may be expensive and does not need to be called
// again for each execution of the kernel. For example,
// the same lookup table can be re-used for all scanned batches
// in a dataset filter.
Init(*exec.KernelCtx, exec.KernelInitArgs) error
// Execute the kernel for the provided batch and pass the resulting
// Datum values to the provided channel.
Execute(context.Context, *ExecBatch, chan<- Datum) error
	// WrapResults exists for the case where an executor wants to post-process
	// the batches of result datums, such as creating a ChunkedArray from
	// multiple output batches. Results from individual batch executions
	// should be read from the out channel, and WrapResults should return
	// the final Datum result.
WrapResults(ctx context.Context, out <-chan Datum, chunkedArgs bool) Datum
	// CheckResultType checks the actual result type against the resolved
	// output type. If the types don't match, an error is returned.
CheckResultType(out Datum) error
// Clear resets the state in the executor so that it can be reused.
Clear()
}
// the base implementation for executing non-aggregate kernels.
type nonAggExecImpl struct {
ctx *exec.KernelCtx
ectx ExecCtx
kernel exec.NonAggKernel
outType arrow.DataType
numOutBuf int
dataPrealloc []bufferPrealloc
preallocValidity bool
}
func (e *nonAggExecImpl) Clear() {
e.ctx, e.kernel, e.outType = nil, nil, nil
if e.dataPrealloc != nil {
e.dataPrealloc = e.dataPrealloc[:0]
}
}
func (e *nonAggExecImpl) Init(ctx *exec.KernelCtx, args exec.KernelInitArgs) (err error) {
e.ctx, e.kernel = ctx, args.Kernel.(exec.NonAggKernel)
e.outType, err = e.kernel.GetSig().OutType.Resolve(ctx, args.Inputs)
e.ectx = GetExecCtx(ctx.Ctx)
return
}
func (e *nonAggExecImpl) prepareOutput(length int) *exec.ExecResult {
var nullCount int = array.UnknownNullCount
if e.kernel.GetNullHandling() == exec.NullNoOutput {
nullCount = 0
}
output := &exec.ArraySpan{
Type: e.outType,
Len: int64(length),
Nulls: int64(nullCount),
}
if e.preallocValidity {
buf := e.ctx.AllocateBitmap(int64(length))
output.Buffers[0].Owner = buf
output.Buffers[0].Buf = buf.Bytes()
output.Buffers[0].SelfAlloc = true
}
for i, pre := range e.dataPrealloc {
if pre.bitWidth >= 0 {
buf := allocateDataBuffer(e.ctx, length+pre.addLen, pre.bitWidth)
output.Buffers[i+1].Owner = buf
output.Buffers[i+1].Buf = buf.Bytes()
output.Buffers[i+1].SelfAlloc = true
}
}
return output
}
func (e *nonAggExecImpl) CheckResultType(out Datum) error {
typ := out.(ArrayLikeDatum).Type()
if typ != nil && !arrow.TypeEqual(e.outType, typ) {
return fmt.Errorf("%w: kernel type result mismatch: declared as %s, actual is %s",
arrow.ErrType, e.outType, typ)
}
return nil
}
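
// spanIterator yields the next span to execute, the position within the
// full batch after that span, and true while there is still data to process.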
type spanIterator func() (exec.ExecSpan, int64, bool)
func NewScalarExecutor() KernelExecutor { return &scalarExecutor{} }
type scalarExecutor struct {
nonAggExecImpl
elideValidityBitmap bool
preallocAllBufs bool
preallocContiguous bool
allScalars bool
iter spanIterator
iterLen int64
}
func (s *scalarExecutor) Execute(ctx context.Context, batch *ExecBatch, data chan<- Datum) (err error) {
s.allScalars, s.iter, err = iterateExecSpans(batch, s.ectx.ChunkSize, true)
if err != nil {
return
}
s.iterLen = batch.Len
if batch.Len == 0 {
result := array.MakeArrayOfNull(exec.GetAllocator(s.ctx.Ctx), s.outType, 0)
defer result.Release()
out := &exec.ArraySpan{}
out.SetMembers(result.Data())
return s.emitResult(out, data)
}
if err = s.setupPrealloc(batch.Len, batch.Values); err != nil {
return
}
return s.executeSpans(data)
}
func (s *scalarExecutor) WrapResults(ctx context.Context, out <-chan Datum, hasChunked bool) Datum {
var (
output Datum
acc []arrow.Array
)
toChunked := func() {
acc = output.(ArrayLikeDatum).Chunks()
output.Release()
output = nil
}
// get first output
select {
case <-ctx.Done():
return nil
case output = <-out:
// if the inputs contained at least one chunked array
// then we want to return chunked output
if hasChunked {
toChunked()
}
}
for {
select {
case <-ctx.Done():
// context is done, either cancelled or a timeout.
// either way, we end early and return what we've got so far.
return output
case o, ok := <-out:
if !ok { // channel closed, wrap it up
if output != nil {
return output
}
for _, c := range acc {
defer c.Release()
}
chkd := arrow.NewChunked(s.outType, acc)
defer chkd.Release()
return NewDatum(chkd)
}
// if we get multiple batches of output, then we need
// to return it as a chunked array.
if acc == nil {
toChunked()
}
defer o.Release()
if o.Len() == 0 { // skip any empty batches
continue
}
acc = append(acc, o.(*ArrayDatum).MakeArray())
}
}
}
func (s *scalarExecutor) executeSpans(data chan<- Datum) (err error) {
defer func() {
err = errors.Join(err, s.kernel.Cleanup())
}()
var (
input exec.ExecSpan
output exec.ExecResult
next bool
)
if s.preallocContiguous {
// make one big output alloc
output := s.prepareOutput(int(s.iterLen))
output.Offset = 0
var resultOffset int64
var nextOffset int64
for err == nil {
if input, nextOffset, next = s.iter(); !next {
break
}
output.SetSlice(resultOffset, input.Len)
err = s.executeSingleSpan(&input, output)
resultOffset = nextOffset
}
if err != nil {
output.Release()
return
}
if output.Offset != 0 {
output.SetSlice(0, s.iterLen)
}
return s.emitResult(output, data)
}
	// fully preallocating, but not contiguously: we (maybe) preallocate
	// only the output buffers needed for processing the current chunk
for err == nil {
if input, _, next = s.iter(); !next {
break
}
output = *s.prepareOutput(int(input.Len))
if err = s.executeSingleSpan(&input, &output); err != nil {
output.Release()
return
}
err = s.emitResult(&output, data)
}
return
}
func (s *scalarExecutor) executeSingleSpan(input *exec.ExecSpan, out *exec.ExecResult) error {
switch {
case out.Type.ID() == arrow.NULL:
out.Nulls = out.Len
	case s.kernel.GetNullHandling() == exec.NullIntersection:
		if !s.elideValidityBitmap {
			if err := propagateNulls(s.ctx, input, out); err != nil {
				return err
			}
		}
case s.kernel.GetNullHandling() == exec.NullNoOutput:
out.Nulls = 0
}
return s.kernel.Exec(s.ctx, input, out)
}
func (s *scalarExecutor) setupPrealloc(_ int64, args []Datum) error {
s.numOutBuf = len(s.outType.Layout().Buffers)
outTypeID := s.outType.ID()
// default to no validity pre-allocation for the following cases:
// - Output Array is NullArray
// - kernel.NullHandling is ComputeNoPrealloc or OutputNotNull
s.preallocValidity = false
if outTypeID != arrow.NULL {
switch s.kernel.GetNullHandling() {
case exec.NullComputedPrealloc:
s.preallocValidity = true
case exec.NullIntersection:
s.elideValidityBitmap = true
for _, a := range args {
nullGen := getNullGenDatum(a) == nullGenAllValid
s.elideValidityBitmap = s.elideValidityBitmap && nullGen
}
s.preallocValidity = !s.elideValidityBitmap
case exec.NullNoOutput:
s.elideValidityBitmap = true
}
}
if s.kernel.GetMemAlloc() == exec.MemPrealloc {
s.dataPrealloc = addComputeDataPrealloc(s.outType, s.dataPrealloc)
}
// validity bitmap either preallocated or elided, and all data buffers allocated
// this is basically only true for primitive types that are not dict-encoded
s.preallocAllBufs =
((s.preallocValidity || s.elideValidityBitmap) && len(s.dataPrealloc) == (s.numOutBuf-1) &&
!arrow.IsNested(outTypeID) && outTypeID != arrow.DICTIONARY)
// contiguous prealloc only possible on non-nested types if all
// buffers are preallocated. otherwise we have to go chunk by chunk
//
// some kernels are also unable to write into sliced outputs, so
// we respect the kernel's attributes
s.preallocContiguous =
(s.ectx.PreallocContiguous && s.kernel.CanFillSlices() &&
s.preallocAllBufs)
return nil
}
func (s *scalarExecutor) emitResult(resultData *exec.ArraySpan, data chan<- Datum) error {
var output Datum
if len(resultData.Buffers[0].Buf) != 0 {
resultData.UpdateNullCount()
}
if s.allScalars {
// we boxed scalar inputs as ArraySpan so now we have to unbox the output
arr := resultData.MakeArray()
defer arr.Release()
sc, err := scalar.GetScalar(arr, 0)
if err != nil {
return err
}
if r, ok := sc.(scalar.Releasable); ok {
defer r.Release()
}
output = NewDatum(sc)
} else {
d := resultData.MakeData()
defer d.Release()
output = NewDatum(d)
}
data <- output
return nil
}
func checkAllIsValue(vals []Datum) error {
for _, v := range vals {
if !DatumIsValue(v) {
return fmt.Errorf("%w: tried executing function with non-value type: %s",
arrow.ErrInvalid, v)
}
}
return nil
}
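
// checkIfAllScalar reports whether every value in the batch is a scalar;
// an empty batch reports false.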
func checkIfAllScalar(batch *ExecBatch) bool {
for _, v := range batch.Values {
if v.Kind() != KindScalar {
return false
}
}
return batch.NumValues() > 0
}
// iterateExecSpans sets up and returns a function which can iterate a batch
// according to the chunk sizes. If the inputs contain chunked arrays, then
// we will find the min(chunk sizes, maxChunkSize) to ensure we return
// contiguous spans to execute on.
//
// The iteration function returns the next span to execute on, the current
// position in the full batch, and a boolean indicating whether a span was
// actually returned (i.e. whether there is data to process).
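//
// For example, a batch of length 10 iterated with maxChunkSize 4 yields
// spans of lengths 4, 4, and 2, at positions 4, 8, and 10.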
func iterateExecSpans(batch *ExecBatch, maxChunkSize int64, promoteIfAllScalar bool) (haveAllScalars bool, itr spanIterator, err error) {
if batch.NumValues() > 0 {
inferred, allArgsSame := inferBatchLength(batch.Values)
if inferred != batch.Len {
return false, nil, fmt.Errorf("%w: value lengths differed from execbatch length", arrow.ErrInvalid)
}
if !allArgsSame {
return false, nil, fmt.Errorf("%w: array args must all be the same length", arrow.ErrInvalid)
}
}
var (
args []Datum = batch.Values
haveChunked bool
chunkIdxes = make([]int, len(args))
valuePositions = make([]int64, len(args))
valueOffsets = make([]int64, len(args))
pos, length int64 = 0, batch.Len
)
haveAllScalars = checkIfAllScalar(batch)
maxChunkSize = exec.Min(length, maxChunkSize)
span := exec.ExecSpan{Values: make([]exec.ExecValue, len(args)), Len: 0}
for i, a := range args {
switch arg := a.(type) {
case *ScalarDatum:
span.Values[i].Scalar = arg.Value
case *ArrayDatum:
span.Values[i].Array.SetMembers(arg.Value)
valueOffsets[i] = int64(arg.Value.Offset())
case *ChunkedDatum:
// populate from first chunk
carr := arg.Value
if len(carr.Chunks()) > 0 {
arr := carr.Chunk(0).Data()
span.Values[i].Array.SetMembers(arr)
valueOffsets[i] = int64(arr.Offset())
} else {
// fill as zero len
exec.FillZeroLength(carr.DataType(), &span.Values[i].Array)
}
haveChunked = true
}
}
if haveAllScalars && promoteIfAllScalar {
exec.PromoteExecSpanScalars(span)
}
nextChunkSpan := func(iterSz int64, span exec.ExecSpan) int64 {
for i := 0; i < len(args) && iterSz > 0; i++ {
// if the argument is not chunked, it's either a scalar or an array
// in which case it doesn't influence the size of the span
chunkedArg, ok := args[i].(*ChunkedDatum)
if !ok {
continue
}
arg := chunkedArg.Value
if len(arg.Chunks()) == 0 {
iterSz = 0
continue
}
var curChunk arrow.Array
for {
curChunk = arg.Chunk(chunkIdxes[i])
if valuePositions[i] == int64(curChunk.Len()) {
// chunk is zero-length, or was exhausted in the previous
// iteration, move to next chunk
chunkIdxes[i]++
curChunk = arg.Chunk(chunkIdxes[i])
span.Values[i].Array.SetMembers(curChunk.Data())
valuePositions[i] = 0
valueOffsets[i] = int64(curChunk.Data().Offset())
continue
}
break
}
iterSz = exec.Min(int64(curChunk.Len())-valuePositions[i], iterSz)
}
return iterSz
}
return haveAllScalars, func() (exec.ExecSpan, int64, bool) {
if pos == length {
return exec.ExecSpan{}, pos, false
}
iterationSize := exec.Min(length-pos, maxChunkSize)
if haveChunked {
iterationSize = nextChunkSpan(iterationSize, span)
}
span.Len = iterationSize
for i, a := range args {
if a.Kind() != KindScalar {
span.Values[i].Array.SetSlice(valuePositions[i]+valueOffsets[i], iterationSize)
valuePositions[i] += iterationSize
}
}
pos += iterationSize
debug.Assert(pos <= length, "bad state for iteration exec span")
return span, pos, true
}, nil
}
var (
// have a pool of scalar executors to avoid excessive object creation
scalarExecPool = sync.Pool{
New: func() any { return &scalarExecutor{} },
}
vectorExecPool = sync.Pool{
New: func() any { return &vectorExecutor{} },
}
)
func checkCanExecuteChunked(k *exec.VectorKernel) error {
if k.ExecChunked == nil {
return fmt.Errorf("%w: vector kernel cannot execute chunkwise and no chunked exec function defined", arrow.ErrInvalid)
}
if k.NullHandling == exec.NullIntersection {
return fmt.Errorf("%w: null pre-propagation is unsupported for chunkedarray execution in vector kernels", arrow.ErrInvalid)
}
return nil
}
type vectorExecutor struct {
nonAggExecImpl
iter spanIterator
results []*exec.ArraySpan
iterLen int64
allScalars bool
}
func (v *vectorExecutor) Execute(ctx context.Context, batch *ExecBatch, data chan<- Datum) (err error) {
final := v.kernel.(*exec.VectorKernel).Finalize
if final != nil {
if v.results == nil {
v.results = make([]*exec.ArraySpan, 0, 1)
} else {
v.results = v.results[:0]
}
}
	// some vector kernels have a separate code path for handling chunked
	// arrays (VectorKernel.ExecChunked), so we check for any chunked
	// arrays. If we find any, and an ExecChunked function is defined,
	// then we call that.
hasChunked := haveChunkedArray(batch.Values)
v.numOutBuf = len(v.outType.Layout().Buffers)
v.preallocValidity = v.kernel.GetNullHandling() != exec.NullComputedNoPrealloc &&
v.kernel.GetNullHandling() != exec.NullNoOutput
if v.kernel.GetMemAlloc() == exec.MemPrealloc {
v.dataPrealloc = addComputeDataPrealloc(v.outType, v.dataPrealloc)
}
	if v.kernel.(*exec.VectorKernel).CanExecuteChunkWise {
		v.allScalars, v.iter, err = iterateExecSpans(batch, v.ectx.ChunkSize, true)
		if err != nil {
			return
		}
		v.iterLen = batch.Len
var (
input exec.ExecSpan
next bool
)
if v.iterLen == 0 {
input.Values = make([]exec.ExecValue, batch.NumValues())
for i, v := range batch.Values {
exec.FillZeroLength(v.(ArrayLikeDatum).Type(), &input.Values[i].Array)
}
err = v.exec(&input, data)
}
for err == nil {
if input, _, next = v.iter(); !next {
break
}
err = v.exec(&input, data)
}
if err != nil {
return
}
} else {
		// kernel cannot execute chunkwise. If we have any chunked arrays,
		// then ExecChunked must be defined or we return an error
if hasChunked {
if err = v.execChunked(batch, data); err != nil {
return
}
} else {
			// no chunked arrays. we pack the args into an ExecSpan
			// and call the regular exec code path
span := ExecSpanFromBatch(batch)
if checkIfAllScalar(batch) {
exec.PromoteExecSpanScalars(*span)
}
if err = v.exec(span, data); err != nil {
return
}
}
}
if final != nil {
// intermediate results require post-processing after execution is
// completed (possibly involving some accumulated state)
output, err := final(v.ctx, v.results)
if err != nil {
return err
}
for _, r := range output {
d := r.MakeData()
defer d.Release()
data <- NewDatum(d)
}
}
return nil
}
func (v *vectorExecutor) WrapResults(ctx context.Context, out <-chan Datum, hasChunked bool) Datum {
	// if the kernel doesn't output chunked, just grab the one output and return it
if !v.kernel.(*exec.VectorKernel).OutputChunked {
var output Datum
select {
case <-ctx.Done():
return nil
case output = <-out:
}
// we got an output datum, but let's wait for the channel to
// close so we don't have any race conditions
select {
case <-ctx.Done():
output.Release()
return nil
case <-out:
return output
}
}
// if execution yielded multiple chunks then the result is a chunked array
var (
output Datum
acc []arrow.Array
)
toChunked := func() {
out := output.(ArrayLikeDatum).Chunks()
acc = make([]arrow.Array, 0, len(out))
for _, o := range out {
if o.Len() > 0 {
acc = append(acc, o)
}
}
if output.Kind() != KindChunked {
output.Release()
}
output = nil
}
// get first output
select {
case <-ctx.Done():
return nil
case output = <-out:
if output == nil || ctx.Err() != nil {
return nil
}
// if the inputs contained at least one chunked array
// then we want to return chunked output
if hasChunked {
toChunked()
}
}
for {
select {
case <-ctx.Done():
// context is done, either cancelled or a timeout.
// either way, we end early and return what we've got so far.
return output
case o, ok := <-out:
if !ok { // channel closed, wrap it up
if output != nil {
return output
}
for _, c := range acc {
defer c.Release()
}
chkd := arrow.NewChunked(v.outType, acc)
defer chkd.Release()
return NewDatum(chkd)
}
// if we get multiple batches of output, then we need
// to return it as a chunked array.
if acc == nil {
toChunked()
}
defer o.Release()
if o.Len() == 0 { // skip any empty batches
continue
}
acc = append(acc, o.(*ArrayDatum).MakeArray())
}
}
}
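
// exec runs the vector kernel on a single span, first propagating input
// nulls into the output when the kernel uses intersection null handling,
// then emitting (or accumulating) the result.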
func (v *vectorExecutor) exec(span *exec.ExecSpan, data chan<- Datum) (err error) {
out := v.prepareOutput(int(span.Len))
if v.kernel.GetNullHandling() == exec.NullIntersection {
if err = propagateNulls(v.ctx, span, out); err != nil {
return
}
}
if err = v.kernel.Exec(v.ctx, span, out); err != nil {
return
}
return v.emitResult(out, data)
}
func (v *vectorExecutor) emitResult(result *exec.ArraySpan, data chan<- Datum) (err error) {
if v.kernel.(*exec.VectorKernel).Finalize == nil {
d := result.MakeData()
defer d.Release()
data <- NewDatum(d)
} else {
v.results = append(v.results, result)
}
return nil
}
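
// execChunked invokes the kernel's chunked-execution path, wrapping any
// plain array inputs as single-chunk chunked arrays first.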
func (v *vectorExecutor) execChunked(batch *ExecBatch, out chan<- Datum) error {
if err := checkCanExecuteChunked(v.kernel.(*exec.VectorKernel)); err != nil {
return err
}
output := v.prepareOutput(int(batch.Len))
input := make([]*arrow.Chunked, len(batch.Values))
for i, v := range batch.Values {
switch val := v.(type) {
case *ArrayDatum:
chks := val.Chunks()
input[i] = arrow.NewChunked(val.Type(), chks)
chks[0].Release()
defer input[i].Release()
case *ChunkedDatum:
input[i] = val.Value
default:
return fmt.Errorf("%w: handling with exec chunked", arrow.ErrNotImplemented)
}
}
result, err := v.kernel.(*exec.VectorKernel).ExecChunked(v.ctx, input, output)
if err != nil {
return err
}
if len(result) == 0 {
empty := output.MakeArray()
defer empty.Release()
out <- &ChunkedDatum{Value: arrow.NewChunked(output.Type, []arrow.Array{empty})}
return nil
}
for _, r := range result {
if err := v.emitResult(r, out); err != nil {
return err
}
}
return nil
}