arrow/compute/exec/kernel.go (456 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build go1.18
package exec
import (
"context"
"fmt"
"hash/maphash"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"golang.org/x/exp/slices"
)
// hashSeed is the maphash seed shared by the Hash methods in this file
// (InputType.Hash and KernelSignature.Hash). It is generated once at package
// initialization by maphash.MakeSeed, so the resulting hash values are only
// meaningful within a single process.
var hashSeed = maphash.MakeSeed()

// ctxAllocKey is the unexported context-key type under which WithAllocator
// stores a memory.Allocator and under which GetAllocator looks it up.
type ctxAllocKey struct{}
// WithAllocator derives a child context from ctx that carries mem, so that
// a later call to GetAllocator on the returned context yields mem.
func WithAllocator(ctx context.Context, mem memory.Allocator) context.Context {
	var key ctxAllocKey
	return context.WithValue(ctx, key, mem)
}
// GetAllocator looks up the allocator previously attached to ctx with
// WithAllocator. When ctx carries no allocator, memory.DefaultAllocator
// is returned instead.
func GetAllocator(ctx context.Context) memory.Allocator {
	if alloc, found := ctx.Value(ctxAllocKey{}).(memory.Allocator); found {
		return alloc
	}
	return memory.DefaultAllocator
}
// Kernel defines the minimum interface required for the basic execution
// kernel. It will grow as the implementation requires.
type Kernel interface {
// GetInitFn returns the function used to build the kernel's state from
// a KernelCtx and KernelInitArgs.
GetInitFn() KernelInitFn
// GetSig returns the signature describing the kernel's input types
// and output type.
GetSig() *KernelSignature
}
// NonAggKernel builds on the base Kernel interface for
// non aggregate execution kernels. Specifically this will
// represent Scalar and Vector kernels.
type NonAggKernel interface {
Kernel
// Exec runs the kernel's execution function with the given context,
// input span and result destination.
Exec(*KernelCtx, *ExecSpan, *ExecResult) error
// GetNullHandling returns how the kernel wants nulls to be handled.
GetNullHandling() NullHandling
// GetMemAlloc returns the kernel's preallocation preference for
// fixed-width outputs.
GetMemAlloc() MemAlloc
// CanFillSlices reports the kernel's CanWriteIntoSlices setting.
// NOTE(review): presumably whether the kernel may write into a slice
// of a preallocated output - confirm against the executor.
CanFillSlices() bool
// Cleanup releases any resources held by the kernel. ScalarKernel
// forwards to its CleanupFn when one is set; VectorKernel does nothing.
Cleanup() error
}
// KernelCtx is a small struct holding the context for a kernel execution
// consisting of a pointer to the kernel, initialized state (if needed)
// and the context for this execution.
type KernelCtx struct {
// Ctx is the context for this execution. Allocate and AllocateBitmap
// take their allocator from it via GetAllocator.
Ctx context.Context
// Kernel is the kernel being executed.
Kernel Kernel
// State is the kernel's initialized state, if it needs any.
State KernelState
}
// Allocate creates a new resizable buffer using the allocator found in
// k.Ctx (see GetAllocator) and resizes it to bufsize bytes before
// returning it.
func (k *KernelCtx) Allocate(bufsize int) *memory.Buffer {
	alloc := GetAllocator(k.Ctx)
	out := memory.NewResizableBuffer(alloc)
	out.Resize(bufsize)
	return out
}
// AllocateBitmap allocates a buffer with enough bytes to hold nbits bits,
// as computed by bitutil.BytesForBits, using the same allocator as Allocate.
func (k *KernelCtx) AllocateBitmap(nbits int64) *memory.Buffer {
	return k.Allocate(int(bitutil.BytesForBits(nbits)))
}
// TypeMatcher defines an interface for matching Input or Output types
// for execution kernels. There are multiple implementations of this
// interface provided by this package.
type TypeMatcher interface {
// String returns a human-readable description of what the matcher accepts.
fmt.Stringer
// Matches reports whether typ is accepted by this matcher.
Matches(typ arrow.DataType) bool
// Equals reports whether other is an equivalent matcher.
Equals(other TypeMatcher) bool
}
// sameTypeIDMatcher is a TypeMatcher that accepts any DataType whose
// type ID equals accepted.
type sameTypeIDMatcher struct {
	accepted arrow.Type
}

// Matches reports whether typ has the accepted type ID.
func (s sameTypeIDMatcher) Matches(typ arrow.DataType) bool {
	return typ.ID() == s.accepted
}

// Equals reports whether other is a sameTypeIDMatcher (held either by
// value or by pointer) that accepts the same type ID.
func (s sameTypeIDMatcher) Equals(other TypeMatcher) bool {
	switch o := other.(type) {
	case sameTypeIDMatcher:
		return s == o
	case *sameTypeIDMatcher:
		return s.accepted == o.accepted
	}
	return false
}

// String renders the matcher as "Type::" followed by the type ID's name.
func (s sameTypeIDMatcher) String() string {
	name := s.accepted.String()
	return "Type::" + name
}
// SameTypeID builds a TypeMatcher that accepts every DataType whose
// arrow.Type ID is equal to id.
func SameTypeID(id arrow.Type) TypeMatcher {
	return &sameTypeIDMatcher{accepted: id}
}
// timeUnitMatcher is a TypeMatcher that accepts a temporal DataType only
// when both its type ID and its TimeUnit match.
type timeUnitMatcher struct {
	id   arrow.Type
	unit arrow.TimeUnit
}

// Matches reports whether typ has the expected type ID and, if so,
// whether its TimeUnit equals the expected unit.
func (s timeUnitMatcher) Matches(typ arrow.DataType) bool {
	if typ.ID() == s.id {
		return typ.(arrow.TemporalWithUnit).TimeUnit() == s.unit
	}
	return false
}

// String renders the matcher as the lower-cased type name followed by
// the unit in parentheses.
func (s timeUnitMatcher) String() string {
	typeName := strings.ToLower(s.id.String())
	return typeName + "(" + s.unit.String() + ")"
}

// Equals reports whether other is a *timeUnitMatcher with the same type
// ID and unit (or the very same pointer).
func (s *timeUnitMatcher) Equals(other TypeMatcher) bool {
	o, ok := other.(*timeUnitMatcher)
	switch {
	case !ok:
		return false
	case o == s:
		return true
	default:
		return o.id == s.id && o.unit == s.unit
	}
}
// TimestampTypeUnit builds a TypeMatcher accepting only Timestamp types
// whose TimeUnit is unit.
func TimestampTypeUnit(unit arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{id: arrow.TIMESTAMP, unit: unit}
}

// Time32TypeUnit builds a TypeMatcher accepting only Time32 types
// whose TimeUnit is unit.
func Time32TypeUnit(unit arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{id: arrow.TIME32, unit: unit}
}

// Time64TypeUnit builds a TypeMatcher accepting only Time64 types
// whose TimeUnit is unit.
func Time64TypeUnit(unit arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{id: arrow.TIME64, unit: unit}
}

// DurationTypeUnit builds a TypeMatcher accepting only Duration types
// whose TimeUnit is unit.
func DurationTypeUnit(unit arrow.TimeUnit) TypeMatcher {
	return &timeUnitMatcher{id: arrow.DURATION, unit: unit}
}
// integerMatcher is a stateless TypeMatcher accepting every type for
// which arrow.IsInteger reports true.
type integerMatcher struct{}

// String returns the matcher's display name.
func (integerMatcher) String() string {
	return "integer"
}

// Matches reports whether typ's ID is an integer type ID.
func (integerMatcher) Matches(typ arrow.DataType) bool {
	id := typ.ID()
	return arrow.IsInteger(id)
}

// Equals reports whether other is also an integerMatcher.
func (integerMatcher) Equals(other TypeMatcher) bool {
	if _, same := other.(integerMatcher); same {
		return true
	}
	return false
}
// binaryLikeMatcher is a stateless TypeMatcher accepting every type for
// which arrow.IsBinaryLike reports true.
type binaryLikeMatcher struct{}

// String returns the matcher's display name.
func (binaryLikeMatcher) String() string {
	return "binary-like"
}

// Matches reports whether typ's ID is a binary-like type ID.
func (binaryLikeMatcher) Matches(typ arrow.DataType) bool {
	id := typ.ID()
	return arrow.IsBinaryLike(id)
}

// Equals reports whether other is also a binaryLikeMatcher.
func (binaryLikeMatcher) Equals(other TypeMatcher) bool {
	if _, same := other.(binaryLikeMatcher); same {
		return true
	}
	return false
}
// largeBinaryLikeMatcher is a stateless TypeMatcher accepting every type
// for which arrow.IsLargeBinaryLike reports true.
type largeBinaryLikeMatcher struct{}

// String returns the matcher's display name.
func (largeBinaryLikeMatcher) String() string {
	return "large-binary-like"
}

// Matches reports whether typ's ID is a large-binary-like type ID.
func (largeBinaryLikeMatcher) Matches(typ arrow.DataType) bool {
	id := typ.ID()
	return arrow.IsLargeBinaryLike(id)
}

// Equals reports whether other is also a largeBinaryLikeMatcher.
func (largeBinaryLikeMatcher) Equals(other TypeMatcher) bool {
	if _, same := other.(largeBinaryLikeMatcher); same {
		return true
	}
	return false
}
// fsbLikeMatcher is a stateless TypeMatcher accepting every type for
// which arrow.IsFixedSizeBinary reports true.
type fsbLikeMatcher struct{}

// String returns the matcher's display name.
func (fsbLikeMatcher) String() string {
	return "fixed-size-binary-like"
}

// Matches reports whether typ's ID is a fixed-size-binary type ID.
func (fsbLikeMatcher) Matches(typ arrow.DataType) bool {
	id := typ.ID()
	return arrow.IsFixedSizeBinary(id)
}

// Equals reports whether other is also an fsbLikeMatcher.
func (fsbLikeMatcher) Equals(other TypeMatcher) bool {
	if _, same := other.(fsbLikeMatcher); same {
		return true
	}
	return false
}
// Integer builds a TypeMatcher that accepts any integral type, such as
// int8 or uint16.
func Integer() TypeMatcher {
	return integerMatcher{}
}

// BinaryLike builds a TypeMatcher that accepts Binary or String.
func BinaryLike() TypeMatcher {
	return binaryLikeMatcher{}
}

// LargeBinaryLike builds a TypeMatcher that accepts LargeBinary or
// LargeString.
func LargeBinaryLike() TypeMatcher {
	return largeBinaryLikeMatcher{}
}

// FixedSizeBinaryLike builds a TypeMatcher that accepts FixedSizeBinary
// or Decimal128/256.
func FixedSizeBinaryLike() TypeMatcher {
	return fsbLikeMatcher{}
}
// primitiveMatcher is a stateless TypeMatcher accepting every type for
// which arrow.IsPrimitive reports true.
type primitiveMatcher struct{}

// String returns the matcher's display name.
func (primitiveMatcher) String() string {
	return "primitive"
}

// Matches reports whether typ's ID is a primitive type ID.
func (primitiveMatcher) Matches(typ arrow.DataType) bool {
	id := typ.ID()
	return arrow.IsPrimitive(id)
}

// Equals reports whether other is also a primitiveMatcher.
func (primitiveMatcher) Equals(other TypeMatcher) bool {
	if _, same := other.(primitiveMatcher); same {
		return true
	}
	return false
}

// Primitive builds a TypeMatcher that accepts any type for which
// arrow.IsPrimitive returns true.
func Primitive() TypeMatcher {
	return primitiveMatcher{}
}
// reeMatcher is a TypeMatcher for run-end-encoded types. It delegates to
// one matcher for the run-ends type and another for the encoded type.
type reeMatcher struct {
	runEndsMatcher TypeMatcher
	encodedMatcher TypeMatcher
}

// Matches reports whether typ is a RunEndEncoded type whose run-ends and
// encoded types are both accepted by the respective nested matchers.
func (r reeMatcher) Matches(typ arrow.DataType) bool {
	if typ.ID() != arrow.RUN_END_ENCODED {
		return false
	}

	ree := typ.(*arrow.RunEndEncodedType)
	if !r.runEndsMatcher.Matches(ree.RunEnds()) {
		return false
	}
	return r.encodedMatcher.Matches(ree.Encoded())
}

// Equals reports whether other is a reeMatcher whose nested matchers are
// equal to this one's.
func (r reeMatcher) Equals(other TypeMatcher) bool {
	o, ok := other.(reeMatcher)
	if !ok {
		return false
	}
	if !r.runEndsMatcher.Equals(o.runEndsMatcher) {
		return false
	}
	return r.encodedMatcher.Equals(o.encodedMatcher)
}

// String renders the matcher along with the descriptions of both nested
// matchers.
func (r reeMatcher) String() string {
	var b strings.Builder
	b.WriteString("run_end_encoded(run_ends=")
	b.WriteString(r.runEndsMatcher.String())
	b.WriteString(", values=")
	b.WriteString(r.encodedMatcher.String())
	b.WriteByte(')')
	return b.String()
}
// RunEndEncoded builds a TypeMatcher that accepts a RunEndEncoded type
// whose run-ends type is accepted by runEndsMatcher and whose encoded
// type is accepted by encodedMatcher.
func RunEndEncoded(runEndsMatcher, encodedMatcher TypeMatcher) TypeMatcher {
	return reeMatcher{runEndsMatcher, encodedMatcher}
}
// InputKind enumerates the ways an InputType can check an argument's
// type: accept anything, require one exact type, or defer to a
// TypeMatcher.
type InputKind int8

const (
	// InputAny accepts every type.
	InputAny = InputKind(iota)
	// InputExact accepts only a type equal to InputType.Type.
	InputExact
	// InputUseMatcher accepts whatever InputType.Matcher accepts.
	InputUseMatcher
)
// InputType is used for type checking arguments passed to a kernel
// and stored within a KernelSignature. The type-checking rule can
// be supplied either with an exact DataType instance or a custom
// TypeMatcher.
type InputType struct {
// Kind selects which rule is applied and therefore which of the
// fields below is consulted.
Kind InputKind
// Type is the exact type required when Kind is InputExact.
Type arrow.DataType
// Matcher is the matcher consulted when Kind is InputUseMatcher.
Matcher TypeMatcher
}
// NewExactInput builds an InputType that accepts only types equal to dt.
func NewExactInput(dt arrow.DataType) InputType {
	return InputType{Type: dt, Kind: InputExact}
}

// NewMatchedInput builds an InputType that accepts whatever match accepts.
func NewMatchedInput(match TypeMatcher) InputType {
	in := InputType{Kind: InputUseMatcher}
	in.Matcher = match
	return in
}

// NewIDInput builds an InputType that accepts any type whose arrow.Type
// ID equals id. It is shorthand for NewMatchedInput(SameTypeID(id)).
func NewIDInput(id arrow.Type) InputType {
	matcher := SameTypeID(id)
	return NewMatchedInput(matcher)
}
// MatchID returns the arrow.Type ID this InputType matches on: the ID of
// Type for an exact input, or the accepted ID when the matcher was built
// by SameTypeID. For any other InputType it trips a debug assertion and
// returns -1.
func (it InputType) MatchID() arrow.Type {
	if it.Kind == InputExact {
		return it.Type.ID()
	}

	if it.Kind == InputUseMatcher {
		if m, ok := it.Matcher.(*sameTypeIDMatcher); ok {
			return m.accepted
		}
	}

	debug.Assert(false, "MatchID called on non-id matching InputType")
	return -1
}
// String describes the input rule: "any" for InputAny, the matcher's
// description for InputUseMatcher, the type's name for InputExact, and
// the empty string for an unrecognized Kind.
func (it InputType) String() string {
	if it.Kind == InputAny {
		return "any"
	}
	if it.Kind == InputUseMatcher {
		return it.Matcher.String()
	}
	if it.Kind == InputExact {
		return it.Type.String()
	}
	return ""
}
// Equals reports whether it and other describe the same input rule.
//
// Two InputTypes are equal when they are the same pointer, or when they
// have the same Kind and:
//   - InputAny: always equal;
//   - InputExact: their Types are equal per arrow.TypeEqual;
//   - InputUseMatcher: their Matchers are equal per TypeMatcher.Equals.
//
// A nil receiver or nil other is only equal to another nil, and a nil
// Matcher is only equal to another nil Matcher; neither case panics.
func (it *InputType) Equals(other *InputType) bool {
	if it == other {
		return true
	}

	// Exactly one side is nil here (both-nil was handled above); without
	// this guard the field accesses below would dereference nil.
	if it == nil || other == nil {
		return false
	}

	if it.Kind != other.Kind {
		return false
	}

	switch it.Kind {
	case InputAny:
		return true
	case InputExact:
		return arrow.TypeEqual(it.Type, other.Type)
	case InputUseMatcher:
		// Calling Equals on a nil Matcher interface would panic, so
		// compare the nil-ness directly when either side is unset.
		if it.Matcher == nil || other.Matcher == nil {
			return it.Matcher == nil && other.Matcher == nil
		}
		return it.Matcher.Equals(other.Matcher)
	default:
		return false
	}
}
// Hash computes a hash of the InputType from the package hash seed and
// the Kind. For InputExact the hash of Type is mixed in as well; the
// Matcher does not contribute to the hash.
func (it InputType) Hash() uint64 {
	var hasher maphash.Hash
	hasher.SetSeed(hashSeed)

	sum := HashCombine(hasher.Sum64(), uint64(it.Kind))
	if it.Kind == InputExact {
		sum = HashCombine(sum, arrow.HashType(hashSeed, it.Type))
	}
	return sum
}
// Matches reports whether dt satisfies this input rule. InputAny accepts
// everything, InputExact requires equality with Type, and InputUseMatcher
// defers to Matcher. An unrecognized Kind trips a debug assertion and is
// then reported as a match.
func (it InputType) Matches(dt arrow.DataType) bool {
	if it.Kind == InputAny {
		return true
	}
	if it.Kind == InputExact {
		return arrow.TypeEqual(it.Type, dt)
	}
	if it.Kind == InputUseMatcher {
		return it.Matcher.Matches(dt)
	}

	debug.Assert(false, "invalid InputKind")
	return true
}
// ResolveKind describes how an OutputType determines its type: either it
// holds a fixed type, or it holds a Resolver that computes the type from
// the input types.
type ResolveKind int8

const (
	// ResolveFixed means the output type is OutputType.Type.
	ResolveFixed = ResolveKind(iota)
	// ResolveComputed means the output type is produced by
	// OutputType.Resolver.
	ResolveComputed
)
// TypeResolver is simply a function that takes a KernelCtx and a list of input types
// and returns the resolved type or an error.
type TypeResolver = func(*KernelCtx, []arrow.DataType) (arrow.DataType, error)
// OutputType describes how a kernel's output type is determined: either
// a fixed Type, or a Resolver that computes it from the input types.
type OutputType struct {
// Kind selects whether Type or Resolver is used by Resolve.
Kind ResolveKind
// Type is the output type returned when Kind is ResolveFixed.
Type arrow.DataType
// Resolver is called to compute the output type when Kind is not
// ResolveFixed.
Resolver TypeResolver
}
// NewOutputType builds an OutputType that always resolves to dt.
func NewOutputType(dt arrow.DataType) OutputType {
	return OutputType{Type: dt, Kind: ResolveFixed}
}

// NewComputedOutputType builds an OutputType whose type is computed by
// calling resolver with the input types.
func NewComputedOutputType(resolver TypeResolver) OutputType {
	out := OutputType{Kind: ResolveComputed}
	out.Resolver = resolver
	return out
}
// String returns the name of the fixed output type, or "computed" when
// the type is produced by a resolver.
func (o OutputType) String() string {
	if o.Kind != ResolveFixed {
		return "computed"
	}
	return o.Type.String()
}
// Resolve returns the output type for the given input types. A fixed
// OutputType returns its Type with a nil error; otherwise the call is
// forwarded to Resolver and its result is returned unchanged.
func (o OutputType) Resolve(ctx *KernelCtx, types []arrow.DataType) (arrow.DataType, error) {
	if o.Kind == ResolveFixed {
		return o.Type, nil
	}
	return o.Resolver(ctx, types)
}
// NullHandling enumerates the ways a Kernel can ask the executor to
// deal with nulls.
type NullHandling int8

const (
	// NullIntersection computes the output validity bitmap by
	// intersecting the validity bitmaps of the arguments with bitwise-and
	// operations, so an output value is valid/non-null only when the
	// corresponding value in every input argument is valid/non-null.
	// Kernels generally do not need to touch the bitmap afterwards, but a
	// kernel's exec function may alter it after the intersection has been
	// computed if necessary.
	NullIntersection = NullHandling(iota)
	// NullComputedPrealloc means the kernel expects a pre-allocated
	// buffer into which it writes the result bitmap.
	NullComputedPrealloc
	// NullComputedNoPrealloc means the kernel allocates and sets the
	// validity bitmap of the output itself.
	NullComputedNoPrealloc
	// NullNoOutput means the kernel output is never null, so no validity
	// bitmap needs to be allocated.
	NullNoOutput
)
// MemAlloc expresses a kernel's preference regarding preallocation of
// memory for fixed-width type outputs during execution.
type MemAlloc int8

const (
	// MemPrealloc means that, for data types supporting pre-allocation
	// (fixed-width), the kernel expects to be handed a pre-allocated
	// buffer to write into. Non-fixed-width types must always allocate
	// their own buffers. The allocation has the same length as the
	// execution batch, so vector kernels that yield differently sized
	// outputs should not use this.
	//
	// It is valid for the data not to be preallocated while the validity
	// bitmap is (or is computed using intersection).
	//
	// This option has no effect for variable-size output types like
	// Binary or String, or for nested types.
	MemPrealloc = MemAlloc(iota)
	// MemNoPrealloc means the kernel is responsible for allocating its
	// own data buffer for fixed-width output types.
	MemNoPrealloc
)
// KernelState is the opaque, kernel-specific state produced by a
// KernelInitFn and carried in KernelCtx.State.
type KernelState any
// KernelInitArgs are the arguments required to initialize a Kernel's
// state using the input types and any options.
type KernelInitArgs struct {
// Kernel is the kernel whose state is being initialized.
Kernel Kernel
// Inputs are the data types of the kernel's inputs.
Inputs []arrow.DataType
// Options are opaque and specific to the Kernel being initialized,
// may be nil if the kernel doesn't require options.
Options any
}
// KernelInitFn is any function that receives a KernelCtx and initialization
// arguments and returns the initialized state or an error.
type KernelInitFn = func(*KernelCtx, KernelInitArgs) (KernelState, error)
// KernelSignature holds the input and output types for a kernel.
//
// Variable argument functions with a minimum of N arguments should pass
// up to N input types to be used to validate for invocation. The first
// N-1 types will be matched against the first N-1 arguments and the last
// type will be matched against the remaining arguments.
type KernelSignature struct {
// InputTypes are the type-checking rules for the kernel's arguments.
InputTypes []InputType
// OutType describes how the kernel's output type is determined.
OutType OutputType
// IsVarArgs marks the signature as variadic; see the type comment for
// how InputTypes are applied in that case.
IsVarArgs bool
// store the hashcode after it is computed so we don't
// need to recompute it. A value of 0 means "not computed yet".
hashCode uint64
}
// String renders the signature as "(in1, in2) -> out", or as
// "varargs[in1, in2*] -> out" for a variadic signature.
func (k KernelSignature) String() string {
	opening, closing := "(", ")"
	if k.IsVarArgs {
		opening, closing = "varargs[", "*]"
	}

	names := make([]string, len(k.InputTypes))
	for i := range k.InputTypes {
		names[i] = k.InputTypes[i].String()
	}

	return opening + strings.Join(names, ", ") + closing + " -> " + k.OutType.String()
}
// Equals reports whether both signatures agree on IsVarArgs and have
// pairwise-equal input types. The output type is not compared.
func (k KernelSignature) Equals(other KernelSignature) bool {
	sameInput := func(a, b InputType) bool {
		return a.Equals(&b)
	}
	return k.IsVarArgs == other.IsVarArgs &&
		slices.EqualFunc(k.InputTypes, other.InputTypes, sameInput)
}
// Hash returns a hash built from the package hash seed and the hashes of
// all input types. The result is cached in the signature, so later calls
// return the stored value as long as it is non-zero.
func (k *KernelSignature) Hash() uint64 {
	if cached := k.hashCode; cached != 0 {
		return cached
	}

	var hasher maphash.Hash
	hasher.SetSeed(hashSeed)

	sum := hasher.Sum64()
	for i := range k.InputTypes {
		sum = HashCombine(sum, k.InputTypes[i].Hash())
	}

	k.hashCode = sum
	return sum
}
// MatchesInputs reports whether the argument types satisfy this signature.
//
// For a non-variadic signature the number of arguments must equal the
// number of declared input types and every argument must match the input
// type at the same position.
//
// For a variadic signature with N declared input types, at least N-1
// arguments are required. The first N-1 arguments are matched against the
// first N-1 input types and every remaining argument is matched against
// the last input type. A variadic signature that declares no input types
// has nothing to match arguments against, so it only matches an empty
// argument list (previously this case panicked with an out-of-range index
// for any non-empty argument list).
func (k KernelSignature) MatchesInputs(types []arrow.DataType) bool {
	if !k.IsVarArgs {
		if len(types) != len(k.InputTypes) {
			return false
		}
		for i, t := range types {
			if !k.InputTypes[i].Matches(t) {
				return false
			}
		}
		return true
	}

	// Index of the input type that absorbs all trailing arguments.
	last := len(k.InputTypes) - 1
	if last < 0 {
		return len(types) == 0
	}

	// check that it has enough to match at least the non-vararg types
	if len(types) < last {
		return false
	}

	for i, t := range types {
		idx := i
		if idx > last {
			idx = last
		}
		if !k.InputTypes[idx].Matches(t) {
			return false
		}
	}
	return true
}
// ArrayKernelExec is an alias definition for a kernel's execution function.
//
// This is used for both stateless and stateful kernels. If a kernel
// depends on some execution state, it can be accessed from the KernelCtx
// object, which also contains the context.Context object which can be
// used for short-circuiting by checking context.Done / context.Err.
// This allows kernels to control handling timeouts or cancellation of
// computation.
type ArrayKernelExec = func(*KernelCtx, *ExecSpan, *ExecResult) error
// kernel holds the members common to ScalarKernel and VectorKernel, which
// both embed it, and provides the methods of the Kernel interface.
type kernel struct {
// Init is the state initialization function returned by GetInitFn.
Init KernelInitFn
// Signature is the kernel's signature returned by GetSig.
Signature *KernelSignature
// Data is kernel-owned state; ScalarKernel.Cleanup passes it to CleanupFn.
Data KernelState
// Parallelizable is set to true by the New*Kernel constructors.
// NOTE(review): presumably tells the executor the kernel may be run in
// parallel - it is not read anywhere in this file.
Parallelizable bool
}
// GetInitFn returns the kernel's Init function.
func (k kernel) GetInitFn() KernelInitFn { return k.Init }
// GetSig returns the kernel's Signature.
func (k kernel) GetSig() *KernelSignature { return k.Signature }
// A ScalarKernel is the kernel implementation for a Scalar Function.
// In addition to the members found in the base Kernel, it contains
// the null handling and memory pre-allocation preferences.
type ScalarKernel struct {
kernel
// ExecFn is the execution function invoked by Exec.
ExecFn ArrayKernelExec
// CanWriteIntoSlices is the value reported by CanFillSlices.
CanWriteIntoSlices bool
// NullHandling is the value reported by GetNullHandling.
NullHandling NullHandling
// MemAlloc is the value reported by GetMemAlloc.
MemAlloc MemAlloc
// CleanupFn, when non-nil, is called by Cleanup with the kernel's Data.
CleanupFn func(KernelState) error
}
// NewScalarKernel builds a kernel for scalar execution. It wraps the
// given input types and output type in a new KernelSignature and then
// delegates to NewScalarKernelWithSig with the supplied execution and
// initialization functions.
func NewScalarKernel(in []InputType, out OutputType, exec ArrayKernelExec, init KernelInitFn) ScalarKernel {
	sig := &KernelSignature{InputTypes: in, OutType: out}
	return NewScalarKernelWithSig(sig, exec, init)
}
// NewScalarKernelWithSig builds a scalar kernel from an existing
// signature. It is equivalent to passing the signature's input and output
// types to NewScalarKernel.
//
// The returned kernel is marked parallelizable, can write into slices,
// uses NullIntersection null handling and MemPrealloc allocation.
func NewScalarKernelWithSig(sig *KernelSignature, exec ArrayKernelExec, init KernelInitFn) ScalarKernel {
	base := kernel{Init: init, Signature: sig, Parallelizable: true}
	return ScalarKernel{
		kernel:             base,
		ExecFn:             exec,
		NullHandling:       NullIntersection,
		MemAlloc:           MemPrealloc,
		CanWriteIntoSlices: true,
	}
}
// Cleanup invokes CleanupFn with the kernel's Data and returns its
// result. It returns nil when no CleanupFn is set.
func (s *ScalarKernel) Cleanup() error {
	if s.CleanupFn == nil {
		return nil
	}
	return s.CleanupFn(s.Data)
}
// Exec runs the kernel's ExecFn with the given context, input span and
// result, returning whatever error ExecFn returns.
func (s *ScalarKernel) Exec(ctx *KernelCtx, sp *ExecSpan, out *ExecResult) error {
	run := s.ExecFn
	return run(ctx, sp, out)
}

// GetNullHandling returns the kernel's NullHandling setting.
func (s ScalarKernel) GetNullHandling() NullHandling {
	return s.NullHandling
}

// GetMemAlloc returns the kernel's MemAlloc setting.
func (s ScalarKernel) GetMemAlloc() MemAlloc {
	return s.MemAlloc
}

// CanFillSlices returns the kernel's CanWriteIntoSlices setting.
func (s ScalarKernel) CanFillSlices() bool {
	return s.CanWriteIntoSlices
}
// ChunkedExec is the signature for executing a stateful vector kernel
// against a ChunkedArray input. It is optional.
type ChunkedExec func(*KernelCtx, []*arrow.Chunked, *ExecResult) ([]*ExecResult, error)
// FinalizeFunc is an optional finalizer function for any postprocessing
// that may need to be done on data before returning it.
type FinalizeFunc func(*KernelCtx, []*ArraySpan) ([]*ArraySpan, error)
// VectorKernel is a structure for implementations of vector functions.
// It can optionally contain a finalizer function, the null handling
// and memory pre-allocation preferences (different defaults from
// scalar kernels when using NewVectorKernel), and other execution related
// options.
type VectorKernel struct {
kernel
// ExecFn is the execution function invoked by Exec.
ExecFn ArrayKernelExec
// ExecChunked is the optional chunked-input execution function.
ExecChunked ChunkedExec
// Finalize is the optional post-processing function.
Finalize FinalizeFunc
// NullHandling is the value reported by GetNullHandling.
NullHandling NullHandling
// MemAlloc is the value reported by GetMemAlloc.
MemAlloc MemAlloc
// CanWriteIntoSlices is the value reported by CanFillSlices.
CanWriteIntoSlices bool
// CanExecuteChunkWise is set to true by the NewVectorKernel constructors.
// NOTE(review): presumably whether the kernel may be run once per
// chunk of a chunked input - not read in this file, confirm with executor.
CanExecuteChunkWise bool
// OutputChunked is set to true by the NewVectorKernel constructors.
// NOTE(review): presumably whether the output may be chunked - not
// read in this file, confirm with executor.
OutputChunked bool
}
// NewVectorKernel builds a kernel for executing vector functions, which
// take into account more than just the individual scalar values of their
// input. The output of a vector kernel may have a different length than
// its inputs. The input and output types are wrapped in a new
// KernelSignature that is passed on to NewVectorKernelWithSig.
func NewVectorKernel(inTypes []InputType, outType OutputType, exec ArrayKernelExec, init KernelInitFn) VectorKernel {
	sig := &KernelSignature{InputTypes: inTypes, OutType: outType}
	return NewVectorKernelWithSig(sig, exec, init)
}
// NewVectorKernelWithSig builds a vector kernel from an already
// constructed signature.
//
// The returned kernel is marked parallelizable, can write into slices,
// can execute chunk-wise, has chunked output enabled, and uses
// NullComputedNoPrealloc null handling with MemNoPrealloc allocation.
func NewVectorKernelWithSig(sig *KernelSignature, exec ArrayKernelExec, init KernelInitFn) VectorKernel {
	base := kernel{Init: init, Signature: sig, Parallelizable: true}
	return VectorKernel{
		kernel:              base,
		ExecFn:              exec,
		NullHandling:        NullComputedNoPrealloc,
		MemAlloc:            MemNoPrealloc,
		CanWriteIntoSlices:  true,
		CanExecuteChunkWise: true,
		OutputChunked:       true,
	}
}
// Exec runs the kernel's ExecFn with the given context, input span and
// result, returning whatever error ExecFn returns.
func (s *VectorKernel) Exec(ctx *KernelCtx, sp *ExecSpan, out *ExecResult) error {
	run := s.ExecFn
	return run(ctx, sp, out)
}

// GetNullHandling returns the kernel's NullHandling setting.
func (s VectorKernel) GetNullHandling() NullHandling {
	return s.NullHandling
}

// GetMemAlloc returns the kernel's MemAlloc setting.
func (s VectorKernel) GetMemAlloc() MemAlloc {
	return s.MemAlloc
}

// CanFillSlices returns the kernel's CanWriteIntoSlices setting.
func (s VectorKernel) CanFillSlices() bool {
	return s.CanWriteIntoSlices
}

// Cleanup does nothing for vector kernels and always returns nil.
func (s VectorKernel) Cleanup() error {
	return nil
}