arrow/compute/internal/kernels/vector_hash.go (457 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build go1.18
package kernels
import (
"fmt"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute/exec"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/bitutils"
"github.com/apache/arrow-go/v18/internal/hashing"
)
// HashState is the interface for the per-kernel state shared by the hash
// kernels (unique, value_counts, dictionary_encode). A HashState accumulates
// distinct values in a memo table across one or more Append calls.
type HashState interface {
	// Reset for another run
	Reset() error
	// Flush out accumulated results from last invocation
	Flush(*exec.ExecResult) error
	// FlushFinal flushes the accumulated results across all invocations
	// of calls. The kernel should not be used again until after
	// Reset() is called.
	FlushFinal(out *exec.ExecResult) error
	// GetDictionary returns the values (keys) accumulated in the dictionary
	// so far.
	GetDictionary() (arrow.ArrayData, error)
	// ValueType returns the data type of the values in the dictionary.
	ValueType() arrow.DataType
	// Append prepares the action for the given input (reserving appropriately
	// sized data structures, etc.) and visits the input with the Action
	Append(*exec.KernelCtx, *exec.ArraySpan) error
	// Allocator returns the memory.Allocator this state allocates from.
	Allocator() memory.Allocator
}
// Action is the per-kernel behavior invoked while visiting input values:
// the Observe* callbacks are notified whether each value (or null) was
// already present in the memo table, and Flush/FlushFinal emit whatever
// output the action accumulated.
type Action interface {
	// Reset clears accumulated state for a fresh run.
	Reset() error
	// Reserve pre-allocates room for n upcoming observations.
	Reserve(int) error
	// Flush emits output accumulated since the last Flush.
	Flush(*exec.ExecResult) error
	// FlushFinal emits output accumulated across all invocations.
	FlushFinal(*exec.ExecResult) error
	// ObserveFound reports the memo index of a value that was already present.
	ObserveFound(int)
	// ObserveNotFound reports the memo index of a newly inserted value.
	ObserveNotFound(int) error
	// ObserveNullFound reports the memo index when null was already present.
	ObserveNullFound(int)
	// ObserveNullNotFound reports the memo index of a newly observed null
	// (-1 when nulls are not being encoded).
	ObserveNullNotFound(int) error
	// ShouldEncodeNulls reports whether nulls are inserted into the memo table.
	ShouldEncodeNulls() bool
}
// emptyAction is a no-op Action: it observes values without accumulating any
// per-value output. The "unique" kernel uses it, since its result is simply
// the memo table's dictionary of distinct values.
type emptyAction struct {
	mem memory.Allocator // allocator (unused by the no-op methods themselves)
	dt  arrow.DataType   // value type being hashed
}

func (emptyAction) Reset() error                      { return nil }
func (emptyAction) Reserve(int) error                 { return nil }
func (emptyAction) Flush(*exec.ExecResult) error      { return nil }
func (emptyAction) FlushFinal(*exec.ExecResult) error { return nil }
func (emptyAction) ObserveFound(int)                  {}
func (emptyAction) ObserveNotFound(int) error         { return nil }
func (emptyAction) ObserveNullFound(int)              {}
func (emptyAction) ObserveNullNotFound(int) error     { return nil }

// ShouldEncodeNulls returns true: unique includes null in its output dictionary.
func (emptyAction) ShouldEncodeNulls() bool { return true }

// uniqueAction is the Action used by the "unique" kernel.
type uniqueAction = emptyAction
// regularHashState is the HashState used for all non-null, non-dictionary
// inputs: values are inserted into a memo table and the configured Action is
// notified for each one via the type-specific doAppend function.
type regularHashState struct {
	mem       memory.Allocator
	typ       arrow.DataType   // input value type
	memoTable hashing.MemoTable // accumulates the distinct values
	action    Action
	// doAppend is the type-specialized visitor (numeric/binary/fixed-size)
	doAppend func(Action, hashing.MemoTable, *exec.ArraySpan) error
}
// Allocator returns the allocator this state was created with.
func (rhs *regularHashState) Allocator() memory.Allocator { return rhs.mem }

// ValueType returns the input value type being hashed.
func (rhs *regularHashState) ValueType() arrow.DataType { return rhs.typ }

// Reset clears the memo table and the action so the kernel can run again.
func (rhs *regularHashState) Reset() error {
	rhs.memoTable.Reset()
	return rhs.action.Reset()
}
// Append reserves room in the action for the span's values, then visits the
// span with the type-specific doAppend, inserting values into the memo table.
func (rhs *regularHashState) Append(_ *exec.KernelCtx, arr *exec.ArraySpan) error {
	err := rhs.action.Reserve(int(arr.Len))
	if err != nil {
		return err
	}
	return rhs.doAppend(rhs.action, rhs.memoTable, arr)
}
// Flush delegates to the action's Flush.
func (rhs *regularHashState) Flush(out *exec.ExecResult) error { return rhs.action.Flush(out) }

// FlushFinal delegates to the action's FlushFinal.
func (rhs *regularHashState) FlushFinal(out *exec.ExecResult) error {
	return rhs.action.FlushFinal(out)
}

// GetDictionary materializes the memo table's distinct values as ArrayData
// of the input value type (starting at memo index 0).
func (rhs *regularHashState) GetDictionary() (arrow.ArrayData, error) {
	return array.GetDictArrayData(rhs.mem, rhs.typ, rhs.memoTable, 0)
}
// doAppendBinary visits a variable-length binary span (offsets of width
// OffsetT) and inserts each value into the memo table, notifying the action
// whether it was already present. Nulls are either encoded into the memo
// table or reported with index -1, per action.ShouldEncodeNulls().
func doAppendBinary[OffsetT int32 | int64](action Action, memo hashing.MemoTable, arr *exec.ArraySpan) error {
	var (
		bitmap            = arr.Buffers[0].Buf
		offsets           = exec.GetSpanOffsets[OffsetT](arr, 1)
		data              = arr.Buffers[2].Buf
		shouldEncodeNulls = action.ShouldEncodeNulls()
	)
	return bitutils.VisitBitBlocksShort(bitmap, arr.Offset, arr.Len,
		func(pos int64) error {
			v := data[offsets[pos]:offsets[pos+1]]
			idx, found, err := memo.GetOrInsert(v)
			if err != nil {
				return err
			}
			if found {
				action.ObserveFound(idx)
				return nil
			}
			return action.ObserveNotFound(idx)
		},
		func() error {
			if !shouldEncodeNulls {
				return action.ObserveNullNotFound(-1)
			}
			idx, found := memo.GetOrInsertNull()
			// Mirror the non-null path: exactly one Observe callback fires.
			// Previously ObserveNullNotFound was also invoked after
			// ObserveNullFound, double-reporting every repeated null.
			if found {
				action.ObserveNullFound(idx)
				return nil
			}
			return action.ObserveNullNotFound(idx)
		})
}
// doAppendFixedSize visits a fixed-width span (fixed_size_binary, decimals,
// month-day-nano intervals) and inserts the raw bytes of each value into a
// binary memo table, notifying the action whether it was already present.
func doAppendFixedSize(action Action, memo hashing.MemoTable, arr *exec.ArraySpan) error {
	sz := int64(arr.Type.(arrow.FixedWidthDataType).Bytes())
	// account for the span's logical offset into the physical buffer
	arrData := arr.Buffers[1].Buf[arr.Offset*sz:]
	shouldEncodeNulls := action.ShouldEncodeNulls()
	return bitutils.VisitBitBlocksShort(arr.Buffers[0].Buf, arr.Offset, arr.Len,
		func(pos int64) error {
			// fixed size type memo table we use a binary memo table
			// so get the raw bytes
			idx, found, err := memo.GetOrInsert(arrData[pos*sz : (pos+1)*sz])
			if err != nil {
				return err
			}
			if found {
				action.ObserveFound(idx)
				return nil
			}
			return action.ObserveNotFound(idx)
		}, func() error {
			if !shouldEncodeNulls {
				return action.ObserveNullNotFound(-1)
			}
			idx, found := memo.GetOrInsertNull()
			// Mirror the non-null path: exactly one Observe callback fires.
			// Previously ObserveNullNotFound was also invoked after
			// ObserveNullFound, double-reporting every repeated null.
			if found {
				action.ObserveNullFound(idx)
				return nil
			}
			return action.ObserveNullNotFound(idx)
		})
}
// doAppendNumeric visits a primitive numeric span and inserts each value
// into the memo table, notifying the action whether it was already present.
func doAppendNumeric[T arrow.IntType | arrow.UintType | arrow.FloatType](action Action, memo hashing.MemoTable, arr *exec.ArraySpan) error {
	arrData := exec.GetSpanValues[T](arr, 1)
	shouldEncodeNulls := action.ShouldEncodeNulls()
	return bitutils.VisitBitBlocksShort(arr.Buffers[0].Buf, arr.Offset, arr.Len,
		func(pos int64) error {
			idx, found, err := memo.GetOrInsert(arrData[pos])
			if err != nil {
				return err
			}
			if found {
				action.ObserveFound(idx)
				return nil
			}
			return action.ObserveNotFound(idx)
		}, func() error {
			if !shouldEncodeNulls {
				return action.ObserveNullNotFound(-1)
			}
			idx, found := memo.GetOrInsertNull()
			// Mirror the non-null path: exactly one Observe callback fires.
			// Previously ObserveNullNotFound was also invoked after
			// ObserveNullFound, double-reporting every repeated null.
			if found {
				action.ObserveNullFound(idx)
				return nil
			}
			return action.ObserveNullNotFound(idx)
		})
}
// nullHashState is the HashState for NULL-typed input: there is at most one
// distinct value (null), so no memo table is needed; we only track whether
// any value has been appended at all.
type nullHashState struct {
	mem      memory.Allocator
	typ      arrow.DataType
	seenNull bool // set once at least one (null) element has been appended
	action   Action
}

// Allocator returns the allocator this state was created with.
func (nhs *nullHashState) Allocator() memory.Allocator { return nhs.mem }

// ValueType returns the input type (the null type).
func (nhs *nullHashState) ValueType() arrow.DataType { return nhs.typ }

// Reset resets the underlying action. Note: seenNull is intentionally left
// untouched here, matching the original behavior.
func (nhs *nullHashState) Reset() error {
	return nhs.action.Reset()
}
// Append observes arr.Len null values: the first element of the chunk is
// reported via ObserveNullNotFound(0) and the remainder via
// ObserveNullFound(0).
// NOTE(review): ObserveNullNotFound fires once per appended chunk rather
// than once overall — confirm downstream actions tolerate this.
func (nhs *nullHashState) Append(_ *exec.KernelCtx, arr *exec.ArraySpan) (err error) {
	if err = nhs.action.Reserve(int(arr.Len)); err != nil {
		return err
	}
	if arr.Len == 0 {
		return nil
	}
	nhs.seenNull = true
	err = nhs.action.ObserveNullNotFound(0)
	for i := int64(1); i < arr.Len; i++ {
		nhs.action.ObserveNullFound(0)
	}
	return
}
// Flush delegates to the action's Flush.
func (nhs *nullHashState) Flush(out *exec.ExecResult) error { return nhs.action.Flush(out) }

// FlushFinal delegates to the action's FlushFinal.
func (nhs *nullHashState) FlushFinal(out *exec.ExecResult) error {
	return nhs.action.FlushFinal(out)
}
// GetDictionary returns the data of a null array of length 1 when any null
// was observed, otherwise length 0. The ArrayData is retained for the
// caller before the temporary array is released.
func (nhs *nullHashState) GetDictionary() (arrow.ArrayData, error) {
	length := 0
	if nhs.seenNull {
		length = 1
	}
	arr := array.NewNull(length)
	defer arr.Release()

	data := arr.Data()
	data.Retain()
	return data, nil
}
// dictionaryHashState is the HashState for dictionary-encoded input: the
// indices are hashed with an inner HashState while the (possibly unified)
// dictionary of values is tracked separately.
type dictionaryHashState struct {
	indicesKernel HashState      // hashes the dictionary indices
	dictionary    arrow.Array    // dictionary accumulated so far (nil before first Append)
	dictValueType arrow.DataType // value type of the dictionary entries
}

// The following methods delegate to the inner indices kernel.
func (dhs *dictionaryHashState) Allocator() memory.Allocator { return dhs.indicesKernel.Allocator() }
func (dhs *dictionaryHashState) Reset() error                { return dhs.indicesKernel.Reset() }
func (dhs *dictionaryHashState) Flush(out *exec.ExecResult) error {
	return dhs.indicesKernel.Flush(out)
}
func (dhs *dictionaryHashState) FlushFinal(out *exec.ExecResult) error {
	return dhs.indicesKernel.FlushFinal(out)
}
func (dhs *dictionaryHashState) GetDictionary() (arrow.ArrayData, error) {
	return dhs.indicesKernel.GetDictionary()
}
func (dhs *dictionaryHashState) ValueType() arrow.DataType { return dhs.indicesKernel.ValueType() }

// DictionaryValueType returns the value type of the dictionary entries.
func (dhs *dictionaryHashState) DictionaryValueType() arrow.DataType { return dhs.dictValueType }

// Dictionary returns the dictionary accumulated so far (nil before the
// first Append).
func (dhs *dictionaryHashState) Dictionary() arrow.Array { return dhs.dictionary }
// Append hashes the indices of a dictionary-encoded span. When the chunk's
// dictionary is the first seen, or equal to the accumulated one, the raw
// indices can be hashed directly. Otherwise both dictionaries are unified
// and the chunk's indices are transposed into the unified dictionary before
// being passed to the inner indices kernel.
func (dhs *dictionaryHashState) Append(ctx *exec.KernelCtx, arr *exec.ArraySpan) error {
	arrDict := arr.Dictionary().MakeArray()
	if dhs.dictionary == nil || array.Equal(dhs.dictionary, arrDict) {
		// Adopt the chunk's dictionary instance; release the previous
		// (equal) instance first so its reference is not leaked.
		if dhs.dictionary != nil {
			dhs.dictionary.Release()
		}
		dhs.dictionary = arrDict
		return dhs.indicesKernel.Append(ctx, arr)
	}
	defer arrDict.Release()
	// NOTE: this approach computes a new dictionary unification per chunk
	// this is in effect O(n*k) where n is the total chunked array length
	// and k is the number of chunks (therefore O(n**2) if chunks have a fixed size).
	//
	// A better approach may be to run the kernel over each individual chunk,
	// and then hash-aggregate all results (for example sum-group-by for
	// the "value_counts" kernel)
	unifier, err := array.NewDictionaryUnifier(dhs.indicesKernel.Allocator(), dhs.dictValueType)
	if err != nil {
		return err
	}
	defer unifier.Release()
	if err := unifier.Unify(dhs.dictionary); err != nil {
		return err
	}
	transposeMap, err := unifier.UnifyAndTranspose(arrDict)
	if err != nil {
		return err
	}
	defer transposeMap.Release()
	_, outDict, err := unifier.GetResult()
	if err != nil {
		return err
	}
	// swap in the unified dictionary once the transposition below completes
	defer func() {
		dhs.dictionary.Release()
		dhs.dictionary = outDict
	}()
	inDict := arr.MakeData()
	defer inDict.Release()
	tmp, err := array.TransposeDictIndices(dhs.Allocator(), inDict, arr.Type, arr.Type, outDict.Data(), arrow.Int32Traits.CastFromBytes(transposeMap.Bytes()))
	if err != nil {
		return err
	}
	defer tmp.Release()
	var tmpSpan exec.ArraySpan
	tmpSpan.SetMembers(tmp)
	return dhs.indicesKernel.Append(ctx, &tmpSpan)
}
// nullHashInit returns a KernelInitFn that constructs a nullHashState for
// NULL-typed input, using actionInit to build the kernel's Action.
func nullHashInit(actionInit initAction) exec.KernelInitFn {
	return func(ctx *exec.KernelCtx, args exec.KernelInitArgs) (exec.KernelState, error) {
		mem := exec.GetAllocator(ctx.Ctx)
		ret := &nullHashState{
			mem:    mem,
			typ:    args.Inputs[0],
			action: actionInit(args.Inputs[0], args.Options, mem),
		}
		// propagate Reset errors (previously silently dropped)
		if err := ret.Reset(); err != nil {
			return nil, err
		}
		return ret, nil
	}
}
// newMemoTable constructs the appropriate hashing.MemoTable for the given
// type ID: fixed-width primitives map onto the integer memo table of the
// same byte width, while variable-length and wide fixed-size types use a
// binary memo table backed by a binary builder.
func newMemoTable(mem memory.Allocator, dt arrow.Type) (hashing.MemoTable, error) {
	newBinary := func(bt arrow.BinaryDataType) hashing.MemoTable {
		return hashing.NewBinaryMemoTable(0, 0, array.NewBinaryBuilder(mem, bt))
	}

	switch dt {
	case arrow.INT8, arrow.UINT8:
		return hashing.NewUint8MemoTable(0), nil
	case arrow.INT16, arrow.UINT16:
		return hashing.NewUint16MemoTable(0), nil
	case arrow.INT32, arrow.UINT32, arrow.FLOAT32, arrow.DECIMAL32,
		arrow.DATE32, arrow.TIME32, arrow.INTERVAL_MONTHS:
		return hashing.NewUint32MemoTable(0), nil
	case arrow.INT64, arrow.UINT64, arrow.FLOAT64, arrow.DECIMAL64,
		arrow.DATE64, arrow.TIME64, arrow.TIMESTAMP,
		arrow.DURATION, arrow.INTERVAL_DAY_TIME:
		return hashing.NewUint64MemoTable(0), nil
	case arrow.BINARY, arrow.STRING, arrow.FIXED_SIZE_BINARY, arrow.DECIMAL128,
		arrow.DECIMAL256, arrow.INTERVAL_MONTH_DAY_NANO:
		return newBinary(arrow.BinaryTypes.Binary), nil
	case arrow.LARGE_BINARY, arrow.LARGE_STRING:
		return newBinary(arrow.BinaryTypes.LargeBinary), nil
	}
	return nil, fmt.Errorf("%w: unsupported type %s", arrow.ErrNotImplemented, dt)
}
func regularHashInit(dt arrow.DataType, actionInit initAction, appendFn func(Action, hashing.MemoTable, *exec.ArraySpan) error) exec.KernelInitFn {
return func(ctx *exec.KernelCtx, args exec.KernelInitArgs) (exec.KernelState, error) {
mem := exec.GetAllocator(ctx.Ctx)
memoTable, err := newMemoTable(mem, dt.ID())
if err != nil {
return nil, err
}
ret := ®ularHashState{
mem: mem,
typ: args.Inputs[0],
memoTable: memoTable,
action: actionInit(args.Inputs[0], args.Options, mem),
doAppend: appendFn,
}
ret.Reset()
return ret, nil
}
}
// dictionaryHashInit returns a KernelInitFn for dictionary-encoded input:
// it builds an inner hasher for the unsigned integer type matching the
// dictionary's index width and wraps it in a dictionaryHashState.
func dictionaryHashInit(actionInit initAction) exec.KernelInitFn {
	return func(ctx *exec.KernelCtx, args exec.KernelInitArgs) (exec.KernelState, error) {
		dictType := args.Inputs[0].(*arrow.DictionaryType)

		// map the (signed or unsigned) index type onto the unsigned
		// hasher of the same width
		var hashTy arrow.Type
		switch dictType.IndexType.ID() {
		case arrow.INT8, arrow.UINT8:
			hashTy = arrow.UINT8
		case arrow.INT16, arrow.UINT16:
			hashTy = arrow.UINT16
		case arrow.INT32, arrow.UINT32:
			hashTy = arrow.UINT32
		case arrow.INT64, arrow.UINT64:
			hashTy = arrow.UINT64
		default:
			return nil, fmt.Errorf("%w: unsupported dictionary index type", arrow.ErrInvalid)
		}

		indicesHasher, err := getHashInit(hashTy, actionInit)(ctx, args)
		if err != nil {
			return nil, err
		}
		return &dictionaryHashState{
			indicesKernel: indicesHasher.(HashState),
			dictValueType: dictType.ValueType,
		}, nil
	}
}
// initAction constructs an Action for the given value type, kernel options
// and allocator.
type initAction func(arrow.DataType, any, memory.Allocator) Action
// getHashInit returns the KernelInitFn for the given input type ID. Signed
// and unsigned integers of the same width share a hasher (hashing only cares
// about the bit pattern), and temporal/decimal types reuse the integer or
// binary hasher of matching width.
func getHashInit(typeID arrow.Type, actionInit initAction) exec.KernelInitFn {
	switch typeID {
	case arrow.NULL:
		return nullHashInit(actionInit)
	case arrow.INT8, arrow.UINT8:
		return regularHashInit(arrow.PrimitiveTypes.Uint8, actionInit, doAppendNumeric[uint8])
	case arrow.INT16, arrow.UINT16:
		return regularHashInit(arrow.PrimitiveTypes.Uint16, actionInit, doAppendNumeric[uint16])
	case arrow.INT32, arrow.UINT32, arrow.FLOAT32,
		arrow.DATE32, arrow.TIME32, arrow.INTERVAL_MONTHS:
		return regularHashInit(arrow.PrimitiveTypes.Uint32, actionInit, doAppendNumeric[uint32])
	case arrow.INT64, arrow.UINT64, arrow.FLOAT64,
		arrow.DATE64, arrow.TIME64, arrow.TIMESTAMP,
		arrow.DURATION, arrow.INTERVAL_DAY_TIME:
		return regularHashInit(arrow.PrimitiveTypes.Uint64, actionInit, doAppendNumeric[uint64])
	case arrow.BINARY, arrow.STRING:
		return regularHashInit(arrow.BinaryTypes.Binary, actionInit, doAppendBinary[int32])
	case arrow.LARGE_BINARY, arrow.LARGE_STRING:
		return regularHashInit(arrow.BinaryTypes.LargeBinary, actionInit, doAppendBinary[int64])
	case arrow.FIXED_SIZE_BINARY, arrow.DECIMAL128, arrow.DECIMAL256:
		// fixed-width values are hashed as raw bytes via a binary memo table
		return regularHashInit(arrow.BinaryTypes.Binary, actionInit, doAppendFixedSize)
	case arrow.INTERVAL_MONTH_DAY_NANO:
		return regularHashInit(arrow.FixedWidthTypes.MonthDayNanoInterval, actionInit, doAppendFixedSize)
	default:
		// callers only pass type IDs registered by addHashKernels
		debug.Assert(false, "unsupported hash init type")
		return nil
	}
}
// hashExec is the shared exec function for all hash kernels: it appends the
// batch's single input array to the HashState and flushes any pending output.
func hashExec(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
	state, ok := ctx.State.(HashState)
	if !ok {
		return fmt.Errorf("%w: bad initialization of hash state", arrow.ErrInvalid)
	}
	err := state.Append(ctx, &batch.Values[0].Array)
	if err != nil {
		return err
	}
	return state.Flush(out)
}
// uniqueFinalize discards any pre-allocated result spans and replaces them
// with a single span owning the distinct values from the memo table.
func uniqueFinalize(ctx *exec.KernelCtx, results []*exec.ArraySpan) ([]*exec.ArraySpan, error) {
	state, ok := ctx.State.(HashState)
	if !ok {
		return nil, fmt.Errorf("%w: HashState in invalid state", arrow.ErrInvalid)
	}

	// release any pre-allocation we did
	for _, span := range results {
		span.Release()
	}

	uniques, err := state.GetDictionary()
	if err != nil {
		return nil, err
	}
	defer uniques.Release()

	output := new(exec.ArraySpan)
	output.TakeOwnership(uniques)
	return []*exec.ArraySpan{output}, nil
}
// ensureHashDictionary extracts the dictionary accumulated by the hash
// state into a span, or fills a zero-length array of the dictionary value
// type when no chunk was ever appended.
func ensureHashDictionary(_ *exec.KernelCtx, hash *dictionaryHashState) (*exec.ArraySpan, error) {
	span := new(exec.ArraySpan)
	if hash.dictionary == nil {
		exec.FillZeroLength(hash.DictionaryValueType(), span)
		return span, nil
	}
	span.TakeOwnership(hash.dictionary.Data())
	hash.dictionary.Release()
	return span, nil
}
// uniqueFinalizeDictionary finalizes the dictionary-input unique kernel:
// the distinct indices produced by uniqueFinalize are paired with the
// accumulated (unified) dictionary so the output stays dictionary-encoded.
func uniqueFinalizeDictionary(ctx *exec.KernelCtx, result []*exec.ArraySpan) ([]*exec.ArraySpan, error) {
	out, err := uniqueFinalize(ctx, result)
	if err != nil {
		return nil, err
	}

	hash, ok := ctx.State.(*dictionaryHashState)
	if !ok {
		return nil, fmt.Errorf("%w: state should be *dictionaryHashState", arrow.ErrInvalid)
	}

	dict, err := ensureHashDictionary(ctx, hash)
	if err != nil {
		return nil, err
	}
	out[0].SetDictionary(dict)
	return out, nil
}
// addHashKernels builds one VectorKernel per supported input type, sharing
// the exec/finalize functions carried in base while varying only the Init
// function and signature.
func addHashKernels(base exec.VectorKernel, actionInit initAction, outTy exec.OutputType) []exec.VectorKernel {
	var kernels []exec.VectorKernel

	// primitive types match on the exact type instance
	for _, ty := range primitiveTypes {
		base.Init = getHashInit(ty.ID(), actionInit)
		base.Signature = &exec.KernelSignature{
			InputTypes: []exec.InputType{exec.NewExactInput(ty)},
			OutType:    outTy,
		}
		kernels = append(kernels, base)
	}

	// parametric types (timestamps, decimals, intervals, ...) match on
	// their type ID, since their parameters vary per instance
	for _, id := range []arrow.Type{arrow.TIME32, arrow.TIME64, arrow.TIMESTAMP,
		arrow.DURATION, arrow.FIXED_SIZE_BINARY, arrow.DECIMAL128, arrow.DECIMAL256,
		arrow.INTERVAL_DAY_TIME, arrow.INTERVAL_MONTHS, arrow.INTERVAL_MONTH_DAY_NANO} {
		base.Init = getHashInit(id, actionInit)
		base.Signature = &exec.KernelSignature{
			InputTypes: []exec.InputType{exec.NewIDInput(id)},
			OutType:    outTy,
		}
		kernels = append(kernels, base)
	}
	return kernels
}
// initUnique constructs the Action for the "unique" kernel; the options
// argument is ignored since unique needs no per-value accumulation.
func initUnique(dt arrow.DataType, _ any, mem memory.Allocator) Action {
	act := uniqueAction{dt: dt, mem: mem}
	return act
}
// GetVectorHashKernels returns the kernels for the hash-based vector
// functions. Only the unique kernels are populated in this view of the
// file; valueCounts and dictEncode are returned nil.
// NOTE(review): presumably those are populated elsewhere (or not yet
// implemented) — confirm against the registering caller.
func GetVectorHashKernels() (unique, valueCounts, dictEncode []exec.VectorKernel) {
	var base exec.VectorKernel
	base.ExecFn = hashExec
	// unique
	base.Finalize = uniqueFinalize
	base.OutputChunked = false
	base.CanExecuteChunkWise = true
	unique = addHashKernels(base, initUnique, OutputFirstType)
	// dictionary unique: reuses base but overrides Init/Finalize/Signature
	// (field assignment order matters since base is reused by value)
	base.Init = dictionaryHashInit(initUnique)
	base.Finalize = uniqueFinalizeDictionary
	base.Signature = &exec.KernelSignature{
		InputTypes: []exec.InputType{exec.NewIDInput(arrow.DICTIONARY)},
		OutType:    OutputFirstType,
	}
	unique = append(unique, base)
	return
}