arrow/compute/internal/kernels/helpers.go (808 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //go:build go1.18 package kernels import ( "fmt" "unsafe" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/bitutil" "github.com/apache/arrow-go/v18/arrow/compute/exec" "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/arrow/scalar" "github.com/apache/arrow-go/v18/internal/bitutils" "golang.org/x/exp/constraints" ) // ScalarUnary returns a kernel for performing a unary operation on // FixedWidth types which is implemented using the passed in function // which will receive a slice containing the raw input data along with // a slice to populate for the output data. // // Note that bool is not included in arrow.FixedWidthType since it is // represented as a bitmap, not as a slice of bool. func ScalarUnary[OutT, Arg0T arrow.FixedWidthType](op func(*exec.KernelCtx, []Arg0T, []OutT) error) exec.ArrayKernelExec { return func(ctx *exec.KernelCtx, in *exec.ExecSpan, out *exec.ExecResult) error { arg0 := in.Values[0].Array inData := exec.GetSpanValues[Arg0T](&arg0, 1) outData := exec.GetSpanValues[OutT](out, 1) return op(ctx, inData, outData) } } // ScalarUnaryNotNull is for generating a kernel to operate only on the // non-null values in the input array. The zerovalue of the output type // is used for any null input values. func ScalarUnaryNotNull[OutT, Arg0T arrow.FixedWidthType](op func(*exec.KernelCtx, Arg0T, *error) OutT) exec.ArrayKernelExec { return func(ctx *exec.KernelCtx, in *exec.ExecSpan, out *exec.ExecResult) error { var ( arg0 = &in.Values[0].Array arg0Data = exec.GetSpanValues[Arg0T](arg0, 1) outPos = 0 def OutT outData = exec.GetSpanValues[OutT](out, 1) bitmap = arg0.Buffers[0].Buf err error ) bitutils.VisitBitBlocks(bitmap, arg0.Offset, arg0.Len, func(pos int64) { outData[outPos] = op(ctx, arg0Data[pos], &err) outPos++ }, func() { outData[outPos] = def outPos++ }) return err } } // ScalarUnaryBoolOutput is like ScalarUnary only it is for cases of boolean // output. The function should take in a slice of the input type and a slice // of bytes to fill with the output boolean bitmap. func ScalarUnaryBoolOutput[Arg0T arrow.FixedWidthType](op func(*exec.KernelCtx, []Arg0T, []byte) error) exec.ArrayKernelExec { return func(ctx *exec.KernelCtx, in *exec.ExecSpan, out *exec.ExecResult) error { arg0 := in.Values[0].Array inData := exec.GetSpanValues[Arg0T](&arg0, 1) return op(ctx, inData, out.Buffers[1].Buf) } } // ScalarUnaryNotNullBinaryArgBoolOut creates a unary kernel that accepts // a binary type input (Binary [offset int32], String [offset int32], // LargeBinary [offset int64], LargeString [offset int64]) and returns // a boolean output which is never null. // // It implements the handling to iterate the offsets and values calling // the provided function on each byte slice. The provided default value // will be used as the output for elements of the input that are null. func ScalarUnaryNotNullBinaryArgBoolOut[OffsetT int32 | int64](defVal bool, op func(*exec.KernelCtx, []byte, *error) bool) exec.ArrayKernelExec { return func(ctx *exec.KernelCtx, in *exec.ExecSpan, out *exec.ExecResult) error { var ( arg0 = in.Values[0].Array outData = out.Buffers[1].Buf outPos = 0 arg0Offsets = exec.GetSpanOffsets[OffsetT](&arg0, 1) arg0Data = arg0.Buffers[2].Buf bitmap = arg0.Buffers[0].Buf err error ) bitutils.VisitBitBlocks(bitmap, arg0.Offset, arg0.Len, func(pos int64) { v := arg0Data[arg0Offsets[pos]:arg0Offsets[pos+1]] bitutil.SetBitTo(outData, int(out.Offset)+outPos, op(ctx, v, &err)) outPos++ }, func() { bitutil.SetBitTo(outData, int(out.Offset)+outPos, defVal) outPos++ }) return err } } // ScalarUnaryNotNullBinaryArg creates a unary kernel that accepts // a binary type input (Binary [offset int32], String [offset int32], // LargeBinary [offset int64], LargeString [offset int64]) and returns // a FixedWidthType output which is never null. // // It implements the handling to iterate the offsets and values calling // the provided function on each byte slice. The zero value of the OutT // will be used as the output for elements of the input that are null. func ScalarUnaryNotNullBinaryArg[OutT arrow.FixedWidthType, OffsetT int32 | int64](op func(*exec.KernelCtx, []byte, *error) OutT) exec.ArrayKernelExec { return func(ctx *exec.KernelCtx, in *exec.ExecSpan, out *exec.ExecResult) error { var ( arg0 = &in.Values[0].Array outData = exec.GetSpanValues[OutT](out, 1) outPos = 0 arg0Offsets = exec.GetSpanOffsets[OffsetT](arg0, 1) def OutT arg0Data = arg0.Buffers[2].Buf bitmap = arg0.Buffers[0].Buf err error ) bitutils.VisitBitBlocks(bitmap, arg0.Offset, arg0.Len, func(pos int64) { v := arg0Data[arg0Offsets[pos]:arg0Offsets[pos+1]] outData[outPos] = op(ctx, v, &err) outPos++ }, func() { outData[outPos] = def outPos++ }) return err } } // ScalarUnaryBoolArg is like ScalarUnary except it specifically expects a // function that takes a byte slice since booleans arrays are represented // as a bitmap. func ScalarUnaryBoolArg[OutT arrow.FixedWidthType](op func(*exec.KernelCtx, []byte, []OutT) error) exec.ArrayKernelExec { return func(ctx *exec.KernelCtx, input *exec.ExecSpan, out *exec.ExecResult) error { outData := exec.GetSpanValues[OutT](out, 1) return op(ctx, input.Values[0].Array.Buffers[1].Buf, outData) } } func UnboxScalar[T arrow.FixedWidthType](val scalar.PrimitiveScalar) T { return *(*T)(unsafe.Pointer(&val.Data()[0])) } func UnboxBinaryScalar(val scalar.BinaryScalar) []byte { if !val.IsValid() { return nil } return val.Data() } type arrArrFn[OutT, Arg0T, Arg1T arrow.FixedWidthType] func(*exec.KernelCtx, []Arg0T, []Arg1T, []OutT) error type arrScalarFn[OutT, Arg0T, Arg1T arrow.FixedWidthType] func(*exec.KernelCtx, []Arg0T, Arg1T, []OutT) error type scalarArrFn[OutT, Arg0T, Arg1T arrow.FixedWidthType] func(*exec.KernelCtx, Arg0T, []Arg1T, []OutT) error type binaryOps[OutT, Arg0T, Arg1T arrow.FixedWidthType] struct { arrArr arrArrFn[OutT, Arg0T, Arg1T] arrScalar arrScalarFn[OutT, Arg0T, Arg1T] scalarArr scalarArrFn[OutT, Arg0T, Arg1T] } type binaryBoolOps struct { arrArr func(ctx *exec.KernelCtx, lhs, rhs, out bitutil.Bitmap) error arrScalar func(ctx *exec.KernelCtx, lhs bitutil.Bitmap, rhs bool, out bitutil.Bitmap) error scalarArr func(ctx *exec.KernelCtx, lhs bool, rhs, out bitutil.Bitmap) error } func ScalarBinary[OutT, Arg0T, Arg1T arrow.FixedWidthType](ops binaryOps[OutT, Arg0T, Arg1T]) exec.ArrayKernelExec { arrayArray := func(ctx *exec.KernelCtx, arg0, arg1 *exec.ArraySpan, out *exec.ExecResult) error { var ( a0 = exec.GetSpanValues[Arg0T](arg0, 1) a1 = exec.GetSpanValues[Arg1T](arg1, 1) outData = exec.GetSpanValues[OutT](out, 1) ) return ops.arrArr(ctx, a0, a1, outData) } arrayScalar := func(ctx *exec.KernelCtx, arg0 *exec.ArraySpan, arg1 scalar.Scalar, out *exec.ExecResult) error { var ( a0 = exec.GetSpanValues[Arg0T](arg0, 1) a1 = UnboxScalar[Arg1T](arg1.(scalar.PrimitiveScalar)) outData = exec.GetSpanValues[OutT](out, 1) ) return ops.arrScalar(ctx, a0, a1, outData) } scalarArray := func(ctx *exec.KernelCtx, arg0 scalar.Scalar, arg1 *exec.ArraySpan, out *exec.ExecResult) error { var ( a0 = UnboxScalar[Arg0T](arg0.(scalar.PrimitiveScalar)) a1 = exec.GetSpanValues[Arg1T](arg1, 1) outData = exec.GetSpanValues[OutT](out, 1) ) return ops.scalarArr(ctx, a0, a1, outData) } return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { if batch.Values[0].IsArray() { if batch.Values[1].IsArray() { return arrayArray(ctx, &batch.Values[0].Array, &batch.Values[1].Array, out) } return arrayScalar(ctx, &batch.Values[0].Array, batch.Values[1].Scalar, out) } if batch.Values[1].IsArray() { return scalarArray(ctx, batch.Values[0].Scalar, &batch.Values[1].Array, out) } debug.Assert(false, "should be unreachable") return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid) } } func ScalarBinaryBools(ops *binaryBoolOps) exec.ArrayKernelExec { arrayArray := func(ctx *exec.KernelCtx, arg0, arg1 *exec.ArraySpan, out *exec.ExecResult) error { var ( a0Bm = bitutil.Bitmap{Data: arg0.Buffers[1].Buf, Offset: arg0.Offset, Len: arg0.Len} a1Bm = bitutil.Bitmap{Data: arg1.Buffers[1].Buf, Offset: arg1.Offset, Len: arg1.Len} outBm = bitutil.Bitmap{Data: out.Buffers[1].Buf, Offset: out.Offset, Len: out.Len} ) return ops.arrArr(ctx, a0Bm, a1Bm, outBm) } arrayScalar := func(ctx *exec.KernelCtx, arg0 *exec.ArraySpan, arg1 scalar.Scalar, out *exec.ExecResult) error { var ( a0Bm = bitutil.Bitmap{Data: arg0.Buffers[1].Buf, Offset: arg0.Offset, Len: arg0.Len} a1 = arg1.(*scalar.Boolean).Value outBm = bitutil.Bitmap{Data: out.Buffers[1].Buf, Offset: out.Offset, Len: out.Len} ) return ops.arrScalar(ctx, a0Bm, a1, outBm) } scalarArray := func(ctx *exec.KernelCtx, arg0 scalar.Scalar, arg1 *exec.ArraySpan, out *exec.ExecResult) error { var ( a0 = arg0.(*scalar.Boolean).Value a1Bm = bitutil.Bitmap{Data: arg1.Buffers[1].Buf, Offset: arg1.Offset, Len: arg1.Len} outBm = bitutil.Bitmap{Data: out.Buffers[1].Buf, Offset: out.Offset, Len: out.Len} ) return ops.scalarArr(ctx, a0, a1Bm, outBm) } return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { if batch.Values[0].IsArray() { if batch.Values[1].IsArray() { return arrayArray(ctx, &batch.Values[0].Array, &batch.Values[1].Array, out) } return arrayScalar(ctx, &batch.Values[0].Array, batch.Values[1].Scalar, out) } if batch.Values[1].IsArray() { return scalarArray(ctx, batch.Values[0].Scalar, &batch.Values[1].Array, out) } debug.Assert(false, "should be unreachable") return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid) } } func ScalarBinaryNotNull[OutT, Arg0T, Arg1T arrow.FixedWidthType](op func(*exec.KernelCtx, Arg0T, Arg1T, *error) OutT) exec.ArrayKernelExec { arrayArray := func(ctx *exec.KernelCtx, arg0, arg1 *exec.ArraySpan, out *exec.ExecResult) (err error) { // fast path if one side is entirely null if arg0.UpdateNullCount() == arg0.Len || arg1.UpdateNullCount() == arg1.Len { return nil } var ( a0 = exec.GetSpanValues[Arg0T](arg0, 1) a1 = exec.GetSpanValues[Arg1T](arg1, 1) outData = exec.GetSpanValues[OutT](out, 1) outPos int64 def OutT ) bitutils.VisitTwoBitBlocks(arg0.Buffers[0].Buf, arg1.Buffers[0].Buf, arg0.Offset, arg1.Offset, out.Len, func(pos int64) { outData[outPos] = op(ctx, a0[pos], a1[pos], &err) outPos++ }, func() { outData[outPos] = def outPos++ }) return } arrayScalar := func(ctx *exec.KernelCtx, arg0 *exec.ArraySpan, arg1 scalar.Scalar, out *exec.ExecResult) (err error) { // fast path if one side is entirely null if arg0.UpdateNullCount() == arg0.Len || !arg1.IsValid() { return nil } var ( a0 = exec.GetSpanValues[Arg0T](arg0, 1) outData = exec.GetSpanValues[OutT](out, 1) outPos int64 def OutT ) if !arg1.IsValid() { return nil } a1 := UnboxScalar[Arg1T](arg1.(scalar.PrimitiveScalar)) bitutils.VisitBitBlocks(arg0.Buffers[0].Buf, arg0.Offset, arg0.Len, func(pos int64) { outData[outPos] = op(ctx, a0[pos], a1, &err) outPos++ }, func() { outData[outPos] = def outPos++ }) return } scalarArray := func(ctx *exec.KernelCtx, arg0 scalar.Scalar, arg1 *exec.ArraySpan, out *exec.ExecResult) (err error) { // fast path if one side is entirely null if arg1.UpdateNullCount() == arg1.Len || !arg0.IsValid() { return nil } var ( a1 = exec.GetSpanValues[Arg1T](arg1, 1) outData = exec.GetSpanValues[OutT](out, 1) outPos int64 def OutT ) if !arg0.IsValid() { return nil } a0 := UnboxScalar[Arg0T](arg0.(scalar.PrimitiveScalar)) bitutils.VisitBitBlocks(arg1.Buffers[0].Buf, arg1.Offset, arg1.Len, func(pos int64) { outData[outPos] = op(ctx, a0, a1[pos], &err) outPos++ }, func() { outData[outPos] = def outPos++ }) return } return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { if batch.Values[0].IsArray() { if batch.Values[1].IsArray() { return arrayArray(ctx, &batch.Values[0].Array, &batch.Values[1].Array, out) } return arrayScalar(ctx, &batch.Values[0].Array, batch.Values[1].Scalar, out) } if batch.Values[1].IsArray() { return scalarArray(ctx, batch.Values[0].Scalar, &batch.Values[1].Array, out) } debug.Assert(false, "should be unreachable") return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid) } } type binaryBinOp[T arrow.FixedWidthType | bool] func(ctx *exec.KernelCtx, arg0, arg1 []byte) T func ScalarBinaryBinaryArgsBoolOut(itrFn func(*exec.ArraySpan) exec.ArrayIter[[]byte], op binaryBinOp[bool]) exec.ArrayKernelExec { arrArr := func(ctx *exec.KernelCtx, arg0, arg1 *exec.ArraySpan, out *exec.ExecResult) error { var ( arg0It = itrFn(arg0) arg1It = itrFn(arg1) ) bitutils.GenerateBitsUnrolled(out.Buffers[1].Buf, out.Offset, out.Len, func() bool { return op(ctx, arg0It.Next(), arg1It.Next()) }) return nil } arrScalar := func(ctx *exec.KernelCtx, arg0 *exec.ArraySpan, arg1 scalar.Scalar, out *exec.ExecResult) error { var ( arg0It = itrFn(arg0) a1 = UnboxBinaryScalar(arg1.(scalar.BinaryScalar)) ) bitutils.GenerateBitsUnrolled(out.Buffers[1].Buf, out.Offset, out.Len, func() bool { return op(ctx, arg0It.Next(), a1) }) return nil } scalarArr := func(ctx *exec.KernelCtx, arg0 scalar.Scalar, arg1 *exec.ArraySpan, out *exec.ExecResult) error { var ( arg1It = itrFn(arg1) a0 = UnboxBinaryScalar(arg0.(scalar.BinaryScalar)) ) bitutils.GenerateBitsUnrolled(out.Buffers[1].Buf, out.Offset, out.Len, func() bool { return op(ctx, a0, arg1It.Next()) }) return nil } return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error { if batch.Values[0].IsArray() { if batch.Values[1].IsArray() { return arrArr(ctx, &batch.Values[0].Array, &batch.Values[1].Array, out) } return arrScalar(ctx, &batch.Values[0].Array, batch.Values[1].Scalar, out) } if batch.Values[1].IsArray() { return scalarArr(ctx, batch.Values[0].Scalar, &batch.Values[1].Array, out) } debug.Assert(false, "should be unreachable") return fmt.Errorf("%w: scalar binary with two scalars?", arrow.ErrInvalid) } } // SizeOf determines the size in number of bytes for an integer // based on the generic value in a way that the compiler should // be able to easily evaluate and create as a constant. func SizeOf[T constraints.Integer]() uint { x := uint16(1 << 8) y := uint32(2 << 16) z := uint64(4 << 32) return 1 + uint(T(x))>>8 + uint(T(y))>>16 + uint(T(z))>>32 } // MinOf returns the minimum value for a given type since there is not // currently a generic way to do this with Go generics yet. func MinOf[T constraints.Integer]() T { if ones := ^T(0); ones < 0 { return ones << (8*SizeOf[T]() - 1) } return 0 } // MaxOf determines the max value for a given type since there is not // currently a generic way to do this for Go generics yet as all of the // math.Max/Min values are constants. func MaxOf[T constraints.Integer]() T { ones := ^T(0) if ones < 0 { return ones ^ (ones << (8*SizeOf[T]() - 1)) } return ones } func getSafeMinSameSign[I, O constraints.Integer]() I { if SizeOf[I]() > SizeOf[O]() { return I(MinOf[O]()) } return MinOf[I]() } func getSafeMaxSameSign[I, O constraints.Integer]() I { if SizeOf[I]() > SizeOf[O]() { return I(MaxOf[O]()) } return MaxOf[I]() } func getSafeMaxSignedUnsigned[I constraints.Signed, O constraints.Unsigned]() I { if SizeOf[I]() <= SizeOf[O]() { return MaxOf[I]() } return I(MaxOf[O]()) } func getSafeMaxUnsignedSigned[I constraints.Unsigned, O constraints.Signed]() I { if SizeOf[I]() < SizeOf[O]() { return MaxOf[I]() } return I(MaxOf[O]()) } func getSafeMinMaxSigned[T constraints.Signed](target arrow.Type) (min, max T) { switch target { case arrow.UINT8: min, max = 0, getSafeMaxSignedUnsigned[T, uint8]() case arrow.UINT16: min, max = 0, getSafeMaxSignedUnsigned[T, uint16]() case arrow.UINT32: min, max = 0, getSafeMaxSignedUnsigned[T, uint32]() case arrow.UINT64: min, max = 0, getSafeMaxSignedUnsigned[T, uint64]() case arrow.INT8: min = getSafeMinSameSign[T, int8]() max = getSafeMaxSameSign[T, int8]() case arrow.INT16: min = getSafeMinSameSign[T, int16]() max = getSafeMaxSameSign[T, int16]() case arrow.INT32: min = getSafeMinSameSign[T, int32]() max = getSafeMaxSameSign[T, int32]() case arrow.INT64: min = getSafeMinSameSign[T, int64]() max = getSafeMaxSameSign[T, int64]() } return } func getSafeMinMaxUnsigned[T constraints.Unsigned](target arrow.Type) (min, max T) { min = 0 switch target { case arrow.UINT8: max = getSafeMaxSameSign[T, uint8]() case arrow.UINT16: max = getSafeMaxSameSign[T, uint16]() case arrow.UINT32: max = getSafeMaxSameSign[T, uint32]() case arrow.UINT64: max = getSafeMaxSameSign[T, uint64]() case arrow.INT8: max = getSafeMaxUnsignedSigned[T, int8]() case arrow.INT16: max = getSafeMaxUnsignedSigned[T, int16]() case arrow.INT32: max = getSafeMaxUnsignedSigned[T, int32]() case arrow.INT64: max = getSafeMaxUnsignedSigned[T, int64]() } return } func intsCanFit(data *exec.ArraySpan, target arrow.Type) error { if !arrow.IsInteger(target) { return fmt.Errorf("%w: target type is not an integer type %s", arrow.ErrInvalid, target) } switch data.Type.ID() { case arrow.INT8: min, max := getSafeMinMaxSigned[int8](target) return intsInRange(data, min, max) case arrow.UINT8: min, max := getSafeMinMaxUnsigned[uint8](target) return intsInRange(data, min, max) case arrow.INT16: min, max := getSafeMinMaxSigned[int16](target) return intsInRange(data, min, max) case arrow.UINT16: min, max := getSafeMinMaxUnsigned[uint16](target) return intsInRange(data, min, max) case arrow.INT32: min, max := getSafeMinMaxSigned[int32](target) return intsInRange(data, min, max) case arrow.UINT32: min, max := getSafeMinMaxUnsigned[uint32](target) return intsInRange(data, min, max) case arrow.INT64: min, max := getSafeMinMaxSigned[int64](target) return intsInRange(data, min, max) case arrow.UINT64: min, max := getSafeMinMaxUnsigned[uint64](target) return intsInRange(data, min, max) default: return fmt.Errorf("%w: invalid type for int bounds checking", arrow.ErrInvalid) } } func intsInRange[T arrow.IntType | arrow.UintType](data *exec.ArraySpan, lowerBound, upperBound T) error { if MinOf[T]() >= lowerBound && MaxOf[T]() <= upperBound { return nil } isOutOfBounds := func(val T) bool { return val < lowerBound || val > upperBound } isOutOfBoundsMaybeNull := func(val T, isValid bool) bool { return isValid && (val < lowerBound || val > upperBound) } getError := func(val T) error { return fmt.Errorf("%w: integer value %d not in range: %d to %d", arrow.ErrInvalid, val, lowerBound, upperBound) } values := exec.GetSpanValues[T](data, 1) bitmap := data.Buffers[0].Buf bitCounter := bitutils.NewOptionalBitBlockCounter(bitmap, data.Offset, data.Len) pos, offsetPos := 0, data.Offset for pos < int(data.Len) { block := bitCounter.NextBlock() outOfBounds := false if block.Popcnt == block.Len { // fast path: branchless i := 0 for chunk := 0; chunk < int(block.Len)/8; chunk++ { for j := 0; j < 8; j++ { outOfBounds = outOfBounds || isOutOfBounds(values[i]) i++ } } for ; i < int(block.Len); i++ { outOfBounds = outOfBounds || isOutOfBounds(values[i]) } } else if block.Popcnt > 0 { // values may be null, only bounds check non-null vals i := 0 for chunk := 0; chunk < int(block.Len)/8; chunk++ { for j := 0; j < 8; j++ { outOfBounds = outOfBounds || isOutOfBoundsMaybeNull( values[i], bitutil.BitIsSet(bitmap, int(offsetPos)+i)) i++ } } for ; i < int(block.Len); i++ { outOfBounds = outOfBounds || isOutOfBoundsMaybeNull( values[i], bitutil.BitIsSet(bitmap, int(offsetPos)+i)) } } if outOfBounds { if data.Nulls > 0 { for i := 0; i < int(block.Len); i++ { if isOutOfBoundsMaybeNull(values[i], bitutil.BitIsSet(bitmap, int(offsetPos)+i)) { return getError(values[i]) } } } else { for i := 0; i < int(block.Len); i++ { if isOutOfBounds(values[i]) { return getError(values[i]) } } } } values = values[block.Len:] pos += int(block.Len) offsetPos += int64(block.Len) } return nil } type numeric interface { arrow.IntType | arrow.UintType | constraints.Float } func memCpySpan[T numeric](in, out *exec.ArraySpan) { inData := exec.GetSpanValues[T](in, 1) outData := exec.GetSpanValues[T](out, 1) copy(outData, inData) } func castNumberMemCpy(in, out *exec.ArraySpan) { switch in.Type.ID() { case arrow.INT8: memCpySpan[int8](in, out) case arrow.UINT8: memCpySpan[uint8](in, out) case arrow.INT16: memCpySpan[int16](in, out) case arrow.UINT16: memCpySpan[uint16](in, out) case arrow.INT32: memCpySpan[int32](in, out) case arrow.UINT32: memCpySpan[uint32](in, out) case arrow.INT64: memCpySpan[int64](in, out) case arrow.UINT64: memCpySpan[uint64](in, out) case arrow.FLOAT32: memCpySpan[float32](in, out) case arrow.FLOAT64: memCpySpan[float64](in, out) } } func castNumberToNumberUnsafe(in, out *exec.ArraySpan) { if in.Type.ID() == out.Type.ID() { castNumberMemCpy(in, out) return } inputOffset := in.Type.(arrow.FixedWidthDataType).Bytes() * int(in.Offset) outputOffset := out.Type.(arrow.FixedWidthDataType).Bytes() * int(out.Offset) castNumericUnsafe(in.Type.ID(), out.Type.ID(), in.Buffers[1].Buf[inputOffset:], out.Buffers[1].Buf[outputOffset:], int(in.Len)) } func MaxDecimalDigitsForInt(id arrow.Type) (int32, error) { switch id { case arrow.INT8, arrow.UINT8: return 3, nil case arrow.INT16, arrow.UINT16: return 5, nil case arrow.INT32, arrow.UINT32: return 10, nil case arrow.INT64: return 19, nil case arrow.UINT64: return 20, nil } return -1, fmt.Errorf("%w: not an integer type: %s", arrow.ErrInvalid, id) } func ResolveOutputFromOptions(ctx *exec.KernelCtx, _ []arrow.DataType) (arrow.DataType, error) { opts := ctx.State.(CastState) return opts.ToType, nil } var OutputTargetType = exec.NewComputedOutputType(ResolveOutputFromOptions) var OutputFirstType = exec.NewComputedOutputType(func(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) { return args[0], nil }) var OutputLastType = exec.NewComputedOutputType(func(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) { return args[len(args)-1], nil }) func resolveDecimalBinaryOpOutput(types []arrow.DataType, resolver func(prec1, scale1, prec2, scale2 int32) (prec, scale int32)) (arrow.DataType, error) { leftType, rightType := types[0].(arrow.DecimalType), types[1].(arrow.DecimalType) debug.Assert(leftType.ID() == rightType.ID(), "decimal binary ops should have casted to the same type") prec, scale := resolver(leftType.GetPrecision(), leftType.GetScale(), rightType.GetPrecision(), rightType.GetScale()) return arrow.NewDecimalType(leftType.ID(), prec, scale) } func resolveDecimalAddOrSubtractType(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) { return resolveDecimalBinaryOpOutput(args, func(prec1, scale1, prec2, scale2 int32) (prec int32, scale int32) { debug.Assert(scale1 == scale2, "decimal operations should use the same scale") scale = scale1 prec = exec.Max(prec1-scale1, prec2-scale2) + scale + 1 return }) } func resolveDecimalMultiplyOutput(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) { return resolveDecimalBinaryOpOutput(args, func(prec1, scale1, prec2, scale2 int32) (prec int32, scale int32) { scale = scale1 + scale2 prec = prec1 + prec2 + 1 return }) } func resolveDecimalDivideOutput(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) { return resolveDecimalBinaryOpOutput(args, func(prec1, scale1, prec2, scale2 int32) (prec int32, scale int32) { debug.Assert(scale1 >= scale2, "when dividing decimal values numerator scale should be greater/equal to denom scale") scale = scale1 - scale2 prec = prec1 return }) } func resolveTemporalOutput(_ *exec.KernelCtx, args []arrow.DataType) (arrow.DataType, error) { debug.Assert(args[0].ID() == args[1].ID(), "should only be used on the same types") leftType, rightType := args[0].(*arrow.TimestampType), args[1].(*arrow.TimestampType) debug.Assert(leftType.Unit == rightType.Unit, "should match units") if (leftType.TimeZone == "" || rightType.TimeZone == "") && (leftType.TimeZone != rightType.TimeZone) { return nil, fmt.Errorf("%w: subtraction of zoned and non-zoned times is ambiguous (%s, %s)", arrow.ErrInvalid, leftType.TimeZone, rightType.TimeZone) } return &arrow.DurationType{Unit: rightType.Unit}, nil } var OutputResolveTemporal = exec.NewComputedOutputType(resolveTemporalOutput) type validityBuilder struct { mem memory.Allocator buffer *memory.Buffer data []byte bitLength int falseCount int } func (v *validityBuilder) Resize(n int64) { if v.buffer == nil { v.buffer = memory.NewResizableBuffer(v.mem) } v.buffer.ResizeNoShrink(int(bitutil.BytesForBits(n))) v.data = v.buffer.Bytes() } func (v *validityBuilder) Reserve(n int64) { if v.buffer == nil { v.buffer = memory.NewResizableBuffer(v.mem) } v.buffer.Reserve(v.buffer.Cap() + int(bitutil.BytesForBits(n))) v.data = v.buffer.Buf() } func (v *validityBuilder) UnsafeAppend(val bool) { bitutil.SetBitTo(v.data, v.bitLength, val) if !val { v.falseCount++ } v.bitLength++ } func (v *validityBuilder) UnsafeAppendN(n int64, val bool) { bitutil.SetBitsTo(v.data, int64(v.bitLength), n, val) if !val { v.falseCount += int(n) } v.bitLength += int(n) } func (v *validityBuilder) Append(val bool) { v.Reserve(1) v.UnsafeAppend(val) } func (v *validityBuilder) AppendN(n int64, val bool) { v.Reserve(n) v.UnsafeAppendN(n, val) } func (v *validityBuilder) Finish() (buf *memory.Buffer) { if v.bitLength > 0 { v.buffer.Resize(int(bitutil.BytesForBits(int64(v.bitLength)))) } v.bitLength, v.falseCount = 0, 0 buf = v.buffer v.buffer = nil return } type execBufBuilder struct { mem memory.Allocator buffer *memory.Buffer data []byte sz int } func (bldr *execBufBuilder) reserve(additional int) { if bldr.buffer == nil { bldr.buffer = memory.NewResizableBuffer(bldr.mem) } mincap := bldr.sz + additional if mincap <= cap(bldr.data) { return } bldr.buffer.ResizeNoShrink(mincap) bldr.data = bldr.buffer.Buf() } func (bldr *execBufBuilder) unsafeAppend(data []byte) { copy(bldr.data[bldr.sz:], data) bldr.sz += len(data) } func (bldr *execBufBuilder) finish() (buf *memory.Buffer) { if bldr.buffer == nil { buf = memory.NewBufferBytes(nil) return } bldr.buffer.Resize(bldr.sz) buf = bldr.buffer bldr.buffer, bldr.sz = nil, 0 return } type bufferBuilder[T arrow.FixedWidthType] struct { execBufBuilder zero T } func newBufferBuilder[T arrow.FixedWidthType](mem memory.Allocator) *bufferBuilder[T] { return &bufferBuilder[T]{ execBufBuilder: execBufBuilder{ mem: mem, }, } } func (b *bufferBuilder[T]) reserve(additional int) { b.execBufBuilder.reserve(additional * int(unsafe.Sizeof(b.zero))) } func (b *bufferBuilder[T]) unsafeAppend(value T) { b.execBufBuilder.unsafeAppend(arrow.GetBytes([]T{value})) } func (b *bufferBuilder[T]) unsafeAppendSlice(values []T) { b.execBufBuilder.unsafeAppend(arrow.GetBytes(values)) } func (b *bufferBuilder[T]) len() int { return b.sz / int(unsafe.Sizeof(b.zero)) } func (b *bufferBuilder[T]) cap() int { return cap(b.data) / int(unsafe.Sizeof(b.zero)) } func checkIndexBoundsImpl[T arrow.IntType | arrow.UintType](values *exec.ArraySpan, upperLimit uint64) error { // for unsigned integers, if the values array is larger // than the maximum index value, then there's no need to bounds check isSigned := !arrow.IsUnsignedInteger(values.Type.ID()) if !isSigned && upperLimit > uint64(MaxOf[T]()) { return nil } valuesData := exec.GetSpanValues[T](values, 1) bitmap := values.Buffers[0].Buf isOutOfBounds := func(val T) bool { return ((isSigned && val < 0) || val >= 0 && uint64(val) >= upperLimit) } return bitutils.VisitSetBitRuns(bitmap, values.Offset, values.Len, func(pos, length int64) error { outOfBounds := false for i := int64(0); i < length; i++ { outOfBounds = outOfBounds || isOutOfBounds(valuesData[pos+i]) } if outOfBounds { for i := int64(0); i < length; i++ { if isOutOfBounds(valuesData[pos+i]) { return fmt.Errorf("%w: %d out of bounds", arrow.ErrIndex, valuesData[pos+i]) } } } return nil }) } func checkIndexBounds(values *exec.ArraySpan, upperLimit uint64) error { switch values.Type.ID() { case arrow.INT8: return checkIndexBoundsImpl[int8](values, upperLimit) case arrow.UINT8: return checkIndexBoundsImpl[uint8](values, upperLimit) case arrow.INT16: return checkIndexBoundsImpl[int16](values, upperLimit) case arrow.UINT16: return checkIndexBoundsImpl[uint16](values, upperLimit) case arrow.INT32: return checkIndexBoundsImpl[int32](values, upperLimit) case arrow.UINT32: return checkIndexBoundsImpl[uint32](values, upperLimit) case arrow.INT64: return checkIndexBoundsImpl[int64](values, upperLimit) case arrow.UINT64: return checkIndexBoundsImpl[uint64](values, upperLimit) default: return fmt.Errorf("%w: invalid index type for bounds checking", arrow.ErrInvalid) } } func checkIndexBoundsChunked(values *arrow.Chunked, upperLimit uint64) error { var span exec.ArraySpan for _, v := range values.Chunks() { span.SetMembers(v.Data()) if err := checkIndexBounds(&span, upperLimit); err != nil { return err } } return nil } func packBits(vals [32]uint32, out []byte) { const batchSize = 32 for i := 0; i < batchSize; i += 8 { out[0] = byte(vals[i] | vals[i+1]<<1 | vals[i+2]<<2 | vals[i+3]<<3 | vals[i+4]<<4 | vals[i+5]<<5 | vals[i+6]<<6 | vals[i+7]<<7) out = out[1:] } }