memstore/common/data_value.go (833 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package common
import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/uber/aresdb/utils"
"math"
"reflect"
"strconv"
"strings"
"unsafe"
)
// NullDataValue is a global data value that stands for a null value, e.g. where newly added
// columns haven't received any data. It is the zero DataValue, so its Valid flag is false.
var NullDataValue = DataValue{}
// SizeOfGeoPoint is the size of GeoPointGo in memory, in bytes.
const SizeOfGeoPoint = unsafe.Sizeof(GeoPointGo{})
// ZeroLengthArrayFlag is the value used to represent a 0 length array value.
// It is stored in the offset part of the OffsetLength vector:
//   - when the length part is non-zero, the offset is the real offset in memory
//   - when length is 0 and offset is 0, the array value is invalid (the default)
//   - when length is 0 and offset is ZeroLengthArrayFlag, this is a 0 length array value
const ZeroLengthArrayFlag = math.MaxUint32
// CompareFunc represents a compare function over two raw value pointers.
// The functions in this file return a negative, zero or positive result when
// the value behind a is less than, equal to or greater than the value behind b.
type CompareFunc func(a, b unsafe.Pointer) int
// CompareBool orders two booleans with false sorting before true.
// It returns 0 when they are equal, 1 when only a is true and -1 when only b is true.
func CompareBool(a, b bool) int {
	switch {
	case a == b:
		return 0
	case a:
		return 1
	default:
		return -1
	}
}
// CompareInt8 compares the int8 values behind a and b and returns their
// difference (a - b) widened to int.
func CompareInt8(a, b unsafe.Pointer) int {
	lhs := *(*int8)(a)
	rhs := *(*int8)(b)
	return int(lhs) - int(rhs)
}
// CompareUint8 compares the uint8 values behind a and b and returns their
// difference (a - b) widened to int.
func CompareUint8(a, b unsafe.Pointer) int {
	lhs := *(*uint8)(a)
	rhs := *(*uint8)(b)
	return int(lhs) - int(rhs)
}
// CompareInt16 compares the int16 values behind a and b and returns their
// difference (a - b) widened to int.
func CompareInt16(a, b unsafe.Pointer) int {
	lhs := *(*int16)(a)
	rhs := *(*int16)(b)
	return int(lhs) - int(rhs)
}
// CompareUint16 compares the uint16 values behind a and b and returns their
// difference (a - b) widened to int.
func CompareUint16(a, b unsafe.Pointer) int {
	lhs := *(*uint16)(a)
	rhs := *(*uint16)(b)
	return int(lhs) - int(rhs)
}
// CompareInt32 compares the int32 values behind a and b and returns their
// difference (a - b) computed in int.
func CompareInt32(a, b unsafe.Pointer) int {
	lhs := *(*int32)(a)
	rhs := *(*int32)(b)
	return int(lhs) - int(rhs)
}
// CompareUint32 compares the uint32 values behind a and b and returns their
// difference (a - b) computed in int.
func CompareUint32(a, b unsafe.Pointer) int {
	lhs := *(*uint32)(a)
	rhs := *(*uint32)(b)
	return int(lhs) - int(rhs)
}
// CompareInt64 compares the int64 values behind a and b.
// It returns -1, 0 or 1 when a is less than, equal to or greater than b.
//
// The values are compared directly rather than subtracted: a - b can overflow
// int64 (e.g. math.MaxInt64 - (-1)) and flip the sign of the result.
func CompareInt64(a, b unsafe.Pointer) int {
	lhs := *(*int64)(a)
	rhs := *(*int64)(b)
	switch {
	case lhs < rhs:
		return -1
	case lhs > rhs:
		return 1
	default:
		return 0
	}
}
// CompareFloat32 compares the float32 values behind a and b.
// It returns -1 when a < b and 0 when a == b; every other case, including
// comparisons involving NaN, yields 1.
func CompareFloat32(a, b unsafe.Pointer) int {
	lhs, rhs := *(*float32)(a), *(*float32)(b)
	if lhs < rhs {
		return -1
	}
	if lhs == rhs {
		return 0
	}
	return 1
}
// CompareUUID compares two UUID values, each read as two consecutive uint64
// words. The first word is compared first and the second word breaks ties.
// It returns -1, 0 or 1 when a is less than, equal to or greater than b.
//
// Words are compared as unsigned integers instead of converting their
// difference to int: that conversion yields the wrong sign whenever the
// difference is 2^63 or more.
func CompareUUID(a, b unsafe.Pointer) int {
	lhs := (*[2]uint64)(a)
	rhs := (*[2]uint64)(b)
	for i := range lhs {
		switch {
		case lhs[i] < rhs[i]:
			return -1
		case lhs[i] > rhs[i]:
			return 1
		}
	}
	return 0
}
// CompareGeoPoint compares two GeoPoint values, each read as two consecutive
// float32 components. The sign of the first component's difference decides the
// result; when that difference is zero the second component is used instead.
// It returns 0, 1 or -1.
func CompareGeoPoint(a, b unsafe.Pointer) int {
	lhs := (*[2]float32)(a)
	rhs := (*[2]float32)(b)
	diff := lhs[0] - rhs[0]
	if diff == 0 {
		diff = lhs[1] - rhs[1]
	}
	switch {
	case diff == 0:
		return 0
	case diff > 0:
		return 1
	default:
		return -1
	}
}
// CompareArray compares two serialized array values. Its main purpose is
// equality comparison; larger/less results may not be accurate.
//
// Each value starts with a uint32 item count. Arrays with different counts
// compare by the count difference, two empty arrays are equal, and otherwise
// the result of utils.MemCmp over the payload is returned.
func CompareArray(dataType DataType, a, b unsafe.Pointer) int {
	lenA, lenB := int(*(*uint32)(a)), int(*(*uint32)(b))
	switch {
	case lenA != lenB:
		return lenA - lenB
	case lenA == 0:
		return 0
	}
	// skip 4 bytes for length
	return utils.MemCmp(a, b, 4, CalculateListElementBytes(dataType, lenA))
}
// ArrayLengthCompare compares two array data values by validity and length.
// Two invalid values are equal; an invalid v1 against a valid v2 returns 1 and
// the opposite case returns -1. When both are valid, the difference of the
// uint32 item counts stored at the start of each OtherVal is returned.
func ArrayLengthCompare(v1, v2 *DataValue) int {
	switch {
	case !v1.Valid && !v2.Valid:
		return 0
	case !v1.Valid:
		return 1
	case !v2.Valid:
		return -1
	}
	lhsLen := int(*(*uint32)(v1.OtherVal))
	rhsLen := int(*(*uint32)(v2.OtherVal))
	return lhsLen - rhsLen
}
// GetCompareFunc returns the compare function for the given data type.
// Enum types share the comparator of their unsigned storage type. A nil
// CompareFunc is returned for data types without a dedicated comparator.
func GetCompareFunc(dataType DataType) CompareFunc {
	var cmp CompareFunc
	switch dataType {
	case Int8:
		cmp = CompareInt8
	case Uint8, SmallEnum:
		cmp = CompareUint8
	case Int16:
		cmp = CompareInt16
	case Uint16, BigEnum:
		cmp = CompareUint16
	case Int32:
		cmp = CompareInt32
	case Uint32:
		cmp = CompareUint32
	case Int64:
		cmp = CompareInt64
	case Float32:
		cmp = CompareFloat32
	case UUID:
		cmp = CompareUUID
	case GeoPoint:
		cmp = CompareGeoPoint
	}
	return cmp
}
// GoDataValue represents a value backed in golang memory.
type GoDataValue interface {
	// GetBytes returns the number of bytes copied in golang memory for this value.
	GetBytes() int
	// GetSerBytes returns the number of bytes required to serialize this value.
	GetSerBytes() int
	// Write serializes this value into the given stream writer.
	Write(writer *utils.StreamDataWriter) error
	// Read populates this value from the given stream reader.
	Read(reader *utils.StreamDataReader) error
}
// DataValueIterator is an iterator over data values.
type DataValueIterator interface {
	// read returns the current data value
	read() DataValue
	// next advances the iterator
	next()
	// done reports whether the iterator is exhausted
	done() bool
}
// DataValue is the wrapper to encapsulate validity, bool value and other value type
// into a single struct to make it easier for value comparison.
type DataValue struct {
	// Used for golang vector party
	GoVal GoDataValue
	// OtherVal points at the raw bytes of a non-bool value.
	OtherVal unsafe.Pointer
	// DataType is the data type of the wrapped value.
	DataType DataType
	// CmpFunc, when non-nil, is used by Compare to order OtherVal pointers.
	CmpFunc CompareFunc
	// Valid is false for null values.
	Valid bool
	// IsBool marks the value as a boolean whose payload lives in BoolVal.
	IsBool bool
	// BoolVal is the boolean payload, meaningful only when IsBool is set.
	BoolVal bool
}
// GeoPointGo represents GeoPoint Golang Type: a pair of float32 coordinates.
// Code in this file treats index 0 as latitude and index 1 as longitude.
type GeoPointGo [2]float32
// GeoShapeGo represents GeoShape Golang Type.
type GeoShapeGo struct {
	// Polygons holds one slice of points per polygon.
	Polygons [][]GeoPointGo
}
// ArrayValue is the array value representation in Go for UpsertBatch.
type ArrayValue struct {
	// item data type
	DataType DataType
	// item list; a nil entry represents a null item
	Items []interface{}
}
// Compare compares two value wrappers.
//
// If either side is null the validity flags are compared (null sorts first).
// Otherwise boolean values compare by BoolVal, values with a CmpFunc compare
// through it, array types fall back to CompareArray, and anything else is
// treated as equal.
func (v1 DataValue) Compare(v2 DataValue) int {
	switch {
	case !(v1.Valid && v2.Valid):
		return CompareBool(v1.Valid, v2.Valid)
	case v1.IsBool:
		return CompareBool(v1.BoolVal, v2.BoolVal)
	case v1.CmpFunc != nil:
		return v1.CmpFunc(v1.OtherVal, v2.OtherVal)
	case IsArrayType(v1.DataType):
		return CompareArray(v1.DataType, v1.OtherVal, v2.OtherVal)
	default:
		return 0
	}
}
// ConvertToHumanReadable converts the DataValue into a plain golang value
// according to dataType:
//   - null values yield nil and bool values yield a bool
//   - integer and float types yield the corresponding Go scalar
//   - UUID yields the dash-separated hex string
//   - GeoPoint yields "Point(lng,lat)" and GeoShape yields "Polygon(...)"
//   - array types yield a JSON string of the converted items, null items as null
//
// nil is returned for unsupported types or when the value cannot be rendered.
func (v1 DataValue) ConvertToHumanReadable(dataType DataType) interface{} {
	switch {
	case !v1.Valid:
		return nil
	case v1.IsBool:
		return v1.BoolVal
	}

	ptr := v1.OtherVal
	switch dataType {
	case Int8:
		return *(*int8)(ptr)
	case Uint8, SmallEnum:
		return *(*uint8)(ptr)
	case Int16:
		return *(*int16)(ptr)
	case Uint16, BigEnum:
		return *(*uint16)(ptr)
	case Int32:
		return *(*int32)(ptr)
	case Uint32:
		return *(*uint32)(ptr)
	case Int64:
		return *(*int64)(ptr)
	case Float32:
		return *(*float32)(ptr)
	case UUID:
		raw := *(*[16]byte)(ptr)
		hexStr := hex.EncodeToString(raw[:])
		if len(hexStr) != 32 {
			return nil
		}
		return fmt.Sprintf("%s-%s-%s-%s-%s",
			hexStr[:8],
			hexStr[8:12],
			hexStr[12:16],
			hexStr[16:20],
			hexStr[20:])
	case GeoPoint:
		point := (*[2]float32)(ptr)
		// in string format, lng goes first and lat second
		return fmt.Sprintf("Point(%.4f,%.4f)", point[1], point[0])
	case GeoShape:
		shape, ok := (v1.GoVal).(*GeoShapeGo)
		if !ok {
			return nil
		}
		rings := make([]string, 0, len(shape.Polygons))
		for _, points := range shape.Polygons {
			coords := make([]string, 0, len(points))
			for _, point := range points {
				// in string format, lng goes first and lat second
				// https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
				coords = append(coords, fmt.Sprintf("%.4f+%.4f", point[1], point[0]))
			}
			rings = append(rings, "("+strings.Join(coords, ",")+")")
		}
		return "Polygon(" + strings.Join(rings, ",") + ")"
	}

	if !IsArrayType(dataType) {
		return nil
	}
	reader := NewArrayValueReader(dataType, v1.OtherVal)
	items := make([]interface{}, reader.GetLength())
	for i := range items {
		if !reader.IsItemValid(i) {
			// null items keep the nil zero value
			continue
		}
		elem := DataValue{DataType: reader.itemType, Valid: true}
		if reader.itemType == Bool {
			elem.IsBool = true
			elem.BoolVal = reader.GetBool(i)
		} else {
			elem.OtherVal = reader.Get(i)
		}
		items[i] = elem.ConvertToHumanReadable(reader.itemType)
	}
	// The marshal error is deliberately ignored; on failure the result is
	// the empty string.
	encoded, _ := json.Marshal(items)
	return string(encoded)
}
// ValueFromString converts a raw string value to the actual value for the
// given data type.
//
// An empty string or the literal "null" produces a null (invalid) value with no
// error. On a parse failure the returned value is left invalid and the error
// is wrapped with utils.StackError. Non-bool values are returned through
// OtherVal, which points at freshly allocated storage holding the parsed value.
func ValueFromString(str string, dataType DataType) (val DataValue, err error) {
	val.DataType = dataType
	if str == "" || str == "null" {
		return val, nil
	}

	switch dataType {
	case Bool:
		val.IsBool = true
		parsed, parseErr := strconv.ParseBool(str)
		if parseErr != nil {
			return val, utils.StackError(parseErr, "")
		}
		val.Valid = true
		val.BoolVal = parsed
		return val, nil
	case Int8:
		parsed, parseErr := strconv.ParseInt(str, 10, 8)
		if parseErr != nil {
			return val, utils.StackError(parseErr, "")
		}
		// strconv.ParseXXX always returns the widest type, so narrow it
		// to the storage width before taking its address.
		narrowed := int8(parsed)
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&narrowed)
		return val, nil
	case Uint8, SmallEnum:
		parsed, parseErr := strconv.ParseUint(str, 10, 8)
		if parseErr != nil {
			return val, utils.StackError(parseErr, "")
		}
		narrowed := uint8(parsed)
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&narrowed)
		return val, nil
	case Int16:
		parsed, parseErr := strconv.ParseInt(str, 10, 16)
		if parseErr != nil {
			return val, utils.StackError(parseErr, "")
		}
		narrowed := int16(parsed)
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&narrowed)
		return val, nil
	case Uint16, BigEnum:
		parsed, parseErr := strconv.ParseUint(str, 10, 16)
		if parseErr != nil {
			return val, utils.StackError(parseErr, "")
		}
		narrowed := uint16(parsed)
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&narrowed)
		return val, nil
	case Int32:
		parsed, parseErr := strconv.ParseInt(str, 10, 32)
		if parseErr != nil {
			return val, utils.StackError(parseErr, "")
		}
		narrowed := int32(parsed)
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&narrowed)
		return val, nil
	case Uint32:
		parsed, parseErr := strconv.ParseUint(str, 10, 32)
		if parseErr != nil {
			return val, utils.StackError(parseErr, "")
		}
		narrowed := uint32(parsed)
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&narrowed)
		return val, nil
	case Int64:
		parsed, parseErr := strconv.ParseInt(str, 10, 64)
		if parseErr != nil {
			return val, utils.StackError(parseErr, "")
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&parsed)
		return val, nil
	case Float32:
		parsed, parseErr := strconv.ParseFloat(str, 32)
		if parseErr != nil {
			return val, utils.StackError(parseErr, "")
		}
		narrowed := float32(parsed)
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&narrowed)
		return val, nil
	case UUID:
		// Accept an optional 0x prefix and dashes between hex groups.
		hexStr := strings.TrimPrefix(str, "0x")
		decoded, decodeErr := hex.DecodeString(strings.Replace(hexStr, "-", "", -1))
		if decodeErr != nil || len(decoded) != 16 {
			return val, utils.StackError(decodeErr, "Failed to decode uuid string: %s", hexStr)
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&decoded[0])
		return val, nil
	case GeoPoint:
		var point [2]float32
		point, err = GeoPointFromString(str)
		if err != nil {
			return val, utils.StackError(err, "Failed to read geopoint string: %s", str)
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&point[0])
		return val, nil
	default:
		if !IsArrayType(dataType) {
			return val, utils.StackError(nil, "Unsupported data type value %#x", dataType)
		}
		var parsed interface{}
		parsed, err = ArrayValueFromString(str, GetElementDataType(dataType))
		if err != nil {
			return val, utils.StackError(err, "Failed to read array string: %s", str)
		}
		// Serialize the parsed array into its own buffer and point at it.
		arrayValue := parsed.(*ArrayValue)
		buffer := make([]byte, arrayValue.GetSerBytes())
		valueWriter := utils.NewBufferWriter(buffer)
		if err = arrayValue.Write(&valueWriter); err != nil {
			return val, utils.StackError(err, "Unable to write array value to buffer: %s", str)
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&buffer[0])
		return val, nil
	}
}
// GetBytes implements GoDataValue interface. It returns the in-memory size of
// all points across all polygons: the total point count times SizeOfGeoPoint.
func (gs *GeoShapeGo) GetBytes() int {
	totalPoints := 0
	for i := range gs.Polygons {
		totalPoints += len(gs.Polygons[i])
	}
	return totalPoints * int(SizeOfGeoPoint)
}
// GetSerBytes implements GoDataValue interface. The serialized size is
// 4 bytes for the polygon count plus, per polygon, 4 bytes for its point
// count and 8 bytes ([2]float32) for each point.
func (gs *GeoShapeGo) GetSerBytes() int {
	// numPolygons (uint32)
	size := 4
	for _, polygon := range gs.Polygons {
		// numPoints (uint32) followed by 8 bytes per point
		size += 4 + 8*len(polygon)
	}
	return size
}
// Read implements Read interface for GoDataValue.
//
// It replaces gs.Polygons with the shape decoded from dataReader: a polygon
// count, then for every polygon a point count followed by two float32 values
// per point (read as lat then lng), and finally padding to a 4 byte boundary.
// The first error reported by the reader is returned.
func (gs *GeoShapeGo) Read(dataReader *utils.StreamDataReader) error {
	numPolygons, err := dataReader.ReadUint32()
	if err != nil {
		return err
	}
	gs.Polygons = make([][]GeoPointGo, numPolygons)
	for i := 0; i < int(numPolygons); i++ {
		numPoints, readErr := dataReader.ReadUint32()
		if readErr != nil {
			return readErr
		}
		polygon := make([]GeoPointGo, numPoints)
		// Compute the byte count in int: multiplying in the count's own
		// 32-bit type can wrap for large counts, which leaves the buffer
		// shorter than the decode loop below expects.
		allBytes := make([]byte, int(numPoints)*8)
		if readErr = dataReader.Read(allBytes); readErr != nil {
			return readErr
		}
		for j := range polygon {
			offset := j * 8
			lat := *(*float32)(unsafe.Pointer(&allBytes[offset]))
			lng := *(*float32)(unsafe.Pointer(&allBytes[offset+4]))
			polygon[j] = GeoPointGo{lat, lng}
		}
		gs.Polygons[i] = polygon
	}
	return dataReader.ReadPadding(int(dataReader.GetBytesRead()), 4)
}
// Write implements Write interface for GoDataValue.
//
// It emits the polygon count, then for every polygon its point count followed
// by both float32 components of each point in index order, and finally pads
// the output to a 4 byte boundary. The first writer error is returned.
func (gs *GeoShapeGo) Write(dataWriter *utils.StreamDataWriter) error {
	if err := dataWriter.WriteUint32(uint32(len(gs.Polygons))); err != nil {
		return err
	}
	for _, polygon := range gs.Polygons {
		if err := dataWriter.WriteUint32(uint32(len(polygon))); err != nil {
			return err
		}
		for _, point := range polygon {
			for _, coord := range point {
				if err := dataWriter.WriteFloat32(coord); err != nil {
					return err
				}
			}
		}
	}
	return dataWriter.WritePadding(int(dataWriter.GetBytesWritten()), 4)
}
// GetLength returns the number of items in the array value.
func (av *ArrayValue) GetLength() int {
	count := len(av.Items)
	return count
}
// AddItem appends a new item to the end of the array.
func (av *ArrayValue) AddItem(item interface{}) {
	extended := append(av.Items, item)
	av.Items = extended
}
// GetSerBytes returns the number of bytes this value occupies in the
// upsertbatch serialized format.
func (av *ArrayValue) GetSerBytes() int {
	if n := av.GetLength(); n > 0 {
		return CalculateListElementBytes(av.DataType, n)
	}
	// we always align to 8 bytes in upsertbatch
	return 8
}
// NewArrayValue creates a new, empty ArrayValue for items of the given data type.
func NewArrayValue(dataType DataType) *ArrayValue {
	av := new(ArrayValue)
	av.DataType = dataType
	av.Items = []interface{}{}
	return av
}
// Write serializes the array into writer.
// Serialized Array data format:
// number of items: 4 bytes
// item values: per item bytes * number of items, align to byte
// item validity: 1 bit * number of items
// final align to 8 bytes
//
// A nil item is written as the zero value of the item type and flagged invalid
// in the validity section. Items of a type not listed below contribute no
// value bytes. The first error reported by the writer is returned; a non-nil
// item whose dynamic type does not match DataType causes a panic from the
// type assertion.
func (av *ArrayValue) Write(writer *utils.BufferWriter) error {
	err := writer.AppendUint32(uint32(av.GetLength()))
	if err != nil {
		return err
	}
	// add value for each item
	for _, val := range av.Items {
		switch av.DataType {
		case Bool:
			var v bool
			if val != nil {
				v = val.(bool)
			}
			err = writer.AppendBool(v)
		case Int8:
			var v int8
			if val != nil {
				v = val.(int8)
			}
			err = writer.AppendInt8(v)
		case Uint8, SmallEnum:
			var v uint8
			if val != nil {
				v = val.(uint8)
			}
			err = writer.AppendUint8(v)
		case Int16:
			var v int16
			if val != nil {
				v = val.(int16)
			}
			err = writer.AppendInt16(v)
		case Uint16, BigEnum:
			var v uint16
			if val != nil {
				v = val.(uint16)
			}
			err = writer.AppendUint16(v)
		case Int32:
			var v int32
			if val != nil {
				v = val.(int32)
			}
			err = writer.AppendInt32(v)
		case Uint32:
			var v uint32
			if val != nil {
				v = val.(uint32)
			}
			err = writer.AppendUint32(v)
		case Float32:
			var v float32
			if val != nil {
				v = val.(float32)
			}
			err = writer.AppendFloat32(v)
		case Int64:
			var v int64
			if val != nil {
				v = val.(int64)
			}
			err = writer.AppendInt64(v)
		case UUID:
			var v [2]uint64
			if val != nil {
				v = val.([2]uint64)
			}
			// Assign to the outer err (no :=) so that a failure here is
			// seen by the check after the switch.
			if err = writer.AppendUint64(v[0]); err == nil {
				err = writer.AppendUint64(v[1])
			}
		case GeoPoint:
			var v [2]float32
			if val != nil {
				v = val.([2]float32)
			}
			if err = writer.AppendFloat32(v[0]); err == nil {
				err = writer.AppendFloat32(v[1])
			}
		}
		if err != nil {
			return err
		}
	}
	writer.AlignBytes(1)
	// add validity bit for each item
	for _, val := range av.Items {
		if err = writer.AppendBool(val != nil); err != nil {
			return err
		}
	}
	writer.AlignBytes(8)
	return nil
}
// ArrayValueReader is an aux class to read item data from a bytes buffer.
type ArrayValueReader struct {
	// itemType is the data type of each array item
	itemType DataType
	// value points at the first item, just past the 4 byte item count; nil for a nil buffer
	value unsafe.Pointer
	// length is the number of items in the array
	length int
}
// NewArrayValueReader creates an ArrayValueReader to read an array value from
// upsertbatch, whose buffer begins with the uint32 item count. A nil value
// yields a reader with zero length and a nil data pointer.
func NewArrayValueReader(dataType DataType, value unsafe.Pointer) *ArrayValueReader {
	reader := &ArrayValueReader{itemType: GetElementDataType(dataType)}
	if value != nil {
		reader.length = int(*(*uint32)(value))
		// item data starts right after the 4 byte item count
		reader.value = unsafe.Pointer(uintptr(value) + 4)
	}
	return reader
}
// GetLength returns the number of items inside the array.
func (reader *ArrayValueReader) GetLength() int {
	n := reader.length
	return n
}
// GetBytes returns the number of bytes this array value occupies, as computed
// by CalculateListElementBytes for the reader's item type and length.
func (reader *ArrayValueReader) GetBytes() int {
	total := CalculateListElementBytes(reader.itemType, reader.length)
	return total
}
// GetBool returns the bool value at index for Bool item type, reading one bit
// per item from the value section. Out-of-range indexes return false.
func (reader *ArrayValueReader) GetBool(index int) bool {
	if index < 0 || index >= reader.length {
		return false
	}
	packed := *(*byte)(unsafe.Pointer(uintptr(reader.value) + uintptr(index/8)))
	mask := byte(0x1) << uint8(index%8)
	return packed&mask != 0x0
}
// Get returns the buffer pointer for the index-th item, or nil when index is
// out of range.
func (reader *ArrayValueReader) Get(index int) unsafe.Pointer {
	if index >= 0 && index < reader.length {
		return unsafe.Pointer(uintptr(reader.value) + uintptr(index*DataTypeBytes(reader.itemType)))
	}
	return nil
}
// IsItemValid checks whether the item at the index-th place is valid (non-null)
// by testing its bit in the validity section that follows the item values.
//
// Out-of-range indexes return false, matching Get and GetBool. This also
// covers readers created from a nil value, which have length 0 and no
// backing buffer to read from.
func (reader *ArrayValueReader) IsItemValid(index int) bool {
	if index < 0 || index >= reader.length {
		return false
	}
	nilOffset := CalculateListNilOffset(reader.itemType, reader.length)
	nilByte := *(*byte)(unsafe.Pointer(uintptr(reader.value) + uintptr(nilOffset) + uintptr(index/8)))
	return nilByte&(0x1<<uint8(index%8)) != 0x0
}
// CalculateListElementBytes returns the total size in bytes needed to store a
// single list value with the given item type and item count: the 4 byte item
// count, the item values and the validity bits, rounded up to 8 bytes.
// It returns 0 for a zero length.
func CalculateListElementBytes(dataType DataType, length int) int {
	if length == 0 {
		return 0
	}
	// item count stored at the beginning: 4 bytes
	headerBits := 4 * 8
	// item values, rounded up to a whole byte
	valueBits := (DataTypeBits(dataType)*length + 7) / 8 * 8
	// one null bit per item, rounded up to a whole byte
	nullBits := (length + 7) / 8 * 8
	// round the total up to 64 bits (8 bytes)
	return (headerBits + valueBits + nullBits + 63) / 64 * 8
}
// CalculateListNilOffset returns the size in bytes of the item values of a
// list with the given item type and item count, rounded up to a whole byte.
// ArrayValueReader uses it as the offset of the validity bits from the first item.
func CalculateListNilOffset(dataType DataType, length int) int {
	valueBits := DataTypeBits(dataType) * length
	return (valueBits + 7) / 8
}
// GetDataValue returns the DataValue for the given column value.
//
// col is the raw column value, columnIDInSchema is only used in error messages
// and columnType is the type name resolved through DataTypeFromString.
//
// String inputs are delegated to ValueFromString. Otherwise col must already
// have the Go type matching the column type, or be convertible to it via
// reflection. Bool columns accept only bool. Any other input, including a nil
// col, yields an "Invalid ... value" error, and column types other than bool,
// the integer types, enums and float32 yield an "Invalid data type" error.
func GetDataValue(col interface{}, columnIDInSchema int, columnType string) (DataValue, error) {
	dataType := DataTypeFromString(columnType)
	if dataStr, ok := col.(string); ok {
		return ValueFromString(dataStr, dataType)
	}

	val := DataValue{
		DataType: dataType,
	}

	switch dataType {
	case Bool:
		val.IsBool = true
		b, ok := col.(bool)
		if !ok {
			return val, fmt.Errorf("Invalid bool value, col:%d, val:%v, type:%s",
				columnIDInSchema, col, reflect.TypeOf(col))
		}
		val.Valid = true
		val.BoolVal = b
	case Int8:
		i8, ok := col.(int8)
		if !ok {
			converted, convertible := convertColumnValue(col, reflect.TypeOf(i8))
			if !convertible {
				return val, fmt.Errorf("Invalid int8 value, col:%d, val:%v, type:%s",
					columnIDInSchema, col, reflect.TypeOf(col))
			}
			i8 = int8(converted.Int())
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&i8)
	case Uint8, SmallEnum:
		ui8, ok := col.(uint8)
		if !ok {
			converted, convertible := convertColumnValue(col, reflect.TypeOf(ui8))
			if !convertible {
				return val, fmt.Errorf("Invalid uint8 value, col:%d, val:%v, type:%s",
					columnIDInSchema, col, reflect.TypeOf(col))
			}
			ui8 = uint8(converted.Uint())
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&ui8)
	case Int16:
		i16, ok := col.(int16)
		if !ok {
			converted, convertible := convertColumnValue(col, reflect.TypeOf(i16))
			if !convertible {
				return val, fmt.Errorf("Invalid int16 value, col:%d, val:%v, type:%s",
					columnIDInSchema, col, reflect.TypeOf(col))
			}
			i16 = int16(converted.Int())
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&i16)
	case Uint16, BigEnum:
		ui16, ok := col.(uint16)
		if !ok {
			converted, convertible := convertColumnValue(col, reflect.TypeOf(ui16))
			if !convertible {
				return val, fmt.Errorf("Invalid uint16 value, col:%d, val:%v, type:%s",
					columnIDInSchema, col, reflect.TypeOf(col))
			}
			ui16 = uint16(converted.Uint())
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&ui16)
	case Int32:
		i32, ok := col.(int32)
		if !ok {
			converted, convertible := convertColumnValue(col, reflect.TypeOf(i32))
			if !convertible {
				return val, fmt.Errorf("Invalid int32 value, col:%d, val:%v, type:%s",
					columnIDInSchema, col, reflect.TypeOf(col))
			}
			i32 = int32(converted.Int())
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&i32)
	case Uint32:
		ui32, ok := col.(uint32)
		if !ok {
			converted, convertible := convertColumnValue(col, reflect.TypeOf(ui32))
			if !convertible {
				return val, fmt.Errorf("Invalid uint32 value, col:%d, val:%v, type:%s",
					columnIDInSchema, col, reflect.TypeOf(col))
			}
			ui32 = uint32(converted.Uint())
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&ui32)
	case Int64:
		i64, ok := col.(int64)
		if !ok {
			converted, convertible := convertColumnValue(col, reflect.TypeOf(i64))
			if !convertible {
				return val, fmt.Errorf("Invalid int64 value, col:%d, val:%v, type:%s",
					columnIDInSchema, col, reflect.TypeOf(col))
			}
			i64 = converted.Int()
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&i64)
	case Float32:
		f32, ok := col.(float32)
		if !ok {
			converted, convertible := convertColumnValue(col, reflect.TypeOf(f32))
			if !convertible {
				return val, fmt.Errorf("Invalid float32 value, col:%d, val:%v, type:%s",
					columnIDInSchema, col, reflect.TypeOf(col))
			}
			f32 = float32(converted.Float())
		}
		val.Valid = true
		val.OtherVal = unsafe.Pointer(&f32)
	default:
		return val, fmt.Errorf("Invalid data type, col:%d, val:%v, type:%d",
			columnIDInSchema, col, dataType)
	}
	return val, nil
}

