arrow/scalar/scalar.go (902 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scalar
import (
"encoding/binary"
"fmt"
"hash/maphash"
"math"
"math/big"
"reflect"
"strconv"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/decimal128"
"github.com/apache/arrow-go/v18/arrow/decimal256"
"github.com/apache/arrow-go/v18/arrow/encoded"
"github.com/apache/arrow-go/v18/arrow/endian"
"github.com/apache/arrow-go/v18/arrow/float16"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"golang.org/x/xerrors"
)
// Scalar is a single value of a particular arrow.DataType, as opposed to a
// whole array of values.
//
// Scalars serve as single-value inputs to compute functions (not yet
// implemented) and as stand-alone representations of individual array
// elements, although extracting them from an array has a real cost.
type Scalar interface {
	fmt.Stringer

	// IsValid reports whether the scalar holds a value (true) or is null (false).
	IsValid() bool
	// DataType returns the arrow type of the held value.
	DataType() arrow.DataType
	// Validate runs cheap consistency checks, returning nil on success.
	Validate() error
	// ValidateFull runs the more expensive checks, returning nil on success.
	ValidateFull() error
	// CastTo converts the value to the requested DataType, honoring the
	// semantics of the target type, or returns an error when it cannot.
	CastTo(arrow.DataType) (Scalar, error)

	// Unexported hooks used internally for delegation.
	value() interface{}
	equals(Scalar) bool
}

// Releasable is the subset of scalars exposing Retain/Release, letting
// callers manage their lifetime explicitly.
type Releasable interface {
	Release()
	Retain()
}
// validateOptional checks that the presence of value agrees with s.Valid: a
// valid scalar must carry a value and a null scalar must not. valueDesc names
// the value in the error messages.
//
// Both an untyped nil and a typed nil of a nillable kind (pointer, slice,
// map, chan, func, interface) count as "no value". Values of non-nillable
// kinds always count as present; unlike a bare reflect.Value.IsNil call, this
// never panics on them.
func validateOptional(s *scalar, value interface{}, valueDesc string) error {
	hasValue := !isNilOptional(value)
	if s.Valid && !hasValue {
		return fmt.Errorf("%s scalar is marked valid but doesn't have a %s", s.Type, valueDesc)
	}
	if !s.Valid && hasValue {
		return fmt.Errorf("%s scalar is marked null but has a %s", s.Type, valueDesc)
	}
	return nil
}

// isNilOptional reports whether v is nil, either as an untyped nil interface
// or as a nil value of a nillable kind. Calling IsNil on other kinds would
// panic, so they are reported as non-nil.
func isNilOptional(v interface{}) bool {
	if v == nil {
		return true
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
		return rv.IsNil()
	default:
		return false
	}
}
// scalar is the common base embedded by concrete scalar types: it records
// the value's data type and whether the value is non-null.
type scalar struct {
	Type  arrow.DataType
	Valid bool
}

// String renders "null" for a null scalar and a "..." placeholder
// otherwise. Concrete types in this package override it with a real rendering.
func (s *scalar) String() string {
	if s.Valid {
		return "..."
	}
	return "null"
}

// IsValid reports whether the scalar is non-null.
func (s *scalar) IsValid() bool {
	return s.Valid
}

// Validate only verifies that a data type has been set.
func (s *scalar) Validate() error {
	if s.Type != nil {
		return nil
	}
	return xerrors.New("scalar lacks a type")
}

// ValidateFull has nothing more expensive to check at this level, so it
// defers to Validate.
func (s *scalar) ValidateFull() error {
	return s.Validate()
}

// DataType returns the scalar's arrow type.
func (s scalar) DataType() arrow.DataType { return s.Type }
// Null is the scalar for the arrow.NULL type; it never holds a value.
type Null struct {
	scalar
}

// equals reports true: any two Null scalars are equal. The caller has
// already established that the right-hand side has the same type.
func (n *Null) equals(s Scalar) bool {
	debug.Assert(s.DataType().ID() == arrow.NULL, "scalar null equals should only receive null")
	return true
}

func (n *Null) value() interface{} { return nil }

// CastTo returns the null scalar of the requested type.
func (n *Null) CastTo(dt arrow.DataType) (Scalar, error) {
	return MakeNullScalar(dt), nil
}

// Validate checks the embedded base scalar and additionally requires that
// Valid is false.
func (n *Null) Validate() error {
	if err := n.scalar.Validate(); err != nil {
		return err
	}
	if !n.Valid {
		return nil
	}
	return xerrors.New("null scalar should have Valid = false")
}

// ValidateFull is identical to Validate for Null.
func (n *Null) ValidateFull() error { return n.Validate() }

// ScalarNull is the shared null scalar of type arrow.Null.
var ScalarNull *Null = &Null{scalar{Type: arrow.Null, Valid: false}}
// PrimitiveScalar is a Scalar whose value can be viewed as raw bytes.
type PrimitiveScalar interface {
	Scalar
	Data() []byte
}

// Boolean is a scalar holding a single bool.
type Boolean struct {
	scalar
	Value bool
}

// equals compares the two values; rhs is already known to be a *Boolean.
func (s *Boolean) equals(rhs Scalar) bool {
	other := rhs.(*Boolean)
	return other.Value == s.Value
}

func (s *Boolean) value() interface{} { return s.Value }

// Data exposes the single in-memory byte of Value without copying.
func (s *Boolean) Data() []byte {
	return unsafe.Slice((*byte)(unsafe.Pointer(&s.Value)), 1)
}
// String renders the value by casting it to a string scalar. A null scalar
// renders as "null", and a failed cast renders as "...".
func (s *Boolean) String() string {
	if !s.Valid {
		return "null"
	}
	if str, err := s.CastTo(arrow.BinaryTypes.String); err == nil {
		return string(str.(*String).Value.Bytes())
	}
	return "..."
}
// CastTo converts the boolean to the requested type. A null boolean becomes
// a null scalar of dt. Valid values map to:
//   - bool: the same value (identity cast)
//   - string: "true" / "false"
//   - integer and floating-point types: 1 for true, 0 for false
//
// Any other target type yields an error.
func (s *Boolean) CastTo(dt arrow.DataType) (Scalar, error) {
	if !s.Valid {
		return MakeNullScalar(dt), nil
	}

	val := 0
	if s.Value {
		val = 1
	}

	switch dt.ID() {
	case arrow.BOOL:
		// identity cast: same value, same (only) boolean type
		return NewBooleanScalar(s.Value), nil
	case arrow.STRING:
		return NewStringScalar(strconv.FormatBool(s.Value)), nil
	case arrow.UINT8:
		return NewUint8Scalar(uint8(val)), nil
	case arrow.INT8:
		return NewInt8Scalar(int8(val)), nil
	case arrow.UINT16:
		return NewUint16Scalar(uint16(val)), nil
	case arrow.INT16:
		return NewInt16Scalar(int16(val)), nil
	case arrow.UINT32:
		return NewUint32Scalar(uint32(val)), nil
	case arrow.INT32:
		return NewInt32Scalar(int32(val)), nil
	case arrow.UINT64:
		return NewUint64Scalar(uint64(val)), nil
	case arrow.INT64:
		return NewInt64Scalar(int64(val)), nil
	case arrow.FLOAT16:
		return NewFloat16Scalar(float16.New(float32(val))), nil
	case arrow.FLOAT32:
		return NewFloat32Scalar(float32(val)), nil
	case arrow.FLOAT64:
		return NewFloat64Scalar(float64(val)), nil
	default:
		return nil, fmt.Errorf("invalid scalar cast from type bool to type %s", dt)
	}
}
// NewBooleanScalar returns a valid Boolean scalar holding val.
func NewBooleanScalar(val bool) *Boolean {
	return &Boolean{
		scalar: scalar{Type: arrow.FixedWidthTypes.Boolean, Valid: true},
		Value:  val,
	}
}
// Float16 is a scalar holding a half-precision floating-point value.
type Float16 struct {
	scalar
	Value float16.Num
}

func (f *Float16) value() interface{} { return f.Value }

// Data exposes the in-memory bytes of Value without copying.
func (f *Float16) Data() []byte {
	return unsafe.Slice((*byte)(unsafe.Pointer(&f.Value)), arrow.Float16SizeBytes)
}

// equals compares Value with ==; rhs is already known to be a *Float16.
func (f *Float16) equals(rhs Scalar) bool {
	other := rhs.(*Float16)
	return f.Value == other.Value
}
// CastTo converts the half-precision value to the requested type. A null
// scalar becomes a null scalar of the target type. For valid scalars:
//   - numeric targets registered in numericMap are converted from the value's
//     float32 representation;
//   - bool is true for any value that does not compare equal to zero;
//   - string uses the float16 textual form.
//
// Any other target type yields an error.
func (f *Float16) CastTo(to arrow.DataType) (Scalar, error) {
	if !f.Valid {
		return MakeNullScalar(to), nil
	}

	if r, ok := numericMap[to.ID()]; ok {
		return convertToNumeric(reflect.ValueOf(f.Value.Float32()), r.valueType, r.scalarFunc), nil
	}

	switch to.ID() {
	case arrow.BOOL:
		// Compare numerically rather than on the raw bits, so negative zero
		// (bit pattern 0x8000) casts to false just like positive zero.
		return NewBooleanScalar(f.Value.Float32() != 0), nil
	case arrow.STRING:
		return NewStringScalar(f.Value.String()), nil
	}
	return nil, fmt.Errorf("cannot cast non-null float16 scalar to type %s", to)
}
// String renders the value through a cast to string. A null scalar renders
// as "null", and a failed cast renders as "...".
func (f *Float16) String() string {
	if !f.Valid {
		return "null"
	}
	str, err := f.CastTo(arrow.BinaryTypes.String)
	if err != nil {
		return "..."
	}
	return string(str.(*String).Value.Bytes())
}

// NewFloat16ScalarFromFloat32 converts val to half precision and wraps it
// in a valid Float16 scalar.
func NewFloat16ScalarFromFloat32(val float32) *Float16 {
	return NewFloat16Scalar(float16.New(val))
}

// NewFloat16Scalar returns a valid Float16 scalar holding val.
func NewFloat16Scalar(val float16.Num) *Float16 {
	return &Float16{
		scalar: scalar{Type: arrow.FixedWidthTypes.Float16, Valid: true},
		Value:  val,
	}
}
// Decimal128 is a scalar holding a 128-bit decimal value. Its Type is
// asserted to be *arrow.Decimal128Type when casting, which supplies the
// precision and scale.
type Decimal128 struct {
	scalar
	Value decimal128.Num
}

// Data exposes the in-memory bytes of Value without copying.
func (s *Decimal128) Data() []byte {
	return unsafe.Slice((*byte)(unsafe.Pointer(&s.Value)), arrow.Decimal128SizeBytes)
}

func (s *Decimal128) value() interface{} { return s.Value }

// String formats the value via a cast to string. A null scalar renders as
// "null", and a failed cast renders as "...".
func (s *Decimal128) String() string {
	if !s.Valid {
		return "null"
	}
	if str, err := s.CastTo(arrow.BinaryTypes.String); err == nil {
		return string(str.(*String).Value.Bytes())
	}
	return "..."
}

// equals compares the two values; rhs is already known to be a *Decimal128.
func (s *Decimal128) equals(rhs Scalar) bool {
	other := rhs.(*Decimal128)
	return s.Value == other.Value
}

// CastTo converts the decimal to another type. Null scalars become nulls of
// the target type. Valid scalars support:
//   - DECIMAL128: rescale to the target scale, failing if the result does
//     not fit in the target precision;
//   - DECIMAL256: widen, then rescale and check precision the same way;
//   - STRING: divide by 10^scale as a big.Float and format with 'g', using
//     the source precision as the digit count.
//
// Other targets yield an error.
func (s *Decimal128) CastTo(to arrow.DataType) (Scalar, error) {
	if !s.Valid {
		return MakeNullScalar(to), nil
	}

	src := s.Type.(*arrow.Decimal128Type)
	switch to.ID() {
	case arrow.DECIMAL128:
		dst := to.(*arrow.Decimal128Type)
		rescaled, err := s.Value.Rescale(src.Scale, dst.Scale)
		if err != nil {
			return nil, err
		}
		if !rescaled.FitsInPrecision(dst.Precision) {
			return nil, fmt.Errorf("decimal128 value %v will not fit in new precision %d", rescaled, dst.Precision)
		}
		return NewDecimal128Scalar(rescaled, dst), nil
	case arrow.DECIMAL256:
		dst := to.(*arrow.Decimal256Type)
		widened := decimal256.FromDecimal128(s.Value)
		rescaled, err := widened.Rescale(src.Scale, dst.Scale)
		if err != nil {
			return nil, err
		}
		if !rescaled.FitsInPrecision(dst.Precision) {
			return nil, fmt.Errorf("decimal256 value %v will not fit in new precision %d", rescaled, dst.Precision)
		}
		return NewDecimal256Scalar(rescaled, dst), nil
	case arrow.STRING:
		divisor := big.NewFloat(math.Pow10(int(src.Scale)))
		f := new(big.Float).SetInt(s.Value.BigInt())
		f.Quo(f, divisor)
		return NewStringScalar(f.Text('g', int(src.Precision))), nil
	}
	return nil, fmt.Errorf("cannot cast non-nil decimal128 scalar to type %s", to)
}

// NewDecimal128Scalar returns a valid Decimal128 scalar of type typ holding val.
func NewDecimal128Scalar(val decimal128.Num, typ arrow.DataType) *Decimal128 {
	return &Decimal128{scalar: scalar{Type: typ, Valid: true}, Value: val}
}
// Decimal256 is a scalar holding a 256-bit decimal value. Its Type is
// asserted to be *arrow.Decimal256Type when casting, which supplies the
// precision and scale.
type Decimal256 struct {
	scalar
	Value decimal256.Num
}

// Data exposes the in-memory bytes of Value without copying.
func (s *Decimal256) Data() []byte {
	return unsafe.Slice((*byte)(unsafe.Pointer(&s.Value)), arrow.Decimal256SizeBytes)
}

func (s *Decimal256) value() interface{} { return s.Value }

// String formats the value via a cast to string. A null scalar renders as
// "null", and a failed cast renders as "...".
func (s *Decimal256) String() string {
	if !s.Valid {
		return "null"
	}
	if str, err := s.CastTo(arrow.BinaryTypes.String); err == nil {
		return string(str.(*String).Value.Bytes())
	}
	return "..."
}

// equals compares the two values; rhs is already known to be a *Decimal256.
func (s *Decimal256) equals(rhs Scalar) bool {
	other := rhs.(*Decimal256)
	return s.Value == other.Value
}

// CastTo converts the decimal to another type. Null scalars become nulls of
// the target type. Valid scalars support:
//   - DECIMAL256: rescale to the target scale, failing if the result does
//     not fit in the target precision;
//   - STRING: divide by 10^scale as a big.Float and format with 'g', using
//     the source precision as the digit count.
//
// Other targets yield an error.
func (s *Decimal256) CastTo(to arrow.DataType) (Scalar, error) {
	if !s.Valid {
		return MakeNullScalar(to), nil
	}

	src := s.Type.(*arrow.Decimal256Type)
	switch to.ID() {
	case arrow.DECIMAL256:
		dst := to.(*arrow.Decimal256Type)
		rescaled, err := s.Value.Rescale(src.Scale, dst.Scale)
		if err != nil {
			return nil, err
		}
		if !rescaled.FitsInPrecision(dst.Precision) {
			return nil, fmt.Errorf("decimal256 value %v will not fit in new precision %d", rescaled, dst.Precision)
		}
		return NewDecimal256Scalar(rescaled, dst), nil
	case arrow.STRING:
		divisor := big.NewFloat(math.Pow10(int(src.Scale)))
		f := new(big.Float).SetInt(s.Value.BigInt())
		f.Quo(f, divisor)
		return NewStringScalar(f.Text('g', int(src.Precision))), nil
	}
	return nil, fmt.Errorf("cannot cast non-nil decimal256 scalar to type %s", to)
}

// NewDecimal256Scalar returns a valid Decimal256 scalar of type typ holding val.
func NewDecimal256Scalar(val decimal256.Num, typ arrow.DataType) *Decimal256 {
	return &Decimal256{scalar: scalar{Type: typ, Valid: true}, Value: val}
}
// Extension is a scalar of an extension type. Value holds the scalar of the
// extension's storage type.
type Extension struct {
	scalar
	Value Scalar
}

func (e *Extension) value() interface{} { return e.Value }

// equals delegates to Equals on the storage values; rhs is already known to
// be an *Extension.
func (e *Extension) equals(rhs Scalar) bool {
	other := rhs.(*Extension)
	return Equals(e.Value, other.Value)
}
// Validate checks the embedded base scalar, then the storage value:
//   - a null extension scalar may have no storage value or a null one (the
//     null constructor for EXTENSION stores a null scalar of the storage
//     type), but must not carry a valid storage value;
//   - a non-null extension scalar must have a valid storage value that itself
//     passes Validate.
func (e *Extension) Validate() (err error) {
	if err = e.scalar.Validate(); err != nil {
		return err
	}

	if !e.Valid {
		// Only a *valid* storage value contradicts a null extension scalar.
		if e.Value != nil && e.Value.IsValid() {
			err = fmt.Errorf("null %s scalar has storage value", e.Type)
		}
		return
	}

	switch {
	case e.Value == nil:
		err = fmt.Errorf("non-null %s scalar doesn't have a storage value", e.Type)
	case !e.Value.IsValid():
		err = fmt.Errorf("non-null %s scalar has a null storage value", e.Type)
	default:
		if err = e.Value.Validate(); err != nil {
			err = fmt.Errorf("%s scalar fails validation for storage value: %w", e.Type, err)
		}
	}
	return
}
// ValidateFull runs Validate and then, for a non-null scalar, the full
// validation of its storage value.
func (e *Extension) ValidateFull() error {
	err := e.Validate()
	switch {
	case err != nil:
		return err
	case !e.Valid:
		return nil
	default:
		return e.Value.ValidateFull()
	}
}

// CastTo supports only the trivial cases. A null scalar becomes a null of
// the target type, and a cast to an identical type returns the receiver
// itself. Everything else is an error.
func (e *Extension) CastTo(to arrow.DataType) (Scalar, error) {
	switch {
	case !e.Valid:
		return MakeNullScalar(to), nil
	case arrow.TypeEqual(e.Type, to):
		return e, nil
	default:
		return nil, fmt.Errorf("cannot cast non-null extension scalar of type %s to type %s", e.Type, to)
	}
}
// String renders the scalar for display. A null scalar renders as "null", a
// valid scalar renders as its storage value, and a valid scalar without a
// storage value renders as "...".
//
// Casting to a string type is not used here: CastTo only accepts the
// scalar's own extension type, so such a cast can never succeed.
func (s *Extension) String() string {
	if !s.Valid {
		return "null"
	}
	if s.Value == nil {
		return "..."
	}
	return s.Value.String()
}
// NewExtensionScalar wraps storage as a valid scalar of the extension type typ.
func NewExtensionScalar(storage Scalar, typ arrow.DataType) *Extension {
	return &Extension{scalar: scalar{Type: typ, Valid: true}, Value: storage}
}

// convertToNumeric converts v to the Go type `to` and passes it to the
// constructor fn via reflection, returning the constructed scalar.
func convertToNumeric(v reflect.Value, to reflect.Type, fn reflect.Value) Scalar {
	args := []reflect.Value{v.Convert(to)}
	results := fn.Call(args)
	return results[0].Interface().(Scalar)
}
// MakeNullScalar creates a scalar value of the desired type representing a null value.
//
// It panics (through invalidScalarType) when dt's type ID has no registered
// null constructor, including IDs outside the lookup table's range.
func MakeNullScalar(dt arrow.DataType) Scalar {
	id := int(dt.ID())
	// Range-check explicitly instead of masking the ID: a mask would silently
	// alias an out-of-range ID onto another type's constructor, and a nil
	// table entry would otherwise panic with an unhelpful nil-func error.
	if id < 0 || id >= len(makeNullFn) || makeNullFn[id] == nil {
		return invalidScalarType(dt)
	}
	return makeNullFn[id](dt)
}
// invalidScalarType always panics, naming the offending type ID. Its
// signature matches scalarMakeNullFn so it can sit in the makeNullFn table.
func invalidScalarType(dt arrow.DataType) Scalar {
	msg := "invalid scalar type: " + dt.ID().String()
	panic(msg)
}

// scalarMakeNullFn builds the null scalar for a given data type.
type scalarMakeNullFn func(arrow.DataType) Scalar

// makeNullFn is the null-scalar constructor table indexed by type ID; it is
// populated in init.
var makeNullFn [64]scalarMakeNullFn
// init fills the makeNullFn dispatch table, indexed by arrow.Type ID, with a
// constructor for the null scalar of each supported type; IDs without an
// entry are left nil. It then rewires the FLOAT16 entry of numericMap so that
// numeric casts targeting FLOAT16 convert the source value to float32 and
// build the result with NewFloat16ScalarFromFloat32.
func init() {
	makeNullFn = [...]scalarMakeNullFn{
		// NULL always maps to the shared ScalarNull value.
		arrow.NULL:    func(dt arrow.DataType) Scalar { return ScalarNull },
		arrow.BOOL:    func(dt arrow.DataType) Scalar { return &Boolean{scalar: scalar{dt, false}} },
		arrow.UINT8:   func(dt arrow.DataType) Scalar { return &Uint8{scalar: scalar{dt, false}} },
		arrow.INT8:    func(dt arrow.DataType) Scalar { return &Int8{scalar: scalar{dt, false}} },
		arrow.UINT16:  func(dt arrow.DataType) Scalar { return &Uint16{scalar: scalar{dt, false}} },
		arrow.INT16:   func(dt arrow.DataType) Scalar { return &Int16{scalar: scalar{dt, false}} },
		arrow.UINT32:  func(dt arrow.DataType) Scalar { return &Uint32{scalar: scalar{dt, false}} },
		arrow.INT32:   func(dt arrow.DataType) Scalar { return &Int32{scalar: scalar{dt, false}} },
		arrow.UINT64:  func(dt arrow.DataType) Scalar { return &Uint64{scalar: scalar{dt, false}} },
		arrow.INT64:   func(dt arrow.DataType) Scalar { return &Int64{scalar: scalar{dt, false}} },
		arrow.FLOAT16: func(dt arrow.DataType) Scalar { return &Float16{scalar: scalar{dt, false}} },
		arrow.FLOAT32: func(dt arrow.DataType) Scalar { return &Float32{scalar: scalar{dt, false}} },
		arrow.FLOAT64: func(dt arrow.DataType) Scalar { return &Float64{scalar: scalar{dt, false}} },
		// String-like scalars wrap a *Binary carrying the null base scalar.
		arrow.STRING:                  func(dt arrow.DataType) Scalar { return &String{&Binary{scalar: scalar{dt, false}}} },
		arrow.BINARY:                  func(dt arrow.DataType) Scalar { return &Binary{scalar: scalar{dt, false}} },
		arrow.FIXED_SIZE_BINARY:       func(dt arrow.DataType) Scalar { return &FixedSizeBinary{&Binary{scalar: scalar{dt, false}}} },
		arrow.DATE32:                  func(dt arrow.DataType) Scalar { return &Date32{scalar: scalar{dt, false}} },
		arrow.DATE64:                  func(dt arrow.DataType) Scalar { return &Date64{scalar: scalar{dt, false}} },
		arrow.TIMESTAMP:               func(dt arrow.DataType) Scalar { return &Timestamp{scalar: scalar{dt, false}} },
		arrow.TIME32:                  func(dt arrow.DataType) Scalar { return &Time32{scalar: scalar{dt, false}} },
		arrow.TIME64:                  func(dt arrow.DataType) Scalar { return &Time64{scalar: scalar{dt, false}} },
		arrow.INTERVAL_MONTHS:         func(dt arrow.DataType) Scalar { return &MonthInterval{scalar: scalar{dt, false}} },
		arrow.INTERVAL_DAY_TIME:       func(dt arrow.DataType) Scalar { return &DayTimeInterval{scalar: scalar{dt, false}} },
		arrow.INTERVAL_MONTH_DAY_NANO: func(dt arrow.DataType) Scalar { return &MonthDayNanoInterval{scalar: scalar{dt, false}} },
		arrow.DECIMAL128:              func(dt arrow.DataType) Scalar { return &Decimal128{scalar: scalar{dt, false}} },
		arrow.LIST:                    func(dt arrow.DataType) Scalar { return &List{scalar: scalar{dt, false}} },
		// A null struct still carries one null child scalar per field.
		arrow.STRUCT: func(dt arrow.DataType) Scalar {
			typ := dt.(*arrow.StructType)
			values := make([]Scalar, typ.NumFields())
			for i, f := range typ.Fields() {
				values[i] = MakeNullScalar(f.Type)
			}
			return &Struct{scalar: scalar{dt, false}, Value: values}
		},
		// Unions are built from null children with the first type code, via
		// the regular union constructors; a union with no fields panics.
		arrow.SPARSE_UNION: func(dt arrow.DataType) Scalar {
			typ := dt.(*arrow.SparseUnionType)
			if typ.NumFields() == 0 {
				panic("cannot make scalar of empty union type")
			}
			values := make([]Scalar, typ.NumFields())
			for i, f := range typ.Fields() {
				values[i] = MakeNullScalar(f.Type)
			}
			return NewSparseUnionScalar(values, typ.TypeCodes()[0], typ)
		},
		arrow.DENSE_UNION: func(dt arrow.DataType) Scalar {
			typ := dt.(*arrow.DenseUnionType)
			if typ.NumFields() == 0 {
				panic("cannot make scalar of empty union type")
			}
			return NewDenseUnionScalar(MakeNullScalar(typ.Fields()[0].Type), typ.TypeCodes()[0], typ)
		},
		arrow.DICTIONARY:   func(dt arrow.DataType) Scalar { return NewNullDictScalar(dt) },
		arrow.LARGE_STRING: func(dt arrow.DataType) Scalar { return &LargeString{&String{&Binary{scalar: scalar{dt, false}}}} },
		arrow.LARGE_BINARY: func(dt arrow.DataType) Scalar { return &LargeBinary{&Binary{scalar: scalar{dt, false}}} },
		arrow.LARGE_LIST:   func(dt arrow.DataType) Scalar { return &LargeList{&List{scalar: scalar{dt, false}}} },
		arrow.DECIMAL256:   func(dt arrow.DataType) Scalar { return &Decimal256{scalar: scalar{dt, false}} },
		arrow.MAP:          func(dt arrow.DataType) Scalar { return &Map{&List{scalar: scalar{dt, false}}} },
		// The storage value is the null scalar of the extension's storage type.
		arrow.EXTENSION: func(dt arrow.DataType) Scalar {
			return &Extension{scalar: scalar{dt, false}, Value: MakeNullScalar(dt.(arrow.ExtensionType).StorageType())}
		},
		arrow.FIXED_SIZE_LIST: func(dt arrow.DataType) Scalar { return &FixedSizeList{&List{scalar: scalar{dt, false}}} },
		arrow.DURATION:        func(dt arrow.DataType) Scalar { return &Duration{scalar: scalar{dt, false}} },
		arrow.RUN_END_ENCODED: func(dt arrow.DataType) Scalar { return &RunEndEncoded{scalar: scalar{dt, false}} },
		// Index 63 is set explicitly so the literal has exactly 64 elements,
		// matching makeNullFn's [64] array type; reaching it panics.
		63: invalidScalarType,
	}

	// numericMap holds struct values, so the FLOAT16 entry is copied,
	// modified and stored back: conversions into float16 go through a
	// float32 argument to NewFloat16ScalarFromFloat32.
	f := numericMap[arrow.FLOAT16]
	f.scalarFunc = reflect.ValueOf(NewFloat16ScalarFromFloat32)
	f.valueType = reflect.TypeOf(float32(0))
	numericMap[arrow.FLOAT16] = f
}
// GetScalar creates a scalar object from the value at a given index in the
// passed in array, returns an error if unable to do so.
//
// idx is bounds-checked first, so an out-of-range index (including a negative
// one) yields an error wrapping arrow.ErrIndex before the validity bitmap is
// consulted. Null slots become null scalars of the array's type, except in
// dictionary arrays, whose scalars record index validity themselves.
// Temporary child slices passed to the list-like constructors are released
// on return (the constructors are expected to retain what they keep).
func GetScalar(arr arrow.Array, idx int) (Scalar, error) {
	if idx < 0 {
		return nil, fmt.Errorf("%w: called GetScalar with negative index", arrow.ErrIndex)
	}
	if idx >= arr.Len() {
		return nil, fmt.Errorf("%w: called GetScalar with index larger than array len",
			arrow.ErrIndex)
	}

	if arr.DataType().ID() != arrow.DICTIONARY && arr.IsNull(idx) {
		return MakeNullScalar(arr.DataType()), nil
	}

	switch arr := arr.(type) {
	case *array.Binary:
		buf := memory.NewBufferBytes(arr.Value(idx))
		defer buf.Release()
		return NewBinaryScalar(buf, arr.DataType()), nil
	case *array.LargeBinary:
		buf := memory.NewBufferBytes(arr.Value(idx))
		defer buf.Release()
		return NewLargeBinaryScalar(buf), nil
	case *array.Boolean:
		return NewBooleanScalar(arr.Value(idx)), nil
	case *array.Date32:
		return NewDate32Scalar(arr.Value(idx)), nil
	case *array.Date64:
		return NewDate64Scalar(arr.Value(idx)), nil
	case *array.DayTimeInterval:
		return NewDayTimeIntervalScalar(arr.Value(idx)), nil
	case *array.Decimal128:
		return NewDecimal128Scalar(arr.Value(idx), arr.DataType()), nil
	case *array.Decimal256:
		return NewDecimal256Scalar(arr.Value(idx), arr.DataType()), nil
	case *array.Duration:
		return NewDurationScalar(arr.Value(idx), arr.DataType()), nil
	case array.ExtensionArray:
		storage, err := GetScalar(arr.Storage(), idx)
		if err != nil {
			return nil, err
		}
		return NewExtensionScalar(storage, arr.DataType()), nil
	case *array.FixedSizeBinary:
		buf := memory.NewBufferBytes(arr.Value(idx))
		defer buf.Release()
		return NewFixedSizeBinaryScalar(buf, arr.DataType()), nil
	case *array.FixedSizeList:
		size := int64(arr.DataType().(*arrow.FixedSizeListType).Len())
		// ListValues is arr's unsliced child array, so a sliced FixedSizeList
		// must add its own offset: element idx starts at (offset+idx)*size.
		start := (int64(arr.Offset()) + int64(idx)) * size
		slice := array.NewSlice(arr.ListValues(), start, start+size)
		defer slice.Release()
		return NewFixedSizeListScalarWithType(slice, arr.DataType()), nil
	case *array.Float16:
		return NewFloat16Scalar(arr.Value(idx)), nil
	case *array.Float32:
		return NewFloat32Scalar(arr.Value(idx)), nil
	case *array.Float64:
		return NewFloat64Scalar(arr.Value(idx)), nil
	case *array.Int8:
		return NewInt8Scalar(arr.Value(idx)), nil
	case *array.Int16:
		return NewInt16Scalar(arr.Value(idx)), nil
	case *array.Int32:
		return NewInt32Scalar(arr.Value(idx)), nil
	case *array.Int64:
		return NewInt64Scalar(arr.Value(idx)), nil
	case *array.Uint8:
		return NewUint8Scalar(arr.Value(idx)), nil
	case *array.Uint16:
		return NewUint16Scalar(arr.Value(idx)), nil
	case *array.Uint32:
		return NewUint32Scalar(arr.Value(idx)), nil
	case *array.Uint64:
		return NewUint64Scalar(arr.Value(idx)), nil
	case *array.List:
		offsets := arr.Offsets()
		slice := array.NewSlice(arr.ListValues(), int64(offsets[idx]), int64(offsets[idx+1]))
		defer slice.Release()
		return NewListScalar(slice), nil
	case *array.LargeList:
		offsets := arr.Offsets()
		slice := array.NewSlice(arr.ListValues(), int64(offsets[idx]), int64(offsets[idx+1]))
		defer slice.Release()
		return NewLargeListScalar(slice), nil
	case *array.Map:
		offsets := arr.Offsets()
		slice := array.NewSlice(arr.ListValues(), int64(offsets[idx]), int64(offsets[idx+1]))
		defer slice.Release()
		return NewMapScalar(slice), nil
	case *array.MonthInterval:
		return NewMonthIntervalScalar(arr.Value(idx)), nil
	case *array.MonthDayNanoInterval:
		return NewMonthDayNanoIntervalScalar(arr.Value(idx)), nil
	case *array.Null:
		return ScalarNull, nil
	case *array.String:
		return NewStringScalar(arr.Value(idx)), nil
	case *array.LargeString:
		return NewLargeStringScalar(arr.Value(idx)), nil
	case *array.Struct:
		children := make(Vector, arr.NumField())
		for i := range children {
			child, err := GetScalar(arr.Field(i), idx)
			if err != nil {
				return nil, err
			}
			children[i] = child
		}
		return NewStructScalar(children, arr.DataType()), nil
	case *array.Time32:
		return NewTime32Scalar(arr.Value(idx), arr.DataType()), nil
	case *array.Time64:
		return NewTime64Scalar(arr.Value(idx), arr.DataType()), nil
	case *array.Timestamp:
		return NewTimestampScalar(arr.Value(idx), arr.DataType()), nil
	case *array.RunEndEncoded:
		physicalIndex := encoded.FindPhysicalIndex(arr.Data(), arr.Offset()+idx)
		value, err := GetScalar(arr.Values(), physicalIndex)
		if err != nil {
			return nil, err
		}
		return NewRunEndEncodedScalar(value, arr.DataType().(*arrow.RunEndEncodedType)), nil
	case *array.Dictionary:
		ty := arr.DataType().(*arrow.DictionaryType)
		valid := arr.IsValid(idx)
		dictScalar := &Dictionary{scalar: scalar{ty, valid}}
		if valid {
			index, err := MakeScalarParam(arr.GetValueIndex(idx), ty.IndexType)
			if err != nil {
				return nil, err
			}
			dictScalar.Value.Index = index
		} else {
			dictScalar.Value.Index = MakeNullScalar(ty.IndexType)
		}
		dictScalar.Value.Dict = arr.Dictionary()
		dictScalar.Value.Dict.Retain()
		return dictScalar, nil
	case *array.SparseUnion:
		var err error
		typeCode := arr.TypeCode(idx)
		children := make([]Scalar, arr.NumFields())
		// On failure, release the children that were already built.
		defer func() {
			if err != nil {
				for _, c := range children {
					if c == nil {
						break
					}
					if v, ok := c.(Releasable); ok {
						v.Release()
					}
				}
			}
		}()
		for i := range arr.UnionType().Fields() {
			if children[i], err = GetScalar(arr.Field(i), idx); err != nil {
				return nil, err
			}
		}
		return NewSparseUnionScalar(children, typeCode, arr.UnionType().(*arrow.SparseUnionType)), nil
	case *array.DenseUnion:
		typeCode := arr.TypeCode(idx)
		child := arr.Field(arr.ChildID(idx))
		offset := arr.ValueOffset(idx)
		value, err := GetScalar(child, int(offset))
		if err != nil {
			return nil, err
		}
		return NewDenseUnionScalar(value, typeCode, arr.UnionType().(*arrow.DenseUnionType)), nil
	}

	return nil, fmt.Errorf("cannot create scalar from array of type %s", arr.DataType())
}
// MakeArrayOfNull creates an array of size length which is all null of the given data type.
//
// Its first buffer (the validity bitmap) is sized to BytesForBits(length),
// and the null count is set to length. Beyond that:
//   - nested types get one recursively built all-null child per field;
//   - fixed-width types get a data buffer of BytesForBits(BitWidth) bytes
//     per element;
//   - binary-like types get an int32-sized offsets buffer plus a nil data
//     buffer.
//
// NOTE(review): the bitmap is never cleared explicitly, so this relies on
// the allocator returning zeroed memory. Also, list-like nested types get no
// offsets buffer, and large binary types still get int32-sized offsets.
// array.MakeArrayOfNull (see Deprecated below) is the replacement.
//
// Deprecated: Use array.MakeArrayOfNull
func MakeArrayOfNull(dt arrow.DataType, length int, mem memory.Allocator) arrow.Array {
	var (
		buffers  = []*memory.Buffer{nil}
		children []arrow.ArrayData
	)

	// validity bitmap
	buffers[0] = memory.NewResizableBuffer(mem)
	buffers[0].Resize(int(bitutil.BytesForBits(int64(length))))
	defer buffers[0].Release()

	switch t := dt.(type) {
	case arrow.NestedType:
		fieldList := t.Fields()
		children = make([]arrow.ArrayData, len(fieldList))
		for i, f := range fieldList {
			arr := MakeArrayOfNull(f.Type, length, mem)
			// the deferred releases run at function return, after NewData
			// below has taken its own references to the children
			defer arr.Release()
			children[i] = arr.Data()
		}
	case arrow.FixedWidthDataType:
		buffers = append(buffers, memory.NewResizableBuffer(mem))
		buffers[1].Resize(int(bitutil.BytesForBits(int64(t.BitWidth()))) * length)
		defer buffers[1].Release()
	case arrow.BinaryDataType:
		// offsets buffer (int32-sized) followed by a nil data buffer
		buffers = append(buffers, memory.NewResizableBuffer(mem), nil)
		buffers[1].Resize(arrow.Int32Traits.BytesRequired(length + 1))
		defer buffers[1].Release()
	}

	data := array.NewData(dt, length, buffers, children, length, 0)
	defer data.Release()
	return array.MakeFromData(data)
}
// MakeArrayFromScalar returns an array of the given length in which every
// element equals sc. A null sc yields an all-null array from MakeArrayOfNull.
//
// Supported scalars:
//   - Boolean;
//   - binary-like scalars, including fixed-size ones and the LARGE_BINARY /
//     LARGE_STRING variants (which get 64-bit offsets);
//   - Decimal128, Decimal256 and other PrimitiveScalars;
//   - List, FixedSizeList, Struct, Map and RunEndEncoded.
//
// Any other scalar returns an error. Layouts with 32-bit offsets return an
// error wrapping arrow.ErrInvalid when length copies of the value would not
// fit in those offsets.
func MakeArrayFromScalar(sc Scalar, length int, mem memory.Allocator) (arrow.Array, error) {
	if !sc.IsValid() {
		return MakeArrayOfNull(sc.DataType(), length, mem), nil
	}

	// fitsInt32Offsets reports whether length repetitions of n values can be
	// addressed with int32 offsets.
	fitsInt32Offsets := func(n int) bool {
		return int64(n)*int64(length) <= math.MaxInt32
	}

	// createOffsets builds length+1 int32 offsets: 0, valLength, 2*valLength, ...
	createOffsets := func(valLength int32) *memory.Buffer {
		buffer := memory.NewResizableBuffer(mem)
		buffer.Resize(arrow.Int32Traits.BytesRequired(length + 1))
		out := arrow.Int32Traits.CastFromBytes(buffer.Bytes())
		for i, offset := 0, int32(0); i < length+1; i, offset = i+1, offset+valLength {
			out[i] = offset
		}
		return buffer
	}

	// createLargeOffsets is the int64 counterpart of createOffsets.
	createLargeOffsets := func(valLength int64) *memory.Buffer {
		buffer := memory.NewResizableBuffer(mem)
		buffer.Resize(arrow.Int64Traits.BytesRequired(length + 1))
		out := arrow.Int64Traits.CastFromBytes(buffer.Bytes())
		for i, offset := 0, int64(0); i < length+1; i, offset = i+1, offset+valLength {
			out[i] = offset
		}
		return buffer
	}

	// createBuffer returns a buffer holding data repeated length times. After
	// the first copy, each step doubles the already-filled prefix.
	createBuffer := func(data []byte) *memory.Buffer {
		buffer := memory.NewResizableBuffer(mem)
		buffer.Resize(len(data) * length)
		out := buffer.Bytes()
		copy(out, data)
		for j := len(data); j < len(out); j *= 2 {
			copy(out[j:], out[:j])
		}
		return buffer
	}

	// finishFixedWidth wraps length repetitions of data as a fixed-width
	// array of sc's type without a validity bitmap.
	finishFixedWidth := func(data []byte) arrow.ArrayData {
		buffer := createBuffer(data)
		defer buffer.Release()
		return array.NewData(sc.DataType(), length, []*memory.Buffer{nil, buffer}, nil, 0, 0)
	}

	// repeatArray concatenates length copies of arr. Concatenate is only
	// called with at least one input; length == 0 yields an empty slice of arr.
	repeatArray := func(arr arrow.Array) (arrow.Array, error) {
		if length == 0 {
			return array.NewSlice(arr, 0, 0), nil
		}
		copies := make([]arrow.Array, length)
		for i := range copies {
			copies[i] = arr
		}
		return array.Concatenate(copies, mem)
	}

	switch s := sc.(type) {
	case *Boolean:
		data := memory.NewResizableBuffer(mem)
		defer data.Release()
		data.Resize(int(bitutil.BytesForBits(int64(length))))
		c := byte(0x00)
		if s.Value {
			c = 0xFF
		}
		memory.Set(data.Bytes(), c)
		return array.NewBoolean(length, data, nil, 0), nil
	case BinaryScalar:
		if s.DataType().ID() == arrow.FIXED_SIZE_BINARY {
			data := finishFixedWidth(s.Data())
			defer data.Release()
			return array.MakeFromData(data), nil
		}

		var offsetsBuf *memory.Buffer
		switch s.DataType().ID() {
		case arrow.LARGE_BINARY, arrow.LARGE_STRING:
			// large binary-like layouts use 64-bit offsets
			offsetsBuf = createLargeOffsets(int64(len(s.Data())))
		default:
			if !fitsInt32Offsets(len(s.Data())) {
				return nil, fmt.Errorf("%w: total data length overflows int32 offsets", arrow.ErrInvalid)
			}
			offsetsBuf = createOffsets(int32(len(s.Data())))
		}
		defer offsetsBuf.Release()

		valuesBuf := createBuffer(s.Data())
		defer valuesBuf.Release()

		data := array.NewData(sc.DataType(), length, []*memory.Buffer{nil, offsetsBuf, valuesBuf}, nil, 0, 0)
		defer data.Release()
		return array.MakeFromData(data), nil
	case *Decimal128:
		data := finishFixedWidth(arrow.Decimal128Traits.CastToBytes([]decimal128.Num{s.Value}))
		defer data.Release()
		return array.MakeFromData(data), nil
	case *Decimal256:
		data := finishFixedWidth(arrow.Decimal256Traits.CastToBytes([]decimal256.Num{s.Value}))
		defer data.Release()
		return array.MakeFromData(data), nil
	case PrimitiveScalar:
		data := finishFixedWidth(s.Data())
		defer data.Release()
		return array.MakeFromData(data), nil
	case *List:
		if !fitsInt32Offsets(s.Value.Len()) {
			return nil, fmt.Errorf("%w: total list length overflows int32 offsets", arrow.ErrInvalid)
		}
		valueArray, err := repeatArray(s.Value)
		if err != nil {
			return nil, err
		}
		defer valueArray.Release()

		offsetsBuf := createOffsets(int32(s.Value.Len()))
		defer offsetsBuf.Release()
		data := array.NewData(s.DataType(), length, []*memory.Buffer{nil, offsetsBuf}, []arrow.ArrayData{valueArray.Data()}, 0, 0)
		defer data.Release()
		return array.MakeFromData(data), nil
	case *FixedSizeList:
		valueArray, err := repeatArray(s.Value)
		if err != nil {
			return nil, err
		}
		defer valueArray.Release()

		data := array.NewData(s.DataType(), length, []*memory.Buffer{nil}, []arrow.ArrayData{valueArray.Data()}, 0, 0)
		defer data.Release()
		return array.MakeFromData(data), nil
	case *Struct:
		fields := make([]arrow.ArrayData, 0, len(s.Value))
		for _, v := range s.Value {
			arr, err := MakeArrayFromScalar(v, length, mem)
			if err != nil {
				return nil, err
			}
			defer arr.Release()
			fields = append(fields, arr.Data())
		}

		data := array.NewData(s.DataType(), length, []*memory.Buffer{nil}, fields, 0, 0)
		defer data.Release()
		return array.NewStructData(data), nil
	case *Map:
		structArr := s.GetList().(*array.Struct)
		if !fitsInt32Offsets(structArr.Len()) {
			return nil, fmt.Errorf("%w: total map length overflows int32 offsets", arrow.ErrInvalid)
		}

		keyArr, err := repeatArray(structArr.Field(0))
		if err != nil {
			return nil, err
		}
		defer keyArr.Release()

		valueArr, err := repeatArray(structArr.Field(1))
		if err != nil {
			return nil, err
		}
		defer valueArr.Release()

		offsetsBuf := createOffsets(int32(structArr.Len()))
		defer offsetsBuf.Release()
		outStructArr := array.NewData(structArr.DataType(), keyArr.Len(), []*memory.Buffer{nil}, []arrow.ArrayData{keyArr.Data(), valueArr.Data()}, 0, 0)
		defer outStructArr.Release()
		data := array.NewData(s.DataType(), length, []*memory.Buffer{nil, offsetsBuf}, []arrow.ArrayData{outStructArr}, 0, 0)
		defer data.Release()
		return array.MakeFromData(data), nil
	case *RunEndEncoded:
		dt := s.DataType().(*arrow.RunEndEncodedType)

		var endBytes []byte
		switch dt.RunEnds().ID() {
		case arrow.INT16:
			if length > math.MaxInt16 {
				return nil, fmt.Errorf("%w: length overflows int16 run ends", arrow.ErrInvalid)
			}
			v := int16(length)
			endBytes = (*[2]byte)(unsafe.Pointer(&v))[:]
		case arrow.INT32:
			if length > math.MaxInt32 {
				return nil, fmt.Errorf("%w: final length overflows int32 run ends", arrow.ErrInvalid)
			}
			v := int32(length)
			endBytes = (*[4]byte)(unsafe.Pointer(&v))[:]
		case arrow.INT64:
			v := int64(length)
			endBytes = (*[8]byte)(unsafe.Pointer(&v))[:]
		default:
			return nil, fmt.Errorf("%w: invalid run end type %s", arrow.ErrInvalid, dt.RunEnds())
		}

		// The run-ends child holds a single value (one run covering the whole
		// array), so copy exactly one run end rather than repeating it
		// length times.
		endBuf := memory.NewResizableBuffer(mem)
		endBuf.Resize(len(endBytes))
		copy(endBuf.Bytes(), endBytes)
		defer endBuf.Release()

		valueArr, err := MakeArrayFromScalar(s.Value, 1, mem)
		if err != nil {
			return nil, err
		}
		defer valueArr.Release()

		runEndsData := array.NewData(dt.RunEnds(), 1, []*memory.Buffer{nil, endBuf}, nil, 0, 0)
		defer runEndsData.Release()

		finalData := array.NewData(s.DataType(), length, []*memory.Buffer{nil},
			[]arrow.ArrayData{runEndsData, valueArr.Data()}, 0, 0)
		defer finalData.Release()
		return array.NewRunEndEncodedData(finalData), nil
	default:
		return nil, fmt.Errorf("array from scalar not yet implemented for type %s", sc.DataType())
	}
}
// Hash computes a seeded hash of s. The data type always contributes
// (through arrow.HashType), and a null scalar hashes to that type hash alone.
// For valid scalars the value's contribution is XOR-folded into the result;
// composite scalars (extension, unions, dictionary, struct, run-end-encoded)
// hash their children recursively with the same seed. Union type codes are
// left out to match how equality treats them.
func Hash(seed maphash.Seed, s Scalar) uint64 {
	var h maphash.Hash
	h.SetSeed(seed)
	// maphash.Hash.Write never fails, so binary.Write errors are not checked.
	binary.Write(&h, endian.Native, arrow.HashType(seed, s.DataType()))
	out := h.Sum64()
	if !s.IsValid() {
		return out
	}

	// hash folds the bytes written to h since the last reset into out.
	hash := func() {
		out ^= h.Sum64()
		h.Reset()
	}

	// valueHash writes v's fixed-width representation, folds it into out and
	// returns the running result.
	valueHash := func(v interface{}) uint64 {
		switch v := v.(type) {
		case int32:
			h.Write((*[4]byte)(unsafe.Pointer(&v))[:])
		case int64:
			h.Write((*[8]byte)(unsafe.Pointer(&v))[:])
		case arrow.Date32:
			binary.Write(&h, endian.Native, uint32(v))
		case arrow.Time32:
			binary.Write(&h, endian.Native, uint32(v))
		case arrow.MonthInterval:
			binary.Write(&h, endian.Native, uint32(v))
		case arrow.Duration:
			binary.Write(&h, endian.Native, uint64(v))
		case arrow.Date64:
			binary.Write(&h, endian.Native, uint64(v))
		case arrow.Time64:
			binary.Write(&h, endian.Native, uint64(v))
		case arrow.Timestamp:
			binary.Write(&h, endian.Native, uint64(v))
		case float16.Num:
			binary.Write(&h, endian.Native, v.Uint16())
		case decimal128.Num:
			binary.Write(&h, endian.Native, v.LowBits())
			hash()
			binary.Write(&h, endian.Native, uint64(v.HighBits()))
		case decimal256.Num:
			arr := v.Array()
			binary.Write(&h, endian.Native, arr[3])
			hash()
			binary.Write(&h, endian.Native, arr[2])
			hash()
			binary.Write(&h, endian.Native, arr[1])
			hash()
			binary.Write(&h, endian.Native, arr[0])
		}
		hash()
		return out
	}

	h.Reset()
	switch s := s.(type) {
	case *Null:
		// unreachable for valid scalars; nothing beyond the type hash
	case *Extension:
		out ^= Hash(seed, s.Value)
	case *DayTimeInterval:
		// Each valueHash call XOR-folds one component into out, so the value
		// returned by the last call covers every component. (Combining the
		// intermediate results with & would discard hash bits.)
		valueHash(s.Value.Days)
		return valueHash(s.Value.Milliseconds)
	case *MonthDayNanoInterval:
		valueHash(s.Value.Months)
		valueHash(s.Value.Days)
		return valueHash(s.Value.Nanoseconds)
	case *SparseUnion:
		// typecode is ignored when comparing for equality, so don't hash it either
		out ^= Hash(seed, s.Value[s.ChildID])
	case *DenseUnion:
		// typecode is ignored when comparing equality, so don't hash it either
		out ^= Hash(seed, s.Value)
	case *Dictionary:
		if s.Value.Index.IsValid() {
			out ^= Hash(seed, s.Value.Index)
		}
	case *RunEndEncoded:
		return Hash(seed, s.Value)
	case PrimitiveScalar:
		h.Write(s.Data())
		hash()
	case TemporalScalar:
		return valueHash(s.value())
	case ListScalar:
		array.Hash(&h, s.GetList().Data())
		hash()
	case *Struct:
		for _, c := range s.Value {
			if c.IsValid() {
				out ^= Hash(seed, c)
			}
		}
	}

	return out
}