arrow/scalar/parse.go (296 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package scalar import ( "math/bits" "strconv" "time" "github.com/aliyun/aliyun-odps-go-sdk/arrow" "github.com/aliyun/aliyun-odps-go-sdk/arrow/array" "github.com/aliyun/aliyun-odps-go-sdk/arrow/float16" "github.com/aliyun/aliyun-odps-go-sdk/arrow/memory" "golang.org/x/xerrors" ) // MakeScalarParam is for converting a value to a scalar when it requires a // parameterized data type such as a time type that needs units, or a fixed // size list which needs it's size. // // Will fall back to MakeScalar without the passed in type if not one of the // parameterized types. func MakeScalarParam(val interface{}, dt arrow.DataType) (Scalar, error) { switch v := val.(type) { case []byte: buf := memory.NewBufferBytes(v) defer buf.Release() switch dt.ID() { case arrow.BINARY: return NewBinaryScalar(buf, dt), nil case arrow.STRING: return NewStringScalarFromBuffer(buf), nil case arrow.FIXED_SIZE_BINARY: if buf.Len() == dt.(*arrow.FixedSizeBinaryType).ByteWidth { return NewFixedSizeBinaryScalar(buf, dt), nil } return nil, xerrors.Errorf("invalid scalar value of len %d for type %s", v, dt) } case *memory.Buffer: switch dt.ID() { case arrow.BINARY: return NewBinaryScalar(v, dt), nil case arrow.STRING: return NewStringScalarFromBuffer(v), nil case arrow.FIXED_SIZE_BINARY: if v.Len() == dt.(*arrow.FixedSizeBinaryType).ByteWidth { return NewFixedSizeBinaryScalar(v, dt), nil } return nil, xerrors.Errorf("invalid scalar value of len %d for type %s", v.Len(), dt) } case arrow.Time32: return NewTime32Scalar(v, dt), nil case arrow.Time64: return NewTime64Scalar(v, dt), nil case arrow.Timestamp: return NewTimestampScalar(v, dt), nil case array.Interface: switch dt.ID() { case arrow.LIST: if !arrow.TypeEqual(v.DataType(), dt.(*arrow.ListType).Elem()) { return nil, xerrors.Errorf("inconsistent type for list scalar array and data type") } return NewListScalar(v), nil case arrow.FIXED_SIZE_LIST: if !arrow.TypeEqual(v.DataType(), dt.(*arrow.FixedSizeListType).Elem()) { return nil, xerrors.Errorf("inconsistent type for list scalar array and data type") } return NewFixedSizeListScalarWithType(v, dt), nil case arrow.MAP: if !arrow.TypeEqual(dt.(*arrow.MapType).ValueType(), v.DataType()) { return nil, xerrors.Errorf("inconsistent type for map scalar type") } return NewMapScalar(v), nil } } return MakeScalar(val), nil } // MakeScalar creates a scalar of the passed in type via reflection. func MakeScalar(val interface{}) Scalar { switch v := val.(type) { case nil: return ScalarNull case bool: return NewBooleanScalar(v) case int8: return NewInt8Scalar(v) case uint8: return NewUint8Scalar(v) case int16: return NewInt16Scalar(v) case uint16: return NewUint16Scalar(v) case int32: return NewInt32Scalar(v) case uint32: return NewUint32Scalar(v) case int64: return NewInt64Scalar(v) case uint64: return NewUint64Scalar(v) case int: // determine size of an int on this system switch bits.UintSize { case 32: return NewInt32Scalar(int32(v)) case 64: return NewInt64Scalar(int64(v)) } case uint: // determine size of an int on this system switch bits.UintSize { case 32: return NewUint32Scalar(uint32(v)) case 64: return NewUint64Scalar(uint64(v)) } case []byte: buf := memory.NewBufferBytes(v) defer buf.Release() return NewBinaryScalar(buf, arrow.BinaryTypes.Binary) case string: return NewStringScalar(v) case arrow.Date32: return NewDate32Scalar(v) case arrow.Date64: return NewDate64Scalar(v) case float16.Num: return NewFloat16Scalar(v) case float32: return NewFloat32Scalar(v) case float64: return NewFloat64Scalar(v) case arrow.MonthInterval: return NewMonthIntervalScalar(v) case arrow.DayTimeInterval: return NewDayTimeIntervalScalar(v) case arrow.MonthDayNanoInterval: return NewMonthDayNanoIntervalScalar(v) case arrow.DataType: return MakeNullScalar(v) } panic(xerrors.Errorf("makescalar not implemented for type value %#v", val)) } // MakeIntegerScalar is a helper function for creating an integer scalar of a // given bitsize. func MakeIntegerScalar(v int64, bitsize int) (Scalar, error) { switch bitsize { case 8: return NewInt8Scalar(int8(v)), nil case 16: return NewInt16Scalar(int16(v)), nil case 32: return NewInt32Scalar(int32(v)), nil case 64: return NewInt64Scalar(int64(v)), nil } return nil, xerrors.Errorf("invalid bitsize for integer scalar: %d", bitsize) } // MakeUnsignedIntegerScalar is a helper function for creating an unsigned int // scalar of the specified bit width. func MakeUnsignedIntegerScalar(v uint64, bitsize int) (Scalar, error) { switch bitsize { case 8: return NewUint8Scalar(uint8(v)), nil case 16: return NewUint16Scalar(uint16(v)), nil case 32: return NewUint32Scalar(uint32(v)), nil case 64: return NewUint64Scalar(uint64(v)), nil } return nil, xerrors.Errorf("invalid bitsize for uint scalar: %d", bitsize) } // ParseScalar parses a string to create a scalar of the passed in type. Currently // does not support any nested types such as Structs or Lists. func ParseScalar(dt arrow.DataType, val string) (Scalar, error) { switch dt.ID() { case arrow.STRING: return NewStringScalar(val), nil case arrow.BINARY: buf := memory.NewBufferBytes([]byte(val)) defer buf.Release() return NewBinaryScalar(buf, dt), nil case arrow.FIXED_SIZE_BINARY: if len(val) != dt.(*arrow.FixedSizeBinaryType).ByteWidth { return nil, xerrors.Errorf("invalid value %s for scalar of type %s", val, dt) } buf := memory.NewBufferBytes([]byte(val)) defer buf.Release() return NewFixedSizeBinaryScalar(buf, dt), nil case arrow.BOOL: val, err := strconv.ParseBool(val) if err != nil { return nil, err } return NewBooleanScalar(val), nil case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64: width := dt.(arrow.FixedWidthDataType).BitWidth() val, err := strconv.ParseInt(val, 0, width) if err != nil { return nil, err } return MakeIntegerScalar(val, width) case arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64: width := dt.(arrow.FixedWidthDataType).BitWidth() val, err := strconv.ParseUint(val, 0, width) if err != nil { return nil, err } return MakeUnsignedIntegerScalar(val, width) case arrow.FLOAT16: val, err := strconv.ParseFloat(val, 32) if err != nil { return nil, err } return NewFloat16ScalarFromFloat32(float32(val)), nil case arrow.FLOAT32, arrow.FLOAT64: width := dt.(arrow.FixedWidthDataType).BitWidth() val, err := strconv.ParseFloat(val, width) if err != nil { return nil, err } switch width { case 32: return NewFloat32Scalar(float32(val)), nil case 64: return NewFloat64Scalar(float64(val)), nil } case arrow.TIMESTAMP: format := "2006-01-02" if val[len(val)-1] == 'Z' { val = val[:len(val)-1] } switch { case len(val) == 13: format += string(val[10]) + "15" case len(val) == 16: format += string(val[10]) + "15:04" case len(val) >= 19: format += string(val[10]) + "15:04:05.999999999" } out, err := time.ParseInLocation(format, val, time.UTC) if err != nil { return nil, err } value := arrow.Timestamp(ConvertTimestampValue(arrow.Nanosecond, dt.(*arrow.TimestampType).Unit, out.UnixNano())) return NewTimestampScalar(value, dt), nil case arrow.DURATION: value, err := time.ParseDuration(val) if err != nil { return nil, err } unit := dt.(*arrow.DurationType).Unit var out arrow.Duration switch unit { case arrow.Nanosecond: out = arrow.Duration(value.Nanoseconds()) case arrow.Microsecond: out = arrow.Duration(value.Microseconds()) case arrow.Millisecond: out = arrow.Duration(value.Milliseconds()) case arrow.Second: out = arrow.Duration(value.Seconds()) } return NewDurationScalar(out, dt), nil case arrow.DATE32, arrow.DATE64: out, err := time.ParseInLocation("2006-01-02", val, time.UTC) if err != nil { return nil, err } if dt.ID() == arrow.DATE32 { return NewDate32Scalar(arrow.Date32(out.Unix() / int64((time.Hour * 24).Seconds()))), nil } else { return NewDate64Scalar(arrow.Date64(out.Unix() * 1000)), nil } case arrow.TIME32: var ( out time.Time err error ) switch { case len(val) == 5: out, err = time.ParseInLocation("15:04", val, time.UTC) default: out, err = time.ParseInLocation("15:04:05.999", val, time.UTC) } if err != nil { return nil, err } t := out.Sub(time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC)) if dt.(*arrow.Time32Type).Unit == arrow.Second { return NewTime32Scalar(arrow.Time32(t.Seconds()), dt), nil } return NewTime32Scalar(arrow.Time32(t.Milliseconds()), dt), nil case arrow.TIME64: var ( out time.Time err error ) switch { case len(val) == 5: out, err = time.ParseInLocation("15:04", val, time.UTC) default: out, err = time.ParseInLocation("15:04:05.999999999", val, time.UTC) } if err != nil { return nil, err } t := out.Sub(time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC)) if dt.(*arrow.Time64Type).Unit == arrow.Microsecond { return NewTime64Scalar(arrow.Time64(t.Microseconds()), dt), nil } return NewTime64Scalar(arrow.Time64(t.Nanoseconds()), dt), nil } return nil, xerrors.Errorf("parsing of scalar for type %s not implemented", dt) }