// convertColumnValue converts col to the target type through reflection.
// The boolean result is false when col is nil (reflect.TypeOf returns a nil
// Type for a nil interface, on which ConvertibleTo would panic) or when col's
// type is not convertible to target.
func convertColumnValue(col interface{}, target reflect.Type) (reflect.Value, bool) {
	colType := reflect.TypeOf(col)
	if colType == nil || !colType.ConvertibleTo(target) {
		return reflect.Value{}, false
	}
	return reflect.ValueOf(col).Convert(target), true
}
// baseDataValueIterator holds the cursor state embedded by the concrete
// iterators in this file: the current position and the number of values.
type baseDataValueIterator struct {
	curIdx int
	size   int
}

// read on the base iterator always returns NullDataValue.
func (iter *baseDataValueIterator) read() DataValue {
	return NullDataValue
}

// next advances the cursor by one position.
func (iter *baseDataValueIterator) next() {
	iter.curIdx++
}

// done reports whether the cursor has moved past the last value.
func (iter *baseDataValueIterator) done() bool {
	return iter.size <= iter.curIdx
}
// primaryKeyDataValueIterator iterates over the primary key column values of
// a single row, fetching each value from a BatchReader.
type primaryKeyDataValueIterator struct {
	baseDataValueIterator
	primaryKeyCols []int
	batchReader    BatchReader
	row            int
}

// NewPrimaryKeyDataValueIterator creates DataValueIterator from upsert batch, primary key cols and row number
func NewPrimaryKeyDataValueIterator(batchReader BatchReader, row int, primaryKeyCols []int) DataValueIterator {
	iter := &primaryKeyDataValueIterator{
		primaryKeyCols: primaryKeyCols,
		batchReader:    batchReader,
		row:            row,
	}
	iter.size = len(primaryKeyCols)
	return iter
}

// read returns the value of the current primary key column for the row, or
// NullDataValue once the iterator is done.
func (iter *primaryKeyDataValueIterator) read() DataValue {
	if iter.done() {
		return NullDataValue
	}
	return iter.batchReader.GetDataValue(iter.row, iter.primaryKeyCols[iter.curIdx])
}
// sliceDataValueIterator iterates over an in-memory slice of DataValue.
type sliceDataValueIterator struct {
	baseDataValueIterator
	values []DataValue
}

// NewSliceDataValueIterator creates DataValueIterator from slice of DataValue
func NewSliceDataValueIterator(values []DataValue) DataValueIterator {
	iter := &sliceDataValueIterator{values: values}
	iter.size = len(values)
	return iter
}

// read returns the value at the current position, or NullDataValue once the
// iterator is done.
func (iter *sliceDataValueIterator) read() DataValue {
	if iter.done() {
		return NullDataValue
	}
	return iter.values[iter.curIdx]
